Differences Between: [Versions 310 and 311] [Versions 311 and 401] [Versions 39 and 311]
<?php
/*
 * Copyright 2019 MongoDB, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace MongoDB\Model;

use IteratorIterator;
use MongoDB\BSON\Serializable;
use MongoDB\Driver\Cursor;
use MongoDB\Driver\Monitoring\CommandFailedEvent;
use MongoDB\Driver\Monitoring\CommandStartedEvent;
use MongoDB\Driver\Monitoring\CommandSubscriber;
use MongoDB\Driver\Monitoring\CommandSucceededEvent;
use MongoDB\Driver\Server;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\ResumeTokenException;
use MongoDB\Exception\UnexpectedValueException;
use function count;
use function is_array;
use function is_integer;
use function is_object;
use function MongoDB\Driver\Monitoring\addSubscriber;
use function MongoDB\Driver\Monitoring\removeSubscriber;

/**
 * ChangeStreamIterator wraps a change stream's tailable cursor.
 *
 * This iterator tracks the size of each batch in order to determine when the
 * postBatchResumeToken is applicable. It also ensures that initial calls to
 * rewind() do not execute getMore commands.
 *
 * The overridden Iterator methods carry #[\ReturnTypeWillChange] so that
 * PHP 8.1+ does not emit a deprecation notice for omitting the tentative
 * return types declared by IteratorIterator. On PHP 7 the attribute line is
 * parsed as an ordinary "#" comment, so older runtimes are unaffected.
 *
 * @internal
 */
class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
{
    /**
     * Position within the current batch. Reset to zero when a getMore command
     * starts and incremented by onIteration() when advancing within a batch.
     *
     * @var integer
     */
    private $batchPosition = 0;

    /**
     * Number of documents in the current batch. Set to null when a getMore
     * command starts and repopulated once that command succeeds.
     *
     * @var integer|null
     */
    private $batchSize;

    /**
     * Whether rewind() should return without touching the wrapped cursor.
     * Initially true only when the first batch is empty.
     *
     * @var boolean
     */
    private $isRewindNop;

    /**
     * Cached validity of the wrapped iterator, refreshed by onIteration().
     *
     * @var boolean
     */
    private $isValid = false;

    /**
     * postBatchResumeToken from the most recent aggregate or getMore reply,
     * if one was provided.
     *
     * @var object|null
     */
    private $postBatchResumeToken;

    /**
     * Cached resume token for the iterator's current position.
     *
     * @var array|object|null
     */
    private $resumeToken;

    /**
     * Server reported by the wrapped cursor at construction time.
     *
     * @var Server
     */
    private $server;

    /**
     * @internal
     * @param Cursor            $cursor               Change stream cursor to wrap
     * @param integer           $firstBatchSize       Number of documents in the cursor's first batch
     * @param array|object|null $initialResumeToken   Resume token to report before anything is iterated
     * @param object|null       $postBatchResumeToken postBatchResumeToken for the first batch, if any
     * @throws InvalidArgumentException if an argument has an unexpected type
     */
    public function __construct(Cursor $cursor, $firstBatchSize, $initialResumeToken, $postBatchResumeToken)
    {
        if (! is_integer($firstBatchSize)) {
            throw InvalidArgumentException::invalidType('$firstBatchSize', $firstBatchSize, 'integer');
        }

        if (isset($initialResumeToken) && ! is_array($initialResumeToken) && ! is_object($initialResumeToken)) {
            throw InvalidArgumentException::invalidType('$initialResumeToken', $initialResumeToken, 'array or object');
        }

        if (isset($postBatchResumeToken) && ! is_object($postBatchResumeToken)) {
            throw InvalidArgumentException::invalidType('$postBatchResumeToken', $postBatchResumeToken, 'object');
        }

        parent::__construct($cursor);

        $this->batchSize = $firstBatchSize;
        // With an empty first batch, rewind() must not advance the cursor.
        $this->isRewindNop = ($firstBatchSize === 0);
        $this->postBatchResumeToken = $postBatchResumeToken;
        $this->resumeToken = $initialResumeToken;
        $this->server = $cursor->getServer();
    }

    /**
     * Intentionally empty: a failed command does not update any batch state.
     *
     * @internal
     */
    final public function commandFailed(CommandFailedEvent $event)
    {
    }

    /**
     * Resets the batch bookkeeping when a getMore command starts. Commands
     * other than getMore are ignored.
     *
     * @internal
     */
    final public function commandStarted(CommandStartedEvent $event)
    {
        if ($event->getCommandName() !== 'getMore') {
            return;
        }

        $this->batchPosition = 0;
        $this->batchSize = null;
        $this->postBatchResumeToken = null;
    }

    /**
     * Records the size of the batch returned by a successful getMore command
     * and its postBatchResumeToken, when present. Commands other than getMore
     * are ignored.
     *
     * @internal
     * @throws UnexpectedValueException if the reply has no "cursor.nextBatch" array
     */
    final public function commandSucceeded(CommandSucceededEvent $event)
    {
        if ($event->getCommandName() !== 'getMore') {
            return;
        }

        $reply = $event->getReply();

        if (! isset($reply->cursor->nextBatch) || ! is_array($reply->cursor->nextBatch)) {
            throw new UnexpectedValueException('getMore command did not return a "cursor.nextBatch" array');
        }

        $this->batchSize = count($reply->cursor->nextBatch);

        if (isset($reply->cursor->postBatchResumeToken) && is_object($reply->cursor->postBatchResumeToken)) {
            $this->postBatchResumeToken = $reply->cursor->postBatchResumeToken;
        }
    }

