<?php
/*
 * Copyright 2019-present MongoDB, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace MongoDB\Model;

use IteratorIterator;
use MongoDB\BSON\Serializable;
use MongoDB\Driver\Cursor;
use MongoDB\Driver\Monitoring\CommandFailedEvent;
use MongoDB\Driver\Monitoring\CommandStartedEvent;
use MongoDB\Driver\Monitoring\CommandSubscriber;
use MongoDB\Driver\Monitoring\CommandSucceededEvent;
use MongoDB\Driver\Server;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\ResumeTokenException;
use MongoDB\Exception\UnexpectedValueException;
use ReturnTypeWillChange;

use function assert;
use function count;
use function is_array;
use function is_object;
use function MongoDB\Driver\Monitoring\addSubscriber;
use function MongoDB\Driver\Monitoring\removeSubscriber;

/**
 * ChangeStreamIterator wraps a change stream's tailable cursor.
 *
 * This iterator tracks the size of each batch in order to determine when the
 * postBatchResumeToken is applicable. It also ensures that initial calls to
 * rewind() do not execute getMore commands.
 *
 * @internal
 */
class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
{
    /**
     * Position within the current batch; reset to 0 when a getMore starts.
     *
     * @var integer
     */
    private $batchPosition = 0;

    /**
     * Number of documents in the current batch.
     *
     * @var integer
     */
    private $batchSize;

    /**
     * Whether rewind() should return without touching the inner cursor.
     *
     * @var boolean
     */
    private $isRewindNop;

    /**
     * Cached validity of the current position, refreshed by onIteration().
     *
     * @var boolean
     */
    private $isValid = false;

    /**
     * postBatchResumeToken from the most recent command response, if any.
     *
     * @var object|null
     */
    private $postBatchResumeToken;

    /**
     * Resume token for the iterator's current position.
     *
     * @var array|object|null
     */
    private $resumeToken;

    /**
     * Server obtained from the cursor at construction time.
     *
     * @var Server
     */
    private $server;

    /**
     * @internal
     * @param Cursor            $cursor               Cursor to wrap
     * @param int               $firstBatchSize       Size of the first batch; zero makes rewind() a NOP
     * @param array|object|null $initialResumeToken   Initial value for the cached resume token
     * @param object|null       $postBatchResumeToken Initial postBatchResumeToken, if any
     * @throws InvalidArgumentException if $initialResumeToken is set and is neither an array nor an object
     */
    public function __construct(Cursor $cursor, int $firstBatchSize, $initialResumeToken, ?object $postBatchResumeToken)
    {
        /* $firstBatchSize and $postBatchResumeToken are enforced by their
         * native type declarations; only the untyped resume token needs a
         * manual check. */
        if (isset($initialResumeToken) && ! is_array($initialResumeToken) && ! is_object($initialResumeToken)) {
            throw InvalidArgumentException::invalidType('$initialResumeToken', $initialResumeToken, 'array or object');
        }

        parent::__construct($cursor);

        $this->batchSize = $firstBatchSize;
        $this->isRewindNop = ($firstBatchSize === 0);
        $this->postBatchResumeToken = $postBatchResumeToken;
        $this->resumeToken = $initialResumeToken;
        $this->server = $cursor->getServer();
    }

    /**
     * Intentionally empty; present to satisfy CommandSubscriber.
     *
     * @internal
     */
    final public function commandFailed(CommandFailedEvent $event): void
    {
    }

    /**
     * Resets batch tracking when a getMore command starts. Other commands are
     * ignored.
     *
     * @internal
     */
    final public function commandStarted(CommandStartedEvent $event): void
    {
        if ($event->getCommandName() !== 'getMore') {
            return;
        }

        $this->batchPosition = 0;
        $this->batchSize = 0;
        $this->postBatchResumeToken = null;
    }

    /**
     * Records the size of the batch returned by a getMore command and, when
     * present, its postBatchResumeToken. Other commands are ignored.
     *
     * @internal
     * @throws UnexpectedValueException if the reply has no "cursor.nextBatch" array
     */
    final public function commandSucceeded(CommandSucceededEvent $event): void
    {
        if ($event->getCommandName() !== 'getMore') {
            return;
        }

        $reply = $event->getReply();

        if (! isset($reply->cursor->nextBatch) || ! is_array($reply->cursor->nextBatch)) {
            throw new UnexpectedValueException('getMore command did not return a "cursor.nextBatch" array');
        }

        $this->batchSize = count($reply->cursor->nextBatch);

        if (isset($reply->cursor->postBatchResumeToken) && is_object($reply->cursor->postBatchResumeToken)) {
            $this->postBatchResumeToken = $reply->cursor->postBatchResumeToken;
        }
    }

    /**
     * Returns the current element, or null when the cached position is not
     * valid.
     *
     * @see https://php.net/iteratoriterator.current
     * @return mixed
     */
    #[ReturnTypeWillChange]
    public function current()
    {
        return $this->isValid ? parent::current() : null;
    }

