See Release Notes
Long Term Support Release
Differences Between: [Versions 310 and 401] [Versions 311 and 401] [Versions 39 and 401] [Versions 400 and 401]
<?php
/*
 * Copyright 2019-present MongoDB, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace MongoDB\Model;

use IteratorIterator;
use MongoDB\BSON\Serializable;
use MongoDB\Driver\Cursor;
use MongoDB\Driver\Monitoring\CommandFailedEvent;
use MongoDB\Driver\Monitoring\CommandStartedEvent;
use MongoDB\Driver\Monitoring\CommandSubscriber;
use MongoDB\Driver\Monitoring\CommandSucceededEvent;
use MongoDB\Driver\Server;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\ResumeTokenException;
use MongoDB\Exception\UnexpectedValueException;
use ReturnTypeWillChange;

use function assert;
use function count;
use function is_array;
use function is_object;
use function MongoDB\Driver\Monitoring\addSubscriber;
use function MongoDB\Driver\Monitoring\removeSubscriber;

/**
 * Iterator decorating the tailable cursor of a change stream.
 *
 * The size of every batch is recorded so the iterator can tell when the
 * postBatchResumeToken reported by the server should become the cached resume
 * token. The iterator also makes sure that an initial rewind() never triggers
 * a getMore command.
 *
 * @internal
 */
class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
{
    /**
     * Zero-based offset within the current batch; reset when a getMore starts.
     *
     * @var integer
     */
    private $batchPosition = 0;

    /**
     * Number of documents in the current batch.
     *
     * @var integer
     */
    private $batchSize;

    /**
     * Whether rewind() should currently do nothing.
     *
     * @var boolean
     */
    private $isRewindNop;

    /**
     * Validity of the inner cursor as recorded by the last iteration event.
     *
     * @var boolean
     */
    private $isValid = false;

    /**
     * Most recent postBatchResumeToken, if one is known for the current batch.
     *
     * @var object|null
     */
    private $postBatchResumeToken;

    /**
     * Cached resume token for the iterator's current position.
     *
     * @var array|object|null
     */
    private $resumeToken;

    /**
     * Server captured from the cursor at construction time.
     *
     * @var Server
     */
    private $server;

    /**
     * @internal
     * @param Cursor            $cursor               Cursor to wrap
     * @param integer           $firstBatchSize       Document count of the first batch
     * @param array|object|null $initialResumeToken   Resume token to start out with
     * @param object|null       $postBatchResumeToken Token associated with the first batch
     * @throws InvalidArgumentException if $initialResumeToken is neither null, an array, nor an object
     */
    public function __construct(Cursor $cursor, int $firstBatchSize, $initialResumeToken, ?object $postBatchResumeToken)
    {
        $isArrayOrObject = is_array($initialResumeToken) || is_object($initialResumeToken);

        if ($initialResumeToken !== null && ! $isArrayOrObject) {
            throw InvalidArgumentException::invalidType('$initialResumeToken', $initialResumeToken, 'array or object');
        }

        parent::__construct($cursor);

        $this->server = $cursor->getServer();
        $this->resumeToken = $initialResumeToken;
        $this->postBatchResumeToken = $postBatchResumeToken;
        $this->batchSize = $firstBatchSize;

        // With an empty first batch there is nothing to rewind to.
        $this->isRewindNop = ($firstBatchSize === 0);
    }

    /**
     * Failed commands require no bookkeeping.
     *
     * @internal
     */
    final public function commandFailed(CommandFailedEvent $event): void
    {
    }

    /**
     * Clears the batch bookkeeping whenever a getMore command is started.
     *
     * @internal
     */
    final public function commandStarted(CommandStartedEvent $event): void
    {
        if ($event->getCommandName() === 'getMore') {
            $this->postBatchResumeToken = null;
            $this->batchSize = 0;
            $this->batchPosition = 0;
        }
    }

    /**
     * Records the batch size and postBatchResumeToken of a getMore reply.
     *
     * @internal
     * @throws UnexpectedValueException if the reply has no "cursor.nextBatch" array
     */
    final public function commandSucceeded(CommandSucceededEvent $event): void
    {
        if ($event->getCommandName() !== 'getMore') {
            return;
        }

        $response = $event->getReply();
        $nextBatch = $response->cursor->nextBatch ?? null;

        // A missing field collapses to null, which is_array() rejects as well.
        if (! is_array($nextBatch)) {
            throw new UnexpectedValueException('getMore command did not return a "cursor.nextBatch" array');
        }

        $this->batchSize = count($nextBatch);

        $batchToken = $response->cursor->postBatchResumeToken ?? null;

        if (is_object($batchToken)) {
            $this->postBatchResumeToken = $batchToken;
        }
    }

    /**
     * Returns the current element, or null when the iterator is not valid.
     *
     * @see https://php.net/iteratoriterator.current
     * @return mixed
     */
    #[ReturnTypeWillChange]
    public function current()
    {
        if (! $this->isValid) {
            return null;
        }

        return parent::current();
    }