    /**
     * Returns the current element, or null if the iterator is not valid.
     *
     * @see https://php.net/iteratoriterator.current
     * @return mixed
     */
    #[\ReturnTypeWillChange]
    public function current()
    {
        return $this->isValid ? parent::current() : null;
    }

    /**
     * Returns the resume token for the iterator's current position.
     *
     * Null may be returned if no change documents have been iterated and the
     * server did not include a postBatchResumeToken in its aggregate or getMore
     * command response.
     *
     * @return array|object|null
     */
    public function getResumeToken()
    {
        return $this->resumeToken;
    }

    /**
     * Returns the server the cursor is running on.
     */
    public function getServer() : Server
    {
        return $this->server;
    }

    /**
     * Returns the current key, or null if the iterator is not valid.
     *
     * @see https://php.net/iteratoriterator.key
     * @return mixed
     */
    #[\ReturnTypeWillChange]
    public function key()
    {
        return $this->isValid ? parent::key() : null;
    }

    /**
     * Advances the iterator and updates the cached resume token.
     *
     * @see https://php.net/iteratoriterator.next
     * @return void
     */
    #[\ReturnTypeWillChange]
    public function next()
    {
        /* Determine if advancing the iterator will execute a getMore command
         * (i.e. we are already positioned at the end of the current batch). If
         * so, rely on the APM callbacks to reset $batchPosition and update
         * $batchSize. Otherwise, we can forgo APM and manually increment
         * $batchPosition after calling next(). */
        $getMore = $this->isAtEndOfBatch();

        if ($getMore) {
            addSubscriber($this);
        }

        try {
            parent::next();
            $this->onIteration(! $getMore);
        } finally {
            // Always unsubscribe, even if advancing or housekeeping throws.
            if ($getMore) {
                removeSubscriber($this);
            }
        }
    }

    /**
     * Rewinds the iterator, unless the first batch was empty and nothing has
     * been iterated yet, in which case this is a no-op.
     *
     * @see https://php.net/iteratoriterator.rewind
     * @return void
     */
    #[\ReturnTypeWillChange]
    public function rewind()
    {
        if ($this->isRewindNop) {
            return;
        }

        parent::rewind();
        $this->onIteration(false);
    }

    /**
     * Returns the validity cached by the last iteration event.
     *
     * @see https://php.net/iteratoriterator.valid
     * @return boolean
     */
    #[\ReturnTypeWillChange]
    public function valid()
    {
        return $this->isValid;
    }

    /**
     * Extracts the resume token (i.e. "_id" field) from a change document.
     *
     * Serializable documents are converted with bsonSerialize() first. When
     * the token is missing or has an invalid type, the iterator is marked
     * invalid before the exception is thrown.
     *
     * @param array|object $document Change document
     * @return array|object
     * @throws InvalidArgumentException
     * @throws ResumeTokenException if the resume token is not found or invalid
     */
    private function extractResumeToken($document)
    {
        if (! is_array($document) && ! is_object($document)) {
            throw InvalidArgumentException::invalidType('$document', $document, 'array or object');
        }

        if ($document instanceof Serializable) {
            return $this->extractResumeToken($document->bsonSerialize());
        }

        $resumeToken = is_array($document)
            ? ($document['_id'] ?? null)
            : ($document->_id ?? null);

        if (! isset($resumeToken)) {
            $this->isValid = false;
            throw ResumeTokenException::notFound();
        }

        if (! is_array($resumeToken) && ! is_object($resumeToken)) {
            $this->isValid = false;
            throw ResumeTokenException::invalidType($resumeToken);
        }

        return $resumeToken;
    }

    /**
     * Return whether the iterator is positioned at the end of the batch.
     *
     * @return boolean
     */
    private function isAtEndOfBatch()
    {
        return $this->batchPosition + 1 >= $this->batchSize;
    }

    /**
     * Perform housekeeping after an iteration event.
     *
     * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#updating-the-cached-resume-token
     * @param boolean $incrementBatchPosition
     * @throws ResumeTokenException if the current document has no usable resume token
     */
    private function onIteration($incrementBatchPosition)
    {
        $this->isValid = parent::valid();

        /* Disable rewind()'s NOP behavior once we advance to a valid position.
         * This will allow the driver to throw a LogicException if rewind() is
         * called after the cursor has advanced past its first element. */
        if ($this->isRewindNop && $this->isValid) {
            $this->isRewindNop = false;
        }

        if ($incrementBatchPosition && $this->isValid) {
            $this->batchPosition++;
        }

        /* If the iterator is positioned at the end of the batch, apply the
         * postBatchResumeToken if it's available. This handles both the case
         * where the current batch is empty (since onIteration() will be called
         * after a successful getMore) and when the iterator has advanced to the
         * last document in its current batch. Otherwise, extract a resume token
         * from the current document if possible. */
        if ($this->isAtEndOfBatch() && $this->postBatchResumeToken !== null) {
            $this->resumeToken = $this->postBatchResumeToken;
        } elseif ($this->isValid) {
            $this->resumeToken = $this->extractResumeToken($this->current());
        }
    }
}
title
Description
Body
title
Description
Body
title
Description
Body
title
Body