6
6
use Generator ;
7
7
use Iterator ;
8
8
use MongoDB \BSON \Document ;
9
- use MongoDB \BSON \TimestampInterface ;
10
9
use MongoDB \ChangeStream ;
11
10
use MongoDB \Codec \DecodeIfSupported ;
12
11
use MongoDB \Codec \DocumentCodec ;
@@ -106,44 +105,6 @@ public function encode($value): Document
106
105
];
107
106
}
108
107
109
- /**
110
- * Prose test 1: "ChangeStream must continuously track the last seen
111
- * resumeToken"
112
- */
113
- #[DataProvider('provideCodecOptions ' )]
114
- public function testGetResumeToken (array $ options , Closure $ getIdentifier ): void
115
- {
116
- $ this ->skipIfServerVersion ('>= ' , '4.0.7 ' , 'postBatchResumeToken is supported ' );
117
-
118
- $ operation = new Watch (
119
- $ this ->manager ,
120
- $ this ->getDatabaseName (),
121
- $ this ->getCollectionName (),
122
- [],
123
- $ options + $ this ->defaultOptions ,
124
- );
125
- $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
126
-
127
- $ changeStream ->rewind ();
128
- $ this ->assertFalse ($ changeStream ->valid ());
129
- $ this ->assertNull ($ changeStream ->getResumeToken ());
130
-
131
- $ this ->insertDocument (['x ' => 1 ]);
132
- $ this ->insertDocument (['x ' => 2 ]);
133
-
134
- $ this ->advanceCursorUntilValid ($ changeStream );
135
- $ this ->assertSameDocument ($ getIdentifier ($ changeStream ->current ()), $ changeStream ->getResumeToken ());
136
-
137
- $ changeStream ->next ();
138
- $ this ->assertTrue ($ changeStream ->valid ());
139
- $ this ->assertSameDocument ($ getIdentifier ($ changeStream ->current ()), $ changeStream ->getResumeToken ());
140
-
141
- $ this ->insertDocument (['x ' => 3 ]);
142
-
143
- $ this ->advanceCursorUntilValid ($ changeStream );
144
- $ this ->assertSameDocument ($ getIdentifier ($ changeStream ->current ()), $ changeStream ->getResumeToken ());
145
- }
146
-
147
108
/**
148
109
* Prose test 1: "ChangeStream must continuously track the last seen
149
110
* resumeToken"
@@ -165,8 +126,6 @@ public function testGetResumeToken(array $options, Closure $getIdentifier): void
165
126
#[DataProvider('provideCodecOptions ' )]
166
127
public function testGetResumeTokenWithPostBatchResumeToken (array $ options , Closure $ getIdentifier ): void
167
128
{
168
- $ this ->skipIfServerVersion ('< ' , '4.0.7 ' , 'postBatchResumeToken is not supported ' );
169
-
170
129
$ operation = new Watch (
171
130
$ this ->manager ,
172
131
$ this ->getDatabaseName (),
@@ -267,8 +226,6 @@ function (array $event) use (&$commands): void {
267
226
268
227
public function testResumeBeforeReceivingAnyResultsIncludesPostBatchResumeToken (): void
269
228
{
270
- $ this ->skipIfServerVersion ('< ' , '4.0.7 ' , 'postBatchResumeToken is not supported ' );
271
-
272
229
$ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this ->defaultOptions );
273
230
274
231
$ events = [];
@@ -330,79 +287,6 @@ private function assertResumeAfter($expectedResumeToken, stdClass $command): voi
330
287
$ this ->assertEquals ($ expectedResumeToken , $ command ->pipeline [0 ]->{'$changeStream ' }->resumeAfter );
331
288
}
332
289
333
- /**
334
- * Prose test 9: "$changeStream stage for ChangeStream against a server
335
- * >=4.0 and <4.0.7 that has not received any results yet MUST include a
336
- * startAtOperationTime option when resuming a changestream."
337
- */
338
- public function testResumeBeforeReceivingAnyResultsIncludesStartAtOperationTime (): void
339
- {
340
- $ this ->skipIfServerVersion ('>= ' , '4.0.7 ' , 'postBatchResumeToken takes precedence over startAtOperationTime ' );
341
-
342
- $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this ->defaultOptions );
343
-
344
- $ events = [];
345
-
346
- (new CommandObserver ())->observe (
347
- function () use ($ operation , &$ changeStream ): void {
348
- $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
349
- },
350
- function (array $ event ) use (&$ events ): void {
351
- $ events [] = $ event ;
352
- },
353
- );
354
-
355
- $ this ->assertCount (1 , $ events );
356
- $ this ->assertSame ('aggregate ' , $ events [0 ]['started ' ]->getCommandName ());
357
- $ reply = $ events [0 ]['succeeded ' ]->getReply ();
358
- $ this ->assertObjectHasProperty ('operationTime ' , $ reply );
359
- $ operationTime = $ reply ->operationTime ;
360
- $ this ->assertInstanceOf (TimestampInterface::class, $ operationTime );
361
-
362
- $ this ->assertFalse ($ changeStream ->valid ());
363
- $ this ->forceChangeStreamResume ();
364
-
365
- $ this ->assertNoCommandExecuted (function () use ($ changeStream ): void {
366
- $ changeStream ->rewind ();
367
- });
368
-
369
- $ events = [];
370
-
371
- (new CommandObserver ())->observe (
372
- function () use ($ changeStream ): void {
373
- $ changeStream ->next ();
374
- },
375
- function (array $ event ) use (&$ events ): void {
376
- $ events [] = $ event ;
377
- },
378
- );
379
-
380
- $ this ->assertCount (3 , $ events );
381
-
382
- $ this ->assertSame ('getMore ' , $ events [0 ]['started ' ]->getCommandName ());
383
- $ this ->assertArrayHasKey ('failed ' , $ events [0 ]);
384
-
385
- $ this ->assertSame ('aggregate ' , $ events [1 ]['started ' ]->getCommandName ());
386
- $ this ->assertStartAtOperationTime ($ operationTime , $ events [1 ]['started ' ]->getCommand ());
387
- $ this ->assertArrayHasKey ('succeeded ' , $ events [1 ]);
388
-
389
- // Original cursor is freed immediately after the change stream resumes
390
- $ this ->assertSame ('killCursors ' , $ events [2 ]['started ' ]->getCommandName ());
391
- $ this ->assertArrayHasKey ('succeeded ' , $ events [2 ]);
392
-
393
- $ this ->assertFalse ($ changeStream ->valid ());
394
- }
395
-
396
- private function assertStartAtOperationTime (TimestampInterface $ expectedOperationTime , stdClass $ command ): void
397
- {
398
- $ this ->assertObjectHasProperty ('pipeline ' , $ command );
399
- $ this ->assertIsArray ($ command ->pipeline );
400
- $ this ->assertArrayHasKey (0 , $ command ->pipeline );
401
- $ this ->assertObjectHasProperty ('$changeStream ' , $ command ->pipeline [0 ]);
402
- $ this ->assertObjectHasProperty ('startAtOperationTime ' , $ command ->pipeline [0 ]->{'$changeStream ' });
403
- $ this ->assertEquals ($ expectedOperationTime , $ command ->pipeline [0 ]->{'$changeStream ' }->startAtOperationTime );
404
- }
405
-
406
290
public function testRewindMultipleTimesWithResults (): void
407
291
{
408
292
$ this ->skipIfIsShardedCluster ('Cursor needs to be advanced multiple times and can \'t be rewound afterwards. ' );
@@ -1319,8 +1203,6 @@ function (array $aggregateCommand) {
1319
1203
*/
1320
1204
public function testErrorDuringAggregateCommandDoesNotCauseResume (): void
1321
1205
{
1322
- $ this ->skipIfServerVersion ('< ' , '4.0.0 ' , 'failCommand is not supported ' );
1323
-
1324
1206
$ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this ->defaultOptions );
1325
1207
1326
1208
$ commandCount = 0 ;
@@ -1382,39 +1264,6 @@ public function testOriginalReadPreferenceIsPreservedOnResume(): void
1382
1264
self ::assertTrue ($ cursor ->getServer ()->isSecondary ());
1383
1265
}
1384
1266
1385
- /**
1386
- * Prose test 12
1387
- * For a ChangeStream under these conditions:
1388
- * - Running against a server <4.0.7.
1389
- * - The batch is empty or has been iterated to the last document.
1390
- * Expected result:
1391
- * - getResumeToken must return the _id of the last document returned if one exists.
1392
- * - getResumeToken must return resumeAfter from the initial aggregate if the option was specified.
1393
- * - If resumeAfter was not specified, the getResumeToken result must be empty.
1394
- */
1395
- public function testGetResumeTokenReturnsOriginalResumeTokenOnEmptyBatch (): void
1396
- {
1397
- $ this ->skipIfServerVersion ('>= ' , '4.0.7 ' , 'postBatchResumeToken is supported ' );
1398
-
1399
- $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this ->defaultOptions );
1400
- $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
1401
-
1402
- $ this ->assertNull ($ changeStream ->getResumeToken ());
1403
-
1404
- $ this ->insertDocument (['x ' => 1 ]);
1405
-
1406
- $ changeStream ->next ();
1407
- $ this ->assertTrue ($ changeStream ->valid ());
1408
- $ resumeToken = $ changeStream ->getResumeToken ();
1409
- $ this ->assertSame ($ resumeToken , $ changeStream ->current ()->_id );
1410
-
1411
- $ options = ['resumeAfter ' => $ resumeToken ] + $ this ->defaultOptions ;
1412
- $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ options );
1413
- $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
1414
-
1415
- $ this ->assertSame ($ resumeToken , $ changeStream ->getResumeToken ());
1416
- }
1417
-
1418
1267
/**
1419
1268
* Prose test 14
1420
1269
* For a ChangeStream under these conditions:
@@ -1429,7 +1278,6 @@ public function testGetResumeTokenReturnsOriginalResumeTokenOnEmptyBatch(): void
1429
1278
#[DataProvider('provideCodecOptions ' )]
1430
1279
public function testResumeTokenBehaviour (array $ options , Closure $ getIdentifier ): void
1431
1280
{
1432
- $ this ->skipIfServerVersion ('< ' , '4.1.1 ' , 'Testing resumeAfter and startAfter can only be tested on servers >= 4.1.1 ' );
1433
1281
$ this ->skipIfIsShardedCluster ('Resume token behaviour can \'t be reliably tested on sharded clusters. ' );
1434
1282
1435
1283
$ operation = new Watch (
0 commit comments