    /**
     * Necessary to let psalm know that we're always expecting a cursor as inner
     * iterator. This could be side-stepped due to the class not being final,
     * but it's very much an invalid use-case. This method can be dropped in 2.0
     * once the class is final.
     */
    final public function getInnerIterator(): Cursor
    {
        $cursor = parent::getInnerIterator();
        assert($cursor instanceof Cursor);

        return $cursor;
    }

    /**
     * Returns the resume token for the iterator's current position.
     *
     * Null may be returned if no change documents have been iterated and the
     * server did not include a postBatchResumeToken in its aggregate or getMore
     * command response.
     *
     * @return array|object|null
     */
    public function getResumeToken()
    {
        return $this->resumeToken;
    }

    /**
     * Returns the server the cursor is running on.
     */
    public function getServer(): Server
    {
        return $this->server;
    }

    /**
     * Returns the current key, or null when the cached position is not valid.
     *
     * @see https://php.net/iteratoriterator.key
     * @return mixed
     */
    #[ReturnTypeWillChange]
    public function key()
    {
        return $this->isValid ? parent::key() : null;
    }

    /**
     * Advances the iterator and updates the cached state and resume token.
     *
     * @see https://php.net/iteratoriterator.next
     */
    public function next(): void
    {
        /* Determine if advancing the iterator will execute a getMore command
         * (i.e. we are already positioned at the end of the current batch). If
         * so, rely on the APM callbacks to reset $batchPosition and update
         * $batchSize. Otherwise, we can forgo APM and manually increment
         * $batchPosition after calling next(). */
        $getMore = $this->isAtEndOfBatch();

        if ($getMore) {
            addSubscriber($this);
        }

        try {
            parent::next();
            $this->onIteration(! $getMore);
        } finally {
            // Always unregister, even if next() or onIteration() throws.
            if ($getMore) {
                removeSubscriber($this);
            }
        }
    }

    /**
     * Rewinds the inner cursor, unless rewinding is currently a NOP (see
     * $isRewindNop).
     *
     * @see https://php.net/iteratoriterator.rewind
     */
    public function rewind(): void
    {
        if ($this->isRewindNop) {
            return;
        }

        parent::rewind();
        $this->onIteration(false);
    }

    /**
     * Returns the validity cached by the last iteration event.
     *
     * @see https://php.net/iteratoriterator.valid
     */
    public function valid(): bool
    {
        return $this->isValid;
    }

    /**
     * Extracts the resume token (i.e. "_id" field) from a change document.
     *
     * Serializable documents are converted with bsonSerialize() and processed
     * recursively. The iterator is marked invalid before a
     * ResumeTokenException is thrown.
     *
     * @param array|object $document Change document
     * @return array|object
     * @throws InvalidArgumentException
     * @throws ResumeTokenException if the resume token is not found or invalid
     */
    private function extractResumeToken($document)
    {
        if (! is_array($document) && ! is_object($document)) {
            throw InvalidArgumentException::invalidType('$document', $document, 'array or object');
        }

        if ($document instanceof Serializable) {
            return $this->extractResumeToken($document->bsonSerialize());
        }

        $resumeToken = is_array($document)
            ? ($document['_id'] ?? null)
            : ($document->_id ?? null);

        if (! isset($resumeToken)) {
            $this->isValid = false;

            throw ResumeTokenException::notFound();
        }

        if (! is_array($resumeToken) && ! is_object($resumeToken)) {
            $this->isValid = false;

            throw ResumeTokenException::invalidType($resumeToken);
        }

        return $resumeToken;
    }

    /**
     * Return whether the iterator is positioned at the end of the batch.
     */
    private function isAtEndOfBatch(): bool
    {
        return $this->batchPosition + 1 >= $this->batchSize;
    }

    /**
     * Perform housekeeping after an iteration event.
     *
     * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#updating-the-cached-resume-token
     * @param bool $incrementBatchPosition Whether to advance $batchPosition when the new position is valid
     */
    private function onIteration(bool $incrementBatchPosition): void
    {
        $this->isValid = parent::valid();

        /* Disable rewind()'s NOP behavior once we advance to a valid position.
         * This will allow the driver to throw a LogicException if rewind() is
         * called after the cursor has advanced past its first element. */
        if ($this->isRewindNop && $this->isValid) {
            $this->isRewindNop = false;
        }

        if ($incrementBatchPosition && $this->isValid) {
            $this->batchPosition++;
        }

        /* If the iterator is positioned at the end of the batch, apply the
         * postBatchResumeToken if it's available. This handles both the case
         * where the current batch is empty (since onIteration() will be called
         * after a successful getMore) and when the iterator has advanced to the
         * last document in its current batch. Otherwise, extract a resume token
         * from the current document if possible. */
        if ($this->isAtEndOfBatch() && $this->postBatchResumeToken !== null) {
            $this->resumeToken = $this->postBatchResumeToken;
        } elseif ($this->isValid) {
            $this->resumeToken = $this->extractResumeToken($this->current());
        }
    }
}