Search moodle.org's
Developer Documentation

See Release Notes

  • Bug fixes for general core bugs in 4.0.x will end 8 May 2023 (12 months).
  • Bug fixes for security issues in 4.0.x will end 13 November 2023 (18 months).
  • PHP version: minimum PHP 7.3.0. Note: the minimum PHP version has increased since Moodle 3.10. PHP 7.4.x is also supported.

Differences Between: [Versions 310 and 400] [Versions 39 and 400] [Versions 400 and 401]

   1  <?php
   2  /*
   3   * Copyright 2017 MongoDB, Inc.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *   http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   */
  17  
  18  namespace MongoDB;
  19  
  20  use Iterator;
  21  use MongoDB\Driver\CursorId;
  22  use MongoDB\Driver\Exception\ConnectionException;
  23  use MongoDB\Driver\Exception\RuntimeException;
  24  use MongoDB\Driver\Exception\ServerException;
  25  use MongoDB\Exception\ResumeTokenException;
  26  use MongoDB\Model\ChangeStreamIterator;
  27  use function call_user_func;
  28  use function in_array;
  29  
  30  /**
  31   * Iterator for a change stream.
  32   *
  33   * @api
  34   * @see \MongoDB\Collection::watch()
  35   * @see http://docs.mongodb.org/manual/reference/command/changeStream/
  36   */
  37  class ChangeStream implements Iterator
  38  {
  39      /**
  40       * @deprecated 1.4
  41       * @todo Remove this in 2.0 (see: PHPLIB-360)
  42       */
  43      const CURSOR_NOT_FOUND = 43;
  44  
  45      /** @var int */
  46      private static $cursorNotFound = 43;
  47  
  48      /** @var int[] */
  49      private static $resumableErrorCodes = [
  50          6, // HostUnreachable
  51          7, // HostNotFound
  52          89, // NetworkTimeout
  53          91, // ShutdownInProgress
  54          189, // PrimarySteppedDown
  55          262, // ExceededTimeLimit
  56          9001, // SocketException
  57          10107, // NotMaster
  58          11600, // InterruptedAtShutdown
  59          11602, // InterruptedDueToReplStateChange
  60          13435, // NotMasterNoSlaveOk
  61          13436, // NotMasterOrSecondary
  62          63, // StaleShardVersion
  63          150, // StaleEpoch
  64          13388, // StaleConfig
  65          234, // RetryChangeStream
  66          133, // FailedToSatisfyReadPreference
  67      ];
  68  
  69      /** @var int */
  70      private static $wireVersionForResumableChangeStreamError = 9;
  71  
  72      /** @var callable */
  73      private $resumeCallable;
  74  
  75      /** @var ChangeStreamIterator */
  76      private $iterator;
  77  
  78      /** @var integer */
  79      private $key = 0;
  80  
  81      /**
  82       * Whether the change stream has advanced to its first result. This is used
  83       * to determine whether $key should be incremented after an iteration event.
  84       *
  85       * @var boolean
  86       */
  87      private $hasAdvanced = false;
  88  
  89      /**
  90       * @internal
  91       * @param ChangeStreamIterator $iterator
  92       * @param callable             $resumeCallable
  93       */
  94      public function __construct(ChangeStreamIterator $iterator, callable $resumeCallable)
  95      {
  96          $this->iterator = $iterator;
  97          $this->resumeCallable = $resumeCallable;
  98      }
  99  
 100      /**
 101       * @see http://php.net/iterator.current
 102       * @return mixed
 103       */
 104      public function current()
 105      {
 106          return $this->iterator->current();
 107      }
 108  
 109      /**
 110       * @return CursorId
 111       */
 112      public function getCursorId()
 113      {
 114          return $this->iterator->getInnerIterator()->getId();
 115      }
 116  
 117      /**
 118       * Returns the resume token for the iterator's current position.
 119       *
 120       * Null may be returned if no change documents have been iterated and the
 121       * server did not include a postBatchResumeToken in its aggregate or getMore
 122       * command response.
 123       *
 124       * @return array|object|null
 125       */
 126      public function getResumeToken()
 127      {
 128          return $this->iterator->getResumeToken();
 129      }
 130  
 131      /**
 132       * @see http://php.net/iterator.key
 133       * @return mixed
 134       */
 135      public function key()
 136      {
 137          if ($this->valid()) {
 138              return $this->key;
 139          }
 140  
 141          return null;
 142      }
 143  
 144      /**
 145       * @see http://php.net/iterator.next
 146       * @return void
 147       * @throws ResumeTokenException
 148       */
 149      public function next()
 150      {
 151          try {
 152              $this->iterator->next();
 153              $this->onIteration($this->hasAdvanced);
 154          } catch (RuntimeException $e) {
 155              $this->resumeOrThrow($e);
 156          }
 157      }
 158  
 159      /**
 160       * @see http://php.net/iterator.rewind
 161       * @return void
 162       * @throws ResumeTokenException
 163       */
 164      public function rewind()
 165      {
 166          try {
 167              $this->iterator->rewind();
 168              /* Unlike next() and resume(), the decision to increment the key
 169               * does not depend on whether the change stream has advanced. This
 170               * ensures that multiple calls to rewind() do not alter state. */
 171              $this->onIteration(false);
 172          } catch (RuntimeException $e) {
 173              $this->resumeOrThrow($e);
 174          }
 175      }
 176  
 177      /**
 178       * @see http://php.net/iterator.valid
 179       * @return boolean
 180       */
 181      public function valid()
 182      {
 183          return $this->iterator->valid();
 184      }
 185  
 186      /**
 187       * Determines if an exception is a resumable error.
 188       *
 189       * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#resumable-error
 190       * @param RuntimeException $exception
 191       * @return boolean
 192       */
 193      private function isResumableError(RuntimeException $exception)
 194      {
 195          if ($exception instanceof ConnectionException) {
 196              return true;
 197          }
 198  
 199          if (! $exception instanceof ServerException) {
 200              return false;
 201          }
 202  
 203          if ($exception->getCode() === self::$cursorNotFound) {
 204              return true;
 205          }
 206  
 207          if (server_supports_feature($this->iterator->getServer(), self::$wireVersionForResumableChangeStreamError)) {
 208              return $exception->hasErrorLabel('ResumableChangeStreamError');
 209          }
 210  
 211          return in_array($exception->getCode(), self::$resumableErrorCodes);
 212      }
 213  
 214      /**
 215       * Perform housekeeping after an iteration event.
 216       *
 217       * @param boolean $incrementKey Increment $key if there is a current result
 218       * @throws ResumeTokenException
 219       */
 220      private function onIteration($incrementKey)
 221      {
 222          /* If the cursorId is 0, the server has invalidated the cursor and we
 223           * will never perform another getMore nor need to resume since any
 224           * remaining results (up to and including the invalidate event) will
 225           * have been received in the last response. Therefore, we can unset the
 226           * resumeCallable. This will free any reference to Watch as well as the
 227           * only reference to any implicit session created therein. */
 228          if ((string) $this->getCursorId() === '0') {
 229              $this->resumeCallable = null;
 230          }
 231  
 232          /* Return early if there is not a current result. Avoid any attempt to
 233           * increment the iterator's key. */
 234          if (! $this->valid()) {
 235              return;
 236          }
 237  
 238          if ($incrementKey) {
 239              $this->key++;
 240          }
 241  
 242          $this->hasAdvanced = true;
 243      }
 244  
 245      /**
 246       * Recreates the ChangeStreamIterator after a resumable server error.
 247       *
 248       * @return void
 249       */
 250      private function resume()
 251      {
 252          $this->iterator = call_user_func($this->resumeCallable, $this->getResumeToken(), $this->hasAdvanced);
 253          $this->iterator->rewind();
 254  
 255          $this->onIteration($this->hasAdvanced);
 256      }
 257  
 258      /**
 259       * Either resumes after a resumable error or re-throws the exception.
 260       *
 261       * @param RuntimeException $exception
 262       * @throws RuntimeException
 263       */
 264      private function resumeOrThrow(RuntimeException $exception)
 265      {
 266          if ($this->isResumableError($exception)) {
 267              $this->resume();
 268  
 269              return;
 270          }
 271  
 272          throw $exception;
 273      }
 274  }