Search moodle.org's
Developer Documentation

See Release Notes
Long Term Support Release

  • Bug fixes for general core bugs in 4.1.x will end 13 November 2023 (12 months).
  • Bug fixes for security issues in 4.1.x will end 10 November 2025 (36 months).
  • PHP version: minimum PHP 7.4.0 Note: minimum PHP version has increased since Moodle 4.0. PHP 8.0.x is supported too.

Differences Between: [Versions 310 and 401] [Versions 311 and 401] [Versions 39 and 401] [Versions 400 and 401]

   1  <?php
   2  /*
   3   * Copyright 2017-present MongoDB, Inc.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *   https://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   */
  17  
  18  namespace MongoDB;
  19  
  20  use Iterator;
  21  use MongoDB\Driver\CursorId;
  22  use MongoDB\Driver\Exception\ConnectionException;
  23  use MongoDB\Driver\Exception\RuntimeException;
  24  use MongoDB\Driver\Exception\ServerException;
  25  use MongoDB\Exception\BadMethodCallException;
  26  use MongoDB\Exception\ResumeTokenException;
  27  use MongoDB\Model\ChangeStreamIterator;
  28  use ReturnTypeWillChange;
  29  
  30  use function call_user_func;
  31  use function in_array;
  32  
  33  /**
  34   * Iterator for a change stream.
  35   *
  36   * @psalm-type ResumeCallable = callable(array|object|null, bool): ChangeStreamIterator
  37   *
  38   * @api
  39   * @see \MongoDB\Collection::watch()
  40   * @see https://mongodb.com/docs/manual/reference/method/db.watch/#mongodb-method-db.watch
  41   */
  42  class ChangeStream implements Iterator
  43  {
  44      /**
  45       * @deprecated 1.4
  46       * @todo Remove this in 2.0 (see: PHPLIB-360)
  47       */
  48      public const CURSOR_NOT_FOUND = 43;
  49  
  50      /** @var int */
  51      private static $cursorNotFound = 43;
  52  
  53      /** @var int[] */
  54      private static $resumableErrorCodes = [
  55          6, // HostUnreachable
  56          7, // HostNotFound
  57          89, // NetworkTimeout
  58          91, // ShutdownInProgress
  59          189, // PrimarySteppedDown
  60          262, // ExceededTimeLimit
  61          9001, // SocketException
  62          10107, // NotPrimary
  63          11600, // InterruptedAtShutdown
  64          11602, // InterruptedDueToReplStateChange
  65          13435, // NotPrimaryNoSecondaryOk
  66          13436, // NotPrimaryOrSecondary
  67          63, // StaleShardVersion
  68          150, // StaleEpoch
  69          13388, // StaleConfig
  70          234, // RetryChangeStream
  71          133, // FailedToSatisfyReadPreference
  72      ];
  73  
  74      /** @var int */
  75      private static $wireVersionForResumableChangeStreamError = 9;
  76  
  77      /** @var ResumeCallable|null */
  78      private $resumeCallable;
  79  
  80      /** @var ChangeStreamIterator */
  81      private $iterator;
  82  
  83      /** @var integer */
  84      private $key = 0;
  85  
  86      /**
  87       * Whether the change stream has advanced to its first result. This is used
  88       * to determine whether $key should be incremented after an iteration event.
  89       *
  90       * @var boolean
  91       */
  92      private $hasAdvanced = false;
  93  
  94      /**
  95       * @internal
  96       *
  97       * @param ResumeCallable $resumeCallable
  98       */
  99      public function __construct(ChangeStreamIterator $iterator, callable $resumeCallable)
 100      {
 101          $this->iterator = $iterator;
 102          $this->resumeCallable = $resumeCallable;
 103      }
 104  
 105      /**
 106       * @see https://php.net/iterator.current
 107       * @return mixed
 108       */
 109      #[ReturnTypeWillChange]
 110      public function current()
 111      {
 112          return $this->iterator->current();
 113      }
 114  
 115      /**
 116       * @return CursorId
 117       */
 118      public function getCursorId()
 119      {
 120          return $this->iterator->getInnerIterator()->getId();
 121      }
 122  
 123      /**
 124       * Returns the resume token for the iterator's current position.
 125       *
 126       * Null may be returned if no change documents have been iterated and the
 127       * server did not include a postBatchResumeToken in its aggregate or getMore
 128       * command response.
 129       *
 130       * @return array|object|null
 131       */
 132      public function getResumeToken()
 133      {
 134          return $this->iterator->getResumeToken();
 135      }
 136  
 137      /**
 138       * @see https://php.net/iterator.key
 139       * @return mixed
 140       */
 141      #[ReturnTypeWillChange]
 142      public function key()
 143      {
 144          if ($this->valid()) {
 145              return $this->key;
 146          }
 147  
 148          return null;
 149      }
 150  
 151      /**
 152       * @see https://php.net/iterator.next
 153       * @return void
 154       * @throws ResumeTokenException
 155       */
 156      #[ReturnTypeWillChange]
 157      public function next()
 158      {
 159          try {
 160              $this->iterator->next();
 161              $this->onIteration($this->hasAdvanced);
 162          } catch (RuntimeException $e) {
 163              $this->resumeOrThrow($e);
 164          }
 165      }
 166  
 167      /**
 168       * @see https://php.net/iterator.rewind
 169       * @return void
 170       * @throws ResumeTokenException
 171       */
 172      #[ReturnTypeWillChange]
 173      public function rewind()
 174      {
 175          try {
 176              $this->iterator->rewind();
 177              /* Unlike next() and resume(), the decision to increment the key
 178               * does not depend on whether the change stream has advanced. This
 179               * ensures that multiple calls to rewind() do not alter state. */
 180              $this->onIteration(false);
 181          } catch (RuntimeException $e) {
 182              $this->resumeOrThrow($e);
 183          }
 184      }
 185  
 186      /**
 187       * @see https://php.net/iterator.valid
 188       * @return boolean
 189       */
 190      #[ReturnTypeWillChange]
 191      public function valid()
 192      {
 193          return $this->iterator->valid();
 194      }
 195  
 196      /**
 197       * Determines if an exception is a resumable error.
 198       *
 199       * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#resumable-error
 200       */
 201      private function isResumableError(RuntimeException $exception): bool
 202      {
 203          if ($exception instanceof ConnectionException) {
 204              return true;
 205          }
 206  
 207          if (! $exception instanceof ServerException) {
 208              return false;
 209          }
 210  
 211          if ($exception->getCode() === self::$cursorNotFound) {
 212              return true;
 213          }
 214  
 215          if (server_supports_feature($this->iterator->getServer(), self::$wireVersionForResumableChangeStreamError)) {
 216              return $exception->hasErrorLabel('ResumableChangeStreamError');
 217          }
 218  
 219          return in_array($exception->getCode(), self::$resumableErrorCodes);
 220      }
 221  
 222      /**
 223       * Perform housekeeping after an iteration event.
 224       *
 225       * @param boolean $incrementKey Increment $key if there is a current result
 226       * @throws ResumeTokenException
 227       */
 228      private function onIteration(bool $incrementKey): void
 229      {
 230          /* If the cursorId is 0, the server has invalidated the cursor and we
 231           * will never perform another getMore nor need to resume since any
 232           * remaining results (up to and including the invalidate event) will
 233           * have been received in the last response. Therefore, we can unset the
 234           * resumeCallable. This will free any reference to Watch as well as the
 235           * only reference to any implicit session created therein. */
 236          if ((string) $this->getCursorId() === '0') {
 237              $this->resumeCallable = null;
 238          }
 239  
 240          /* Return early if there is not a current result. Avoid any attempt to
 241           * increment the iterator's key. */
 242          if (! $this->valid()) {
 243              return;
 244          }
 245  
 246          if ($incrementKey) {
 247              $this->key++;
 248          }
 249  
 250          $this->hasAdvanced = true;
 251      }
 252  
 253      /**
 254       * Recreates the ChangeStreamIterator after a resumable server error.
 255       */
 256      private function resume(): void
 257      {
 258          if (! $this->resumeCallable) {
 259              throw new BadMethodCallException('Cannot resume a closed change stream.');
 260          }
 261  
 262          $this->iterator = call_user_func($this->resumeCallable, $this->getResumeToken(), $this->hasAdvanced);
 263  
 264          $this->iterator->rewind();
 265  
 266          $this->onIteration($this->hasAdvanced);
 267      }
 268  
 269      /**
 270       * Either resumes after a resumable error or re-throws the exception.
 271       *
 272       * @throws RuntimeException
 273       */
 274      private function resumeOrThrow(RuntimeException $exception): void
 275      {
 276          if ($this->isResumableError($exception)) {
 277              $this->resume();
 278  
 279              return;
 280          }
 281  
 282          throw $exception;
 283      }
 284  }