Search moodle.org's
Developer Documentation

See Release Notes
Long Term Support Release

  • Bug fixes for general core bugs in 4.1.x will end 13 November 2023 (12 months).
  • Bug fixes for security issues in 4.1.x will end 10 November 2025 (36 months).
  • PHP version: minimum PHP 7.4.0. Note: the minimum PHP version has increased since Moodle 4.0. PHP 8.0.x is also supported.

Differences Between: [Versions 310 and 401] [Versions 311 and 401] [Versions 39 and 401] [Versions 400 and 401]

   1  <?php
   2  /*
   3   * Copyright 2019-present MongoDB, Inc.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *   https://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   */
  17  
  18  namespace MongoDB\Model;
  19  
  20  use IteratorIterator;
  21  use MongoDB\BSON\Serializable;
  22  use MongoDB\Driver\Cursor;
  23  use MongoDB\Driver\Monitoring\CommandFailedEvent;
  24  use MongoDB\Driver\Monitoring\CommandStartedEvent;
  25  use MongoDB\Driver\Monitoring\CommandSubscriber;
  26  use MongoDB\Driver\Monitoring\CommandSucceededEvent;
  27  use MongoDB\Driver\Server;
  28  use MongoDB\Exception\InvalidArgumentException;
  29  use MongoDB\Exception\ResumeTokenException;
  30  use MongoDB\Exception\UnexpectedValueException;
  31  use ReturnTypeWillChange;
  32  
  33  use function assert;
  34  use function count;
  35  use function is_array;
  36  use function is_object;
  37  use function MongoDB\Driver\Monitoring\addSubscriber;
  38  use function MongoDB\Driver\Monitoring\removeSubscriber;
  39  
  40  /**
  41   * ChangeStreamIterator wraps a change stream's tailable cursor.
  42   *
  43   * This iterator tracks the size of each batch in order to determine when the
  44   * postBatchResumeToken is applicable. It also ensures that initial calls to
  45   * rewind() do not execute getMore commands.
  46   *
  47   * @internal
  48   */
  49  class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
  50  {
  51      /** @var integer */
  52      private $batchPosition = 0;
  53  
  54      /** @var integer */
  55      private $batchSize;
  56  
  57      /** @var boolean */
  58      private $isRewindNop;
  59  
  60      /** @var boolean */
  61      private $isValid = false;
  62  
  63      /** @var object|null */
  64      private $postBatchResumeToken;
  65  
  66      /** @var array|object|null */
  67      private $resumeToken;
  68  
  69      /** @var Server */
  70      private $server;
  71  
  72      /**
  73       * @internal
  74       * @param array|object|null $initialResumeToken
  75       */
  76      public function __construct(Cursor $cursor, int $firstBatchSize, $initialResumeToken, ?object $postBatchResumeToken)
  77      {
  78          if (isset($initialResumeToken) && ! is_array($initialResumeToken) && ! is_object($initialResumeToken)) {
  79              throw InvalidArgumentException::invalidType('$initialResumeToken', $initialResumeToken, 'array or object');
  80          }
  81  
  82          parent::__construct($cursor);
  83  
  84          $this->batchSize = $firstBatchSize;
  85          $this->isRewindNop = ($firstBatchSize === 0);
  86          $this->postBatchResumeToken = $postBatchResumeToken;
  87          $this->resumeToken = $initialResumeToken;
  88          $this->server = $cursor->getServer();
  89      }
  90  
  91      /** @internal */
  92      final public function commandFailed(CommandFailedEvent $event): void
  93      {
  94      }
  95  
  96      /** @internal */
  97      final public function commandStarted(CommandStartedEvent $event): void
  98      {
  99          if ($event->getCommandName() !== 'getMore') {
 100              return;
 101          }
 102  
 103          $this->batchPosition = 0;
 104          $this->batchSize = 0;
 105          $this->postBatchResumeToken = null;
 106      }
 107  
 108      /** @internal */
 109      final public function commandSucceeded(CommandSucceededEvent $event): void
 110      {
 111          if ($event->getCommandName() !== 'getMore') {
 112              return;
 113          }
 114  
 115          $reply = $event->getReply();
 116  
 117          if (! isset($reply->cursor->nextBatch) || ! is_array($reply->cursor->nextBatch)) {
 118              throw new UnexpectedValueException('getMore command did not return a "cursor.nextBatch" array');
 119          }
 120  
 121          $this->batchSize = count($reply->cursor->nextBatch);
 122  
 123          if (isset($reply->cursor->postBatchResumeToken) && is_object($reply->cursor->postBatchResumeToken)) {
 124              $this->postBatchResumeToken = $reply->cursor->postBatchResumeToken;
 125          }
 126      }
 127  
 128      /**
 129       * @see https://php.net/iteratoriterator.current
 130       * @return mixed
 131       */
 132      #[ReturnTypeWillChange]
 133      public function current()
 134      {
 135          return $this->isValid ? parent::current() : null;
 136      }
 137  
 138      /**
 139       * Necessary to let psalm know that we're always expecting a cursor as inner
 140       * iterator. This could be side-stepped due to the class not being final,
 141       * but it's very much an invalid use-case. This method can be dropped in 2.0
 142       * once the class is final.
 143       */
 144      final public function getInnerIterator(): Cursor
 145      {
 146          $cursor = parent::getInnerIterator();
 147          assert($cursor instanceof Cursor);
 148  
 149          return $cursor;
 150      }
 151  
 152      /**
 153       * Returns the resume token for the iterator's current position.
 154       *
 155       * Null may be returned if no change documents have been iterated and the
 156       * server did not include a postBatchResumeToken in its aggregate or getMore
 157       * command response.
 158       *
 159       * @return array|object|null
 160       */
 161      public function getResumeToken()
 162      {
 163          return $this->resumeToken;
 164      }
 165  
 166      /**
 167       * Returns the server the cursor is running on.
 168       */
 169      public function getServer(): Server
 170      {
 171          return $this->server;
 172      }
 173  
 174      /**
 175       * @see https://php.net/iteratoriterator.key
 176       * @return mixed
 177       */
 178      #[ReturnTypeWillChange]
 179      public function key()
 180      {
 181          return $this->isValid ? parent::key() : null;
 182      }
 183  
 184      /**
 185       * @see https://php.net/iteratoriterator.rewind
 186       */
 187      public function next(): void
 188      {
 189          /* Determine if advancing the iterator will execute a getMore command
 190           * (i.e. we are already positioned at the end of the current batch). If
 191           * so, rely on the APM callbacks to reset $batchPosition and update
 192           * $batchSize. Otherwise, we can forgo APM and manually increment
 193           * $batchPosition after calling next(). */
 194          $getMore = $this->isAtEndOfBatch();
 195  
 196          if ($getMore) {
 197              addSubscriber($this);
 198          }
 199  
 200          try {
 201              parent::next();
 202              $this->onIteration(! $getMore);
 203          } finally {
 204              if ($getMore) {
 205                  removeSubscriber($this);
 206              }
 207          }
 208      }
 209  
 210      /**
 211       * @see https://php.net/iteratoriterator.rewind
 212       */
 213      public function rewind(): void
 214      {
 215          if ($this->isRewindNop) {
 216              return;
 217          }
 218  
 219          parent::rewind();
 220          $this->onIteration(false);
 221      }
 222  
 223      /**
 224       * @see https://php.net/iteratoriterator.valid
 225       */
 226      public function valid(): bool
 227      {
 228          return $this->isValid;
 229      }
 230  
 231      /**
 232       * Extracts the resume token (i.e. "_id" field) from a change document.
 233       *
 234       * @param array|object $document Change document
 235       * @return array|object
 236       * @throws InvalidArgumentException
 237       * @throws ResumeTokenException if the resume token is not found or invalid
 238       */
 239      private function extractResumeToken($document)
 240      {
 241          if (! is_array($document) && ! is_object($document)) {
 242              throw InvalidArgumentException::invalidType('$document', $document, 'array or object');
 243          }
 244  
 245          if ($document instanceof Serializable) {
 246              return $this->extractResumeToken($document->bsonSerialize());
 247          }
 248  
 249          $resumeToken = is_array($document)
 250              ? ($document['_id'] ?? null)
 251              : ($document->_id ?? null);
 252  
 253          if (! isset($resumeToken)) {
 254              $this->isValid = false;
 255  
 256              throw ResumeTokenException::notFound();
 257          }
 258  
 259          if (! is_array($resumeToken) && ! is_object($resumeToken)) {
 260              $this->isValid = false;
 261  
 262              throw ResumeTokenException::invalidType($resumeToken);
 263          }
 264  
 265          return $resumeToken;
 266      }
 267  
 268      /**
 269       * Return whether the iterator is positioned at the end of the batch.
 270       */
 271      private function isAtEndOfBatch(): bool
 272      {
 273          return $this->batchPosition + 1 >= $this->batchSize;
 274      }
 275  
 276      /**
 277       * Perform housekeeping after an iteration event.
 278       *
 279       * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#updating-the-cached-resume-token
 280       */
 281      private function onIteration(bool $incrementBatchPosition): void
 282      {
 283          $this->isValid = parent::valid();
 284  
 285          /* Disable rewind()'s NOP behavior once we advance to a valid position.
 286           * This will allow the driver to throw a LogicException if rewind() is
 287           * called after the cursor has advanced past its first element. */
 288          if ($this->isRewindNop && $this->isValid) {
 289              $this->isRewindNop = false;
 290          }
 291  
 292          if ($incrementBatchPosition && $this->isValid) {
 293              $this->batchPosition++;
 294          }
 295  
 296          /* If the iterator is positioned at the end of the batch, apply the
 297           * postBatchResumeToken if it's available. This handles both the case
 298           * where the current batch is empty (since onIteration() will be called
 299           * after a successful getMore) and when the iterator has advanced to the
 300           * last document in its current batch. Otherwise, extract a resume token
 301           * from the current document if possible. */
 302          if ($this->isAtEndOfBatch() && $this->postBatchResumeToken !== null) {
 303              $this->resumeToken = $this->postBatchResumeToken;
 304          } elseif ($this->isValid) {
 305              $this->resumeToken = $this->extractResumeToken($this->current());
 306          }
 307      }
 308  }