Search moodle.org's
Developer Documentation

See Release Notes

  • Bug fixes for general core bugs in 3.10.x will end 8 November 2021 (12 months).
  • Bug fixes for security issues in 3.10.x will end 9 May 2022 (18 months).
  • PHP version: minimum PHP 7.2.0 Note: minimum PHP version has increased since Moodle 3.8. PHP 7.3.x and 7.4.x are supported too.

Differences Between: [Versions 310 and 311] [Versions 310 and 400] [Versions 310 and 401]

   1  <?php
   2  /*
   3   * Copyright 2019 MongoDB, Inc.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *   http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   */
  17  
  18  namespace MongoDB\Model;
  19  
  20  use IteratorIterator;
  21  use MongoDB\BSON\Serializable;
  22  use MongoDB\Driver\Cursor;
  23  use MongoDB\Driver\Monitoring\CommandFailedEvent;
  24  use MongoDB\Driver\Monitoring\CommandStartedEvent;
  25  use MongoDB\Driver\Monitoring\CommandSubscriber;
  26  use MongoDB\Driver\Monitoring\CommandSucceededEvent;
  27  use MongoDB\Exception\InvalidArgumentException;
  28  use MongoDB\Exception\ResumeTokenException;
  29  use MongoDB\Exception\UnexpectedValueException;
  30  use function count;
  31  use function is_array;
  32  use function is_integer;
  33  use function is_object;
  34  use function MongoDB\Driver\Monitoring\addSubscriber;
  35  use function MongoDB\Driver\Monitoring\removeSubscriber;
  36  
  37  /**
  38   * ChangeStreamIterator wraps a change stream's tailable cursor.
  39   *
  40   * This iterator tracks the size of each batch in order to determine when the
  41   * postBatchResumeToken is applicable. It also ensures that initial calls to
  42   * rewind() do not execute getMore commands.
  43   *
  44   * @internal
  45   */
  46  class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
  47  {
  48      /** @var integer */
  49      private $batchPosition = 0;
  50  
  51      /** @var integer */
  52      private $batchSize;
  53  
  54      /** @var boolean */
  55      private $isRewindNop;
  56  
  57      /** @var boolean */
  58      private $isValid = false;
  59  
  60      /** @var object|null */
  61      private $postBatchResumeToken;
  62  
  63      /** @var array|object|null */
  64      private $resumeToken;
  65  
  66      /**
  67       * @internal
  68       * @param Cursor            $cursor
  69       * @param integer           $firstBatchSize
  70       * @param array|object|null $initialResumeToken
  71       * @param object|null       $postBatchResumeToken
  72       */
  73      public function __construct(Cursor $cursor, $firstBatchSize, $initialResumeToken, $postBatchResumeToken)
  74      {
  75          if (! is_integer($firstBatchSize)) {
  76              throw InvalidArgumentException::invalidType('$firstBatchSize', $firstBatchSize, 'integer');
  77          }
  78  
  79          if (isset($initialResumeToken) && ! is_array($initialResumeToken) && ! is_object($initialResumeToken)) {
  80              throw InvalidArgumentException::invalidType('$initialResumeToken', $initialResumeToken, 'array or object');
  81          }
  82  
  83          if (isset($postBatchResumeToken) && ! is_object($postBatchResumeToken)) {
  84              throw InvalidArgumentException::invalidType('$postBatchResumeToken', $postBatchResumeToken, 'object');
  85          }
  86  
  87          parent::__construct($cursor);
  88  
  89          $this->batchSize = $firstBatchSize;
  90          $this->isRewindNop = ($firstBatchSize === 0);
  91          $this->postBatchResumeToken = $postBatchResumeToken;
  92          $this->resumeToken = $initialResumeToken;
  93      }
  94  
  95      /** @internal */
  96      final public function commandFailed(CommandFailedEvent $event)
  97      {
  98      }
  99  
 100      /** @internal */
 101      final public function commandStarted(CommandStartedEvent $event)
 102      {
 103          if ($event->getCommandName() !== 'getMore') {
 104              return;
 105          }
 106  
 107          $this->batchPosition = 0;
 108          $this->batchSize = null;
 109          $this->postBatchResumeToken = null;
 110      }
 111  
 112      /** @internal */
 113      final public function commandSucceeded(CommandSucceededEvent $event)
 114      {
 115          if ($event->getCommandName() !== 'getMore') {
 116              return;
 117          }
 118  
 119          $reply = $event->getReply();
 120  
 121          if (! isset($reply->cursor->nextBatch) || ! is_array($reply->cursor->nextBatch)) {
 122              throw new UnexpectedValueException('getMore command did not return a "cursor.nextBatch" array');
 123          }
 124  
 125          $this->batchSize = count($reply->cursor->nextBatch);
 126  
 127          if (isset($reply->cursor->postBatchResumeToken) && is_object($reply->cursor->postBatchResumeToken)) {
 128              $this->postBatchResumeToken = $reply->cursor->postBatchResumeToken;
 129          }
 130      }
 131  
 132      /**
 133       * @see https://php.net/iteratoriterator.current
 134       * @return mixed
 135       */
 136      public function current()
 137      {
 138          return $this->isValid ? parent::current() : null;
 139      }
 140  
 141      /**
 142       * Returns the resume token for the iterator's current position.
 143       *
 144       * Null may be returned if no change documents have been iterated and the
 145       * server did not include a postBatchResumeToken in its aggregate or getMore
 146       * command response.
 147       *
 148       * @return array|object|null
 149       */
 150      public function getResumeToken()
 151      {
 152          return $this->resumeToken;
 153      }
 154  
 155      /**
 156       * @see https://php.net/iteratoriterator.key
 157       * @return mixed
 158       */
 159      public function key()
 160      {
 161          return $this->isValid ? parent::key() : null;
 162      }
 163  
 164      /**
 165       * @see https://php.net/iteratoriterator.rewind
 166       * @return void
 167       */
 168      public function next()
 169      {
 170          /* Determine if advancing the iterator will execute a getMore command
 171           * (i.e. we are already positioned at the end of the current batch). If
 172           * so, rely on the APM callbacks to reset $batchPosition and update
 173           * $batchSize. Otherwise, we can forgo APM and manually increment
 174           * $batchPosition after calling next(). */
 175          $getMore = $this->isAtEndOfBatch();
 176  
 177          if ($getMore) {
 178              addSubscriber($this);
 179          }
 180  
 181          try {
 182              parent::next();
 183              $this->onIteration(! $getMore);
 184          } finally {
 185              if ($getMore) {
 186                  removeSubscriber($this);
 187              }
 188          }
 189      }
 190  
 191      /**
 192       * @see https://php.net/iteratoriterator.rewind
 193       * @return void
 194       */
 195      public function rewind()
 196      {
 197          if ($this->isRewindNop) {
 198              return;
 199          }
 200  
 201          parent::rewind();
 202          $this->onIteration(false);
 203      }
 204  
 205      /**
 206       * @see https://php.net/iteratoriterator.valid
 207       * @return boolean
 208       */
 209      public function valid()
 210      {
 211          return $this->isValid;
 212      }
 213  
 214      /**
 215       * Extracts the resume token (i.e. "_id" field) from a change document.
 216       *
 217       * @param array|object $document Change document
 218       * @return array|object
 219       * @throws InvalidArgumentException
 220       * @throws ResumeTokenException if the resume token is not found or invalid
 221       */
 222      private function extractResumeToken($document)
 223      {
 224          if (! is_array($document) && ! is_object($document)) {
 225              throw InvalidArgumentException::invalidType('$document', $document, 'array or object');
 226          }
 227  
 228          if ($document instanceof Serializable) {
 229              return $this->extractResumeToken($document->bsonSerialize());
 230          }
 231  
 232          $resumeToken = is_array($document)
 233              ? (isset($document['_id']) ? $document['_id'] : null)
 234              : (isset($document->_id) ? $document->_id : null);
 235  
 236          if (! isset($resumeToken)) {
 237              $this->isValid = false;
 238              throw ResumeTokenException::notFound();
 239          }
 240  
 241          if (! is_array($resumeToken) && ! is_object($resumeToken)) {
 242              $this->isValid = false;
 243              throw ResumeTokenException::invalidType($resumeToken);
 244          }
 245  
 246          return $resumeToken;
 247      }
 248  
 249      /**
 250       * Return whether the iterator is positioned at the end of the batch.
 251       *
 252       * @return boolean
 253       */
 254      private function isAtEndOfBatch()
 255      {
 256          return $this->batchPosition + 1 >= $this->batchSize;
 257      }
 258  
 259      /**
 260       * Perform housekeeping after an iteration event.
 261       *
 262       * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#updating-the-cached-resume-token
 263       * @param boolean $incrementBatchPosition
 264       */
 265      private function onIteration($incrementBatchPosition)
 266      {
 267          $this->isValid = parent::valid();
 268  
 269          /* Disable rewind()'s NOP behavior once we advance to a valid position.
 270           * This will allow the driver to throw a LogicException if rewind() is
 271           * called after the cursor has advanced past its first element. */
 272          if ($this->isRewindNop && $this->isValid) {
 273              $this->isRewindNop = false;
 274          }
 275  
 276          if ($incrementBatchPosition && $this->isValid) {
 277              $this->batchPosition++;
 278          }
 279  
 280          /* If the iterator is positioned at the end of the batch, apply the
 281           * postBatchResumeToken if it's available. This handles both the case
 282           * where the current batch is empty (since onIteration() will be called
 283           * after a successful getMore) and when the iterator has advanced to the
 284           * last document in its current batch. Otherwise, extract a resume token
 285           * from the current document if possible. */
 286          if ($this->isAtEndOfBatch() && $this->postBatchResumeToken !== null) {
 287              $this->resumeToken = $this->postBatchResumeToken;
 288          } elseif ($this->isValid) {
 289              $this->resumeToken = $this->extractResumeToken($this->current());
 290          }
 291      }
 292  }