Search moodle.org's
Developer Documentation

See Release Notes

  • Bug fixes for general core bugs in 3.10.x will end 8 November 2021 (12 months).
  • Bug fixes for security issues in 3.10.x will end 9 May 2022 (18 months).
  • PHP version: minimum PHP 7.2.0. Note: the minimum PHP version has increased since Moodle 3.8. PHP 7.3.x and 7.4.x are also supported.

Differences Between: [Versions 310 and 311] [Versions 310 and 400] [Versions 310 and 401]

   1  <?php
   2  /*
   3   * Copyright 2017 MongoDB, Inc.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *   http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   */
  17  
  18  namespace MongoDB;
  19  
  20  use Iterator;
  21  use MongoDB\Driver\CursorId;
  22  use MongoDB\Driver\Exception\ConnectionException;
  23  use MongoDB\Driver\Exception\RuntimeException;
  24  use MongoDB\Driver\Exception\ServerException;
  25  use MongoDB\Exception\ResumeTokenException;
  26  use MongoDB\Model\ChangeStreamIterator;
  27  use function call_user_func;
  28  use function in_array;
  29  
  30  /**
  31   * Iterator for a change stream.
  32   *
  33   * @api
  34   * @see \MongoDB\Collection::watch()
  35   * @see http://docs.mongodb.org/manual/reference/command/changeStream/
  36   */
  /**
   * Iterator for a change stream.
   *
   * Wraps a ChangeStreamIterator and, when iteration fails with a resumable
   * error, replaces it with a new iterator obtained from the resume callable.
   * The key reported by key() is a counter maintained by this class.
   *
   * @api
   * @see \MongoDB\Collection::watch()
   * @see http://docs.mongodb.org/manual/reference/command/changeStream/
   */
  class ChangeStream implements Iterator
  {
      /**
       * @deprecated 1.4
       * @todo Remove this in 2.0 (see: PHPLIB-360)
       */
      const CURSOR_NOT_FOUND = 43;

      /**
       * Server error codes for which the change stream is never resumed.
       *
       * @var array
       */
      private static $nonResumableErrorCodes = [
          136, // CappedPositionLost
          237, // CursorKilled
          11601, // Interrupted
      ];

      /**
       * Callable invoked with the current resume token and the $hasAdvanced
       * flag to build a replacement iterator. It is set to null by
       * onIteration() once the cursor ID is "0".
       *
       * @var callable|null
       */
      private $resumeCallable;

      /**
       * Iterator currently backing the change stream. It is replaced by
       * resume() after a resumable error.
       *
       * @var ChangeStreamIterator
       */
      private $iterator;

      /**
       * Key reported by key() while the iterator has a current result.
       *
       * @var integer
       */
      private $key = 0;

      /**
       * Whether the change stream has advanced to its first result. This is used
       * to determine whether $key should be incremented after an iteration event.
       *
       * @var boolean
       */
      private $hasAdvanced = false;

      /**
       * @internal
       * @param ChangeStreamIterator $iterator       Iterator to read change documents from
       * @param callable             $resumeCallable Called with the resume token and the
       *                                             $hasAdvanced flag; must return a new
       *                                             ChangeStreamIterator
       */
      public function __construct(ChangeStreamIterator $iterator, callable $resumeCallable)
      {
          $this->iterator = $iterator;
          $this->resumeCallable = $resumeCallable;
      }

      /**
       * Returns the current element of the underlying iterator.
       *
       * @see http://php.net/iterator.current
       * @return mixed
       */
      public function current()
      {
          return $this->iterator->current();
      }

      /**
       * Returns the ID of the cursor wrapped by the underlying iterator.
       *
       * @return CursorId
       */
      public function getCursorId()
      {
          return $this->iterator->getInnerIterator()->getId();
      }

      /**
       * Returns the resume token for the iterator's current position.
       *
       * Null may be returned if no change documents have been iterated and the
       * server did not include a postBatchResumeToken in its aggregate or getMore
       * command response.
       *
       * @return array|object|null
       */
      public function getResumeToken()
      {
          return $this->iterator->getResumeToken();
      }

      /**
       * Returns the change stream's own key, or null when there is no current
       * result.
       *
       * @see http://php.net/iterator.key
       * @return mixed
       */
      public function key()
      {
          if ($this->valid()) {
              return $this->key;
          }

          return null;
      }

      /**
       * Advances the underlying iterator, resuming the change stream if a
       * resumable error is raised while doing so.
       *
       * @see http://php.net/iterator.next
       * @return void
       * @throws ResumeTokenException
       */
      public function next()
      {
          try {
              $this->iterator->next();
              $this->onIteration($this->hasAdvanced);
          } catch (RuntimeException $e) {
              $this->resumeOrThrow($e);
          }
      }

      /**
       * Rewinds the underlying iterator, resuming the change stream if a
       * resumable error is raised while doing so.
       *
       * @see http://php.net/iterator.rewind
       * @return void
       * @throws ResumeTokenException
       */
      public function rewind()
      {
          try {
              $this->iterator->rewind();
              /* Unlike next() and resume(), the decision to increment the key
               * does not depend on whether the change stream has advanced. This
               * ensures that multiple calls to rewind() do not alter state. */
              $this->onIteration(false);
          } catch (RuntimeException $e) {
              $this->resumeOrThrow($e);
          }
      }

      /**
       * Reports whether the underlying iterator has a current result.
       *
       * @see http://php.net/iterator.valid
       * @return boolean
       */
      public function valid()
      {
          return $this->iterator->valid();
      }

      /**
       * Determines if an exception is a resumable error.
       *
       * Connection errors are always resumable. Server errors are resumable
       * unless they carry the NonResumableChangeStreamError label or one of the
       * codes in $nonResumableErrorCodes. Any other exception is not resumable.
       *
       * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#resumable-error
       * @param RuntimeException $exception
       * @return boolean
       */
      private function isResumableError(RuntimeException $exception)
      {
          if ($exception instanceof ConnectionException) {
              return true;
          }

          if (! $exception instanceof ServerException) {
              return false;
          }

          if ($exception->hasErrorLabel('NonResumableChangeStreamError')) {
              return false;
          }

          /* Strict comparison: the list holds integers, so loose matching could
           * only ever introduce accidental hits. */
          if (in_array($exception->getCode(), self::$nonResumableErrorCodes, true)) {
              return false;
          }

          return true;
      }

      /**
       * Perform housekeeping after an iteration event.
       *
       * @param boolean $incrementKey Increment $key if there is a current result
       * @throws ResumeTokenException
       */
      private function onIteration($incrementKey)
      {
          /* If the cursorId is 0, the server has invalidated the cursor and we
           * will never perform another getMore nor need to resume since any
           * remaining results (up to and including the invalidate event) will
           * have been received in the last response. Therefore, we can unset the
           * resumeCallable. This will free any reference to Watch as well as the
           * only reference to any implicit session created therein. */
          if ((string) $this->getCursorId() === '0') {
              $this->resumeCallable = null;
          }

          /* Return early if there is not a current result. Avoid any attempt to
           * increment the iterator's key. */
          if (! $this->valid()) {
              return;
          }

          if ($incrementKey) {
              $this->key++;
          }

          $this->hasAdvanced = true;
      }

      /**
       * Recreates the ChangeStreamIterator after a resumable server error.
       *
       * Callers must ensure $resumeCallable is still set; resumeOrThrow() does.
       *
       * @return void
       */
      private function resume()
      {
          $this->iterator = call_user_func($this->resumeCallable, $this->getResumeToken(), $this->hasAdvanced);
          $this->iterator->rewind();

          $this->onIteration($this->hasAdvanced);
      }

      /**
       * Either resumes after a resumable error or re-throws the exception.
       *
       * The exception is also re-thrown when the resume callable has already
       * been released, because there is then nothing to resume with.
       *
       * @param RuntimeException $exception
       * @throws RuntimeException
       */
      private function resumeOrThrow(RuntimeException $exception)
      {
          /* onIteration() nulls the resume callable once the cursor ID is "0".
           * Surface the original error in that case instead of failing inside
           * resume() with an unrelated warning or fatal error. */
          if ($this->resumeCallable === null || ! $this->isResumableError($exception)) {
              throw $exception;
          }

          $this->resume();
      }
  }