8
8
use MongoDB \Driver \Server ;
9
9
use MongoDB \Driver \Exception \ConnectionTimeoutException ;
10
10
use MongoDB \Exception \ResumeTokenException ;
11
+ use MongoDB \Operation \CreateCollection ;
11
12
use MongoDB \Operation \DatabaseCommand ;
13
+ use MongoDB \Operation \DropCollection ;
12
14
use MongoDB \Operation \InsertOne ;
13
15
use MongoDB \Operation \Watch ;
14
16
use MongoDB \Tests \CommandObserver ;
@@ -627,16 +629,10 @@ function ($changeStream) use (&$sessionAfterResume, &$commands) {
627
629
);
628
630
629
631
$ expectedCommands = [
630
- /* The initial aggregate command for change streams returns a cursor
631
- * envelope with an empty initial batch, since there are no changes
632
- * to report at the moment the change stream is created. Therefore,
633
- * we expect a getMore to be issued when we first advance the change
634
- * stream (with either rewind() or next()). */
632
+ /* We expect a getMore to be issued because we are calling next(). */
635
633
'getMore ' ,
636
- /* Since socketTimeoutMS is less than maxAwaitTimeMS, the previous
637
- * getMore command encounters a client socket timeout and leaves the
638
- * cursor open on the server. ChangeStream should catch this error
639
- * and resume by issuing a new aggregate command. */
634
+ /* Since we have killed the cursor, ChangeStream will resume by
635
+ * issuing a new aggregate commmand. */
640
636
'aggregate ' ,
641
637
/* When ChangeStream resumes, it overwrites its original cursor with
642
638
* the new cursor resulting from the last aggregate command. This
@@ -653,7 +649,29 @@ function ($changeStream) use (&$sessionAfterResume, &$commands) {
653
649
foreach ($ sessionAfterResume as $ session ) {
654
650
$ this ->assertEquals ($ session , $ originalSession );
655
651
}
652
+ }
653
+
654
+ public function testSessionFreed ()
655
+ {
656
+ $ operation = new CreateCollection ($ this ->getDatabaseName (), $ this ->getCollectionName ());
657
+ $ operation ->execute ($ this ->getPrimaryServer ());
658
+
659
+ $ operation = new Watch ($ this ->manager , $ this ->getDatabaseName (), $ this ->getCollectionName (), [], $ this ->defaultOptions );
660
+ $ changeStream = $ operation ->execute ($ this ->getPrimaryServer ());
661
+
662
+ $ rc = new ReflectionClass ($ changeStream );
663
+ $ rp = $ rc ->getProperty ('resumeCallable ' );
664
+ $ rp ->setAccessible (true );
665
+
666
+ $ this ->assertNotNull ($ rp ->getValue ($ changeStream ));
667
+
668
+ // Invalidate the cursor to verify that resumeCallable is unset when the cursor is exhausted.
669
+ $ operation = new DropCollection ($ this ->getDatabaseName (), $ this ->getCollectionName ());
670
+ $ operation ->execute ($ this ->getPrimaryServer ());
671
+
672
+ $ changeStream ->next ();
656
673
674
+ $ this ->assertNull ($ rp ->getValue ($ changeStream ));
657
675
}
658
676
659
677
private function insertDocument ($ document )
0 commit comments