    /**
     * Narrows the return type so that psalm knows the inner iterator is always
     * a cursor. Since the class is not final this could technically be
     * circumvented, but doing so is an invalid use-case. The method can be
     * removed in 2.0 once the class is final.
     */
    final public function getInnerIterator(): Cursor
    {
        $inner = parent::getInnerIterator();
        assert($inner instanceof Cursor);

        return $inner;
    }

    /**
     * Returns the resume token for the iterator's current position.
     *
     * The result may be null if no change document has been iterated yet and
     * the server's aggregate or getMore response carried no
     * postBatchResumeToken.
     *
     * @return array|object|null
     */
    public function getResumeToken()
    {
        return $this->resumeToken;
    }

    /**
     * Returns the server the cursor is running on.
     */
    public function getServer(): Server
    {
        return $this->server;
    }

    /**
     * Returns the current key, or null when the iterator is not valid.
     *
     * @see https://php.net/iteratoriterator.key
     * @return mixed
     */
    #[ReturnTypeWillChange]
    public function key()
    {
        if (! $this->isValid) {
            return null;
        }

        return parent::key();
    }

    /**
     * Advances the iterator to the next element.
     *
     * @see https://php.net/iteratoriterator.next
     */
    public function next(): void
    {
        /* When the iterator already sits at the end of its batch, advancing
         * is expected to execute a getMore command. In that case the APM
         * callbacks take care of resetting $batchPosition and refreshing
         * $batchSize. */
        $expectsGetMore = $this->isAtEndOfBatch();

        if (! $expectsGetMore) {
            /* No getMore is expected, so APM can be skipped entirely and the
             * batch position is bumped manually after advancing. */
            parent::next();
            $this->onIteration(true);

            return;
        }

        addSubscriber($this);

        try {
            parent::next();
            $this->onIteration(false);
        } finally {
            removeSubscriber($this);
        }
    }

    /**
     * Rewinds the iterator unless rewinding is currently a NOP.
     *
     * @see https://php.net/iteratoriterator.rewind
     */
    public function rewind(): void
    {
        if (! $this->isRewindNop) {
            parent::rewind();
            $this->onIteration(false);
        }
    }

    /**
     * Reports the validity recorded by the most recent iteration event.
     *
     * @see https://php.net/iteratoriterator.valid
     */
    public function valid(): bool
    {
        return $this->isValid;
    }

    /**
     * Pulls the resume token (i.e. the "_id" field) out of a change document.
     *
     * The iterator is marked invalid before a ResumeTokenException is thrown.
     *
     * @param array|object $document Change document
     * @return array|object
     * @throws InvalidArgumentException if $document is neither an array nor an object
     * @throws ResumeTokenException if the resume token is not found or invalid
     */
    private function extractResumeToken($document)
    {
        if (! (is_array($document) || is_object($document))) {
            throw InvalidArgumentException::invalidType('$document', $document, 'array or object');
        }

        // Serializable documents are inspected through their serialized form.
        if ($document instanceof Serializable) {
            return $this->extractResumeToken($document->bsonSerialize());
        }

        if (is_array($document)) {
            $candidate = $document['_id'] ?? null;
        } else {
            $candidate = $document->_id ?? null;
        }

        if ($candidate === null) {
            $this->isValid = false;

            throw ResumeTokenException::notFound();
        }

        if (! (is_array($candidate) || is_object($candidate))) {
            $this->isValid = false;

            throw ResumeTokenException::invalidType($candidate);
        }

        return $candidate;
    }

    /**
     * Tells whether the iterator is positioned at the end of the batch.
     */
    private function isAtEndOfBatch(): bool
    {
        return $this->batchSize <= $this->batchPosition + 1;
    }

    /**
     * Housekeeping to run after every iteration event.
     *
     * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#updating-the-cached-resume-token
     * @param boolean $incrementBatchPosition Whether to advance the batch position when the iterator is valid
     */
    private function onIteration(bool $incrementBatchPosition): void
    {
        $this->isValid = parent::valid();

        if ($this->isValid) {
            /* Reaching a valid position turns off rewind()'s NOP behavior, so
             * the driver is able to throw a LogicException if rewind() is
             * called once the cursor has moved past its first element. */
            $this->isRewindNop = false;

            if ($incrementBatchPosition) {
                $this->batchPosition++;
            }
        }

        /* At the end of a batch the postBatchResumeToken takes precedence
         * whenever one is available. That covers an empty batch (this method
         * runs after a successful getMore) as well as the iterator having
         * reached the last document of its batch. */
        if ($this->postBatchResumeToken !== null && $this->isAtEndOfBatch()) {
            $this->resumeToken = $this->postBatchResumeToken;

            return;
        }

        // Otherwise fall back to the token of the current document, if any.
        if ($this->isValid) {
            $this->resumeToken = $this->extractResumeToken($this->current());
        }
    }
}
title
Description
Body
title
Description
Body
title
Description
Body
title
Body