Search moodle.org's
Developer Documentation

See Release Notes

  • Bug fixes for general core bugs in 3.10.x will end 8 November 2021 (12 months).
  • Bug fixes for security issues in 3.10.x will end 9 May 2022 (18 months).
  • PHP version: minimum PHP 7.2.0 Note: minimum PHP version has increased since Moodle 3.8. PHP 7.3.x and 7.4.x are supported too.
<?php
/*
< * Copyright 2017 MongoDB, Inc.
> * Copyright 2017-present MongoDB, Inc.
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at *
< * http://www.apache.org/licenses/LICENSE-2.0
> * https://www.apache.org/licenses/LICENSE-2.0
* * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ namespace MongoDB; use Iterator; use MongoDB\Driver\CursorId; use MongoDB\Driver\Exception\ConnectionException; use MongoDB\Driver\Exception\RuntimeException; use MongoDB\Driver\Exception\ServerException;
> use MongoDB\Exception\BadMethodCallException;
use MongoDB\Exception\ResumeTokenException; use MongoDB\Model\ChangeStreamIterator;
> use ReturnTypeWillChange; use function call_user_func; >
use function in_array; /** * Iterator for a change stream. *
> * @psalm-type ResumeCallable = callable(array|object|null, bool): ChangeStreamIterator * @api > *
* @see \MongoDB\Collection::watch()
< * @see http://docs.mongodb.org/manual/reference/command/changeStream/
> * @see https://mongodb.com/docs/manual/reference/method/db.watch/#mongodb-method-db.watch
*/ class ChangeStream implements Iterator { /** * @deprecated 1.4 * @todo Remove this in 2.0 (see: PHPLIB-360) */
< const CURSOR_NOT_FOUND = 43;
> public const CURSOR_NOT_FOUND = 43;
< /** @var array */ < private static $nonResumableErrorCodes = [ < 136, // CappedPositionLost < 237, // CursorKilled < 11601, // Interrupted
> /** @var int */ > private static $cursorNotFound = 43; > > /** @var int[] */ > private static $resumableErrorCodes = [ > 6, // HostUnreachable > 7, // HostNotFound > 89, // NetworkTimeout > 91, // ShutdownInProgress > 189, // PrimarySteppedDown > 262, // ExceededTimeLimit > 9001, // SocketException > 10107, // NotPrimary > 11600, // InterruptedAtShutdown > 11602, // InterruptedDueToReplStateChange > 13435, // NotPrimaryNoSecondaryOk > 13436, // NotPrimaryOrSecondary > 63, // StaleShardVersion > 150, // StaleEpoch > 13388, // StaleConfig > 234, // RetryChangeStream > 133, // FailedToSatisfyReadPreference
];
< /** @var callable */
> /** @var int */ > private static $wireVersionForResumableChangeStreamError = 9; > > /** @var ResumeCallable|null */
private $resumeCallable; /** @var ChangeStreamIterator */ private $iterator; /** @var integer */ private $key = 0; /** * Whether the change stream has advanced to its first result. This is used * to determine whether $key should be incremented after an iteration event. * * @var boolean */ private $hasAdvanced = false; /** * @internal
< * @param ChangeStreamIterator $iterator < * @param callable $resumeCallable
> * > * @param ResumeCallable $resumeCallable
*/ public function __construct(ChangeStreamIterator $iterator, callable $resumeCallable) { $this->iterator = $iterator; $this->resumeCallable = $resumeCallable; } /**
< * @see http://php.net/iterator.current
> * @see https://php.net/iterator.current
* @return mixed */
> #[ReturnTypeWillChange]
public function current() { return $this->iterator->current(); } /** * @return CursorId */ public function getCursorId() { return $this->iterator->getInnerIterator()->getId(); } /** * Returns the resume token for the iterator's current position. * * Null may be returned if no change documents have been iterated and the * server did not include a postBatchResumeToken in its aggregate or getMore * command response. * * @return array|object|null */ public function getResumeToken() { return $this->iterator->getResumeToken(); } /**
< * @see http://php.net/iterator.key
> * @see https://php.net/iterator.key
* @return mixed */
> #[ReturnTypeWillChange]
public function key() { if ($this->valid()) { return $this->key; } return null; } /**
< * @see http://php.net/iterator.next
> * @see https://php.net/iterator.next
* @return void * @throws ResumeTokenException */
> #[ReturnTypeWillChange]
public function next() { try { $this->iterator->next(); $this->onIteration($this->hasAdvanced); } catch (RuntimeException $e) { $this->resumeOrThrow($e); } } /**
< * @see http://php.net/iterator.rewind
> * @see https://php.net/iterator.rewind
* @return void * @throws ResumeTokenException */
> #[ReturnTypeWillChange]
public function rewind() { try { $this->iterator->rewind(); /* Unlike next() and resume(), the decision to increment the key * does not depend on whether the change stream has advanced. This * ensures that multiple calls to rewind() do not alter state. */ $this->onIteration(false); } catch (RuntimeException $e) { $this->resumeOrThrow($e); } } /**
< * @see http://php.net/iterator.valid
> * @see https://php.net/iterator.valid
* @return boolean */
> #[ReturnTypeWillChange]
public function valid() { return $this->iterator->valid(); } /** * Determines if an exception is a resumable error. * * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#resumable-error
< * @param RuntimeException $exception < * @return boolean
*/
< private function isResumableError(RuntimeException $exception)
> private function isResumableError(RuntimeException $exception): bool
{ if ($exception instanceof ConnectionException) { return true; } if (! $exception instanceof ServerException) { return false; }
< if ($exception->hasErrorLabel('NonResumableChangeStreamError')) { < return false;
> if ($exception->getCode() === self::$cursorNotFound) { > return true;
}
< if (in_array($exception->getCode(), self::$nonResumableErrorCodes)) { < return false;
> if (server_supports_feature($this->iterator->getServer(), self::$wireVersionForResumableChangeStreamError)) { > return $exception->hasErrorLabel('ResumableChangeStreamError');
}
< return true;
> return in_array($exception->getCode(), self::$resumableErrorCodes);
} /** * Perform housekeeping after an iteration event. * * @param boolean $incrementKey Increment $key if there is a current result * @throws ResumeTokenException */
< private function onIteration($incrementKey)
> private function onIteration(bool $incrementKey): void
{ /* If the cursorId is 0, the server has invalidated the cursor and we * will never perform another getMore nor need to resume since any * remaining results (up to and including the invalidate event) will * have been received in the last response. Therefore, we can unset the * resumeCallable. This will free any reference to Watch as well as the * only reference to any implicit session created therein. */ if ((string) $this->getCursorId() === '0') { $this->resumeCallable = null; } /* Return early if there is not a current result. Avoid any attempt to * increment the iterator's key. */ if (! $this->valid()) { return; } if ($incrementKey) { $this->key++; } $this->hasAdvanced = true; } /** * Recreates the ChangeStreamIterator after a resumable server error.
< * < * @return void
*/
< private function resume()
> private function resume(): void
{
> if (! $this->resumeCallable) { $this->iterator = call_user_func($this->resumeCallable, $this->getResumeToken(), $this->hasAdvanced); > throw new BadMethodCallException('Cannot resume a closed change stream.'); $this->iterator->rewind(); > } >
$this->onIteration($this->hasAdvanced);
>
} /** * Either resumes after a resumable error or re-throws the exception. *
< * @param RuntimeException $exception
* @throws RuntimeException */
< private function resumeOrThrow(RuntimeException $exception)
> private function resumeOrThrow(RuntimeException $exception): void
{ if ($this->isResumableError($exception)) { $this->resume(); return; } throw $exception; } }