Search moodle.org's
Developer Documentation

See Release Notes

  • Bug fixes for general core bugs in 4.0.x will end 8 May 2023 (12 months).
  • Bug fixes for security issues in 4.0.x will end 13 November 2023 (18 months).
  • PHP version: minimum PHP 7.3.0. Note: the minimum PHP version has increased since Moodle 3.10. PHP 7.4.x is also supported.

Differences Between: [Versions 310 and 400] [Versions 39 and 400] [Versions 400 and 401]

   1  <?php
   2  /*
   3   * Copyright 2019 MongoDB, Inc.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *   http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   */
  17  
  18  namespace MongoDB\Model;
  19  
  20  use IteratorIterator;
  21  use MongoDB\BSON\Serializable;
  22  use MongoDB\Driver\Cursor;
  23  use MongoDB\Driver\Monitoring\CommandFailedEvent;
  24  use MongoDB\Driver\Monitoring\CommandStartedEvent;
  25  use MongoDB\Driver\Monitoring\CommandSubscriber;
  26  use MongoDB\Driver\Monitoring\CommandSucceededEvent;
  27  use MongoDB\Driver\Server;
  28  use MongoDB\Exception\InvalidArgumentException;
  29  use MongoDB\Exception\ResumeTokenException;
  30  use MongoDB\Exception\UnexpectedValueException;
  31  use function count;
  32  use function is_array;
  33  use function is_integer;
  34  use function is_object;
  35  use function MongoDB\Driver\Monitoring\addSubscriber;
  36  use function MongoDB\Driver\Monitoring\removeSubscriber;
  37  
  38  /**
  39   * ChangeStreamIterator wraps a change stream's tailable cursor.
  40   *
  41   * This iterator tracks the size of each batch in order to determine when the
  42   * postBatchResumeToken is applicable. It also ensures that initial calls to
  43   * rewind() do not execute getMore commands.
  44   *
  45   * @internal
  46   */
  47  class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
  48  {
  49      /** @var integer */
  50      private $batchPosition = 0;
  51  
  52      /** @var integer */
  53      private $batchSize;
  54  
  55      /** @var boolean */
  56      private $isRewindNop;
  57  
  58      /** @var boolean */
  59      private $isValid = false;
  60  
  61      /** @var object|null */
  62      private $postBatchResumeToken;
  63  
  64      /** @var array|object|null */
  65      private $resumeToken;
  66  
  67      /** @var Server */
  68      private $server;
  69  
  70      /**
  71       * @internal
  72       * @param Cursor            $cursor
  73       * @param integer           $firstBatchSize
  74       * @param array|object|null $initialResumeToken
  75       * @param object|null       $postBatchResumeToken
  76       */
  77      public function __construct(Cursor $cursor, $firstBatchSize, $initialResumeToken, $postBatchResumeToken)
  78      {
  79          if (! is_integer($firstBatchSize)) {
  80              throw InvalidArgumentException::invalidType('$firstBatchSize', $firstBatchSize, 'integer');
  81          }
  82  
  83          if (isset($initialResumeToken) && ! is_array($initialResumeToken) && ! is_object($initialResumeToken)) {
  84              throw InvalidArgumentException::invalidType('$initialResumeToken', $initialResumeToken, 'array or object');
  85          }
  86  
  87          if (isset($postBatchResumeToken) && ! is_object($postBatchResumeToken)) {
  88              throw InvalidArgumentException::invalidType('$postBatchResumeToken', $postBatchResumeToken, 'object');
  89          }
  90  
  91          parent::__construct($cursor);
  92  
  93          $this->batchSize = $firstBatchSize;
  94          $this->isRewindNop = ($firstBatchSize === 0);
  95          $this->postBatchResumeToken = $postBatchResumeToken;
  96          $this->resumeToken = $initialResumeToken;
  97          $this->server = $cursor->getServer();
  98      }
  99  
 100      /** @internal */
 101      final public function commandFailed(CommandFailedEvent $event)
 102      {
 103      }
 104  
 105      /** @internal */
 106      final public function commandStarted(CommandStartedEvent $event)
 107      {
 108          if ($event->getCommandName() !== 'getMore') {
 109              return;
 110          }
 111  
 112          $this->batchPosition = 0;
 113          $this->batchSize = null;
 114          $this->postBatchResumeToken = null;
 115      }
 116  
 117      /** @internal */
 118      final public function commandSucceeded(CommandSucceededEvent $event)
 119      {
 120          if ($event->getCommandName() !== 'getMore') {
 121              return;
 122          }
 123  
 124          $reply = $event->getReply();
 125  
 126          if (! isset($reply->cursor->nextBatch) || ! is_array($reply->cursor->nextBatch)) {
 127              throw new UnexpectedValueException('getMore command did not return a "cursor.nextBatch" array');
 128          }
 129  
 130          $this->batchSize = count($reply->cursor->nextBatch);
 131  
 132          if (isset($reply->cursor->postBatchResumeToken) && is_object($reply->cursor->postBatchResumeToken)) {
 133              $this->postBatchResumeToken = $reply->cursor->postBatchResumeToken;
 134          }
 135      }
 136  
 137      /**
 138       * @see https://php.net/iteratoriterator.current
 139       * @return mixed
 140       */
 141      public function current()
 142      {
 143          return $this->isValid ? parent::current() : null;
 144      }
 145  
 146      /**
 147       * Returns the resume token for the iterator's current position.
 148       *
 149       * Null may be returned if no change documents have been iterated and the
 150       * server did not include a postBatchResumeToken in its aggregate or getMore
 151       * command response.
 152       *
 153       * @return array|object|null
 154       */
 155      public function getResumeToken()
 156      {
 157          return $this->resumeToken;
 158      }
 159  
 160      /**
 161       * Returns the server the cursor is running on.
 162       */
 163      public function getServer() : Server
 164      {
 165          return $this->server;
 166      }
 167  
 168      /**
 169       * @see https://php.net/iteratoriterator.key
 170       * @return mixed
 171       */
 172      public function key()
 173      {
 174          return $this->isValid ? parent::key() : null;
 175      }
 176  
 177      /**
 178       * @see https://php.net/iteratoriterator.rewind
 179       * @return void
 180       */
 181      public function next()
 182      {
 183          /* Determine if advancing the iterator will execute a getMore command
 184           * (i.e. we are already positioned at the end of the current batch). If
 185           * so, rely on the APM callbacks to reset $batchPosition and update
 186           * $batchSize. Otherwise, we can forgo APM and manually increment
 187           * $batchPosition after calling next(). */
 188          $getMore = $this->isAtEndOfBatch();
 189  
 190          if ($getMore) {
 191              addSubscriber($this);
 192          }
 193  
 194          try {
 195              parent::next();
 196              $this->onIteration(! $getMore);
 197          } finally {
 198              if ($getMore) {
 199                  removeSubscriber($this);
 200              }
 201          }
 202      }
 203  
 204      /**
 205       * @see https://php.net/iteratoriterator.rewind
 206       * @return void
 207       */
 208      public function rewind()
 209      {
 210          if ($this->isRewindNop) {
 211              return;
 212          }
 213  
 214          parent::rewind();
 215          $this->onIteration(false);
 216      }
 217  
 218      /**
 219       * @see https://php.net/iteratoriterator.valid
 220       * @return boolean
 221       */
 222      public function valid()
 223      {
 224          return $this->isValid;
 225      }
 226  
 227      /**
 228       * Extracts the resume token (i.e. "_id" field) from a change document.
 229       *
 230       * @param array|object $document Change document
 231       * @return array|object
 232       * @throws InvalidArgumentException
 233       * @throws ResumeTokenException if the resume token is not found or invalid
 234       */
 235      private function extractResumeToken($document)
 236      {
 237          if (! is_array($document) && ! is_object($document)) {
 238              throw InvalidArgumentException::invalidType('$document', $document, 'array or object');
 239          }
 240  
 241          if ($document instanceof Serializable) {
 242              return $this->extractResumeToken($document->bsonSerialize());
 243          }
 244  
 245          $resumeToken = is_array($document)
 246              ? ($document['_id'] ?? null)
 247              : ($document->_id ?? null);
 248  
 249          if (! isset($resumeToken)) {
 250              $this->isValid = false;
 251              throw ResumeTokenException::notFound();
 252          }
 253  
 254          if (! is_array($resumeToken) && ! is_object($resumeToken)) {
 255              $this->isValid = false;
 256              throw ResumeTokenException::invalidType($resumeToken);
 257          }
 258  
 259          return $resumeToken;
 260      }
 261  
 262      /**
 263       * Return whether the iterator is positioned at the end of the batch.
 264       *
 265       * @return boolean
 266       */
 267      private function isAtEndOfBatch()
 268      {
 269          return $this->batchPosition + 1 >= $this->batchSize;
 270      }
 271  
 272      /**
 273       * Perform housekeeping after an iteration event.
 274       *
 275       * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#updating-the-cached-resume-token
 276       * @param boolean $incrementBatchPosition
 277       */
 278      private function onIteration($incrementBatchPosition)
 279      {
 280          $this->isValid = parent::valid();
 281  
 282          /* Disable rewind()'s NOP behavior once we advance to a valid position.
 283           * This will allow the driver to throw a LogicException if rewind() is
 284           * called after the cursor has advanced past its first element. */
 285          if ($this->isRewindNop && $this->isValid) {
 286              $this->isRewindNop = false;
 287          }
 288  
 289          if ($incrementBatchPosition && $this->isValid) {
 290              $this->batchPosition++;
 291          }
 292  
 293          /* If the iterator is positioned at the end of the batch, apply the
 294           * postBatchResumeToken if it's available. This handles both the case
 295           * where the current batch is empty (since onIteration() will be called
 296           * after a successful getMore) and when the iterator has advanced to the
 297           * last document in its current batch. Otherwise, extract a resume token
 298           * from the current document if possible. */
 299          if ($this->isAtEndOfBatch() && $this->postBatchResumeToken !== null) {
 300              $this->resumeToken = $this->postBatchResumeToken;
 301          } elseif ($this->isValid) {
 302              $this->resumeToken = $this->extractResumeToken($this->current());
 303          }
 304      }
 305  }