See Release Notes
Long Term Support Release
Differences Between: [Versions 39 and 311] [Versions 39 and 400] [Versions 39 and 401]
<?php
/*
 * Copyright 2019 MongoDB, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace MongoDB\Model;

use IteratorIterator;
use MongoDB\BSON\Serializable;
use MongoDB\Driver\Cursor;
use MongoDB\Driver\Monitoring\CommandFailedEvent;
use MongoDB\Driver\Monitoring\CommandStartedEvent;
use MongoDB\Driver\Monitoring\CommandSubscriber;
use MongoDB\Driver\Monitoring\CommandSucceededEvent;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\ResumeTokenException;
use MongoDB\Exception\UnexpectedValueException;
use function count;
use function is_array;
use function is_integer;
use function is_object;
use function MongoDB\Driver\Monitoring\addSubscriber;
use function MongoDB\Driver\Monitoring\removeSubscriber;

/**
 * ChangeStreamIterator wraps a change stream's tailable cursor.
 *
 * This iterator tracks the size of each batch in order to determine when the
 * postBatchResumeToken is applicable. It also ensures that initial calls to
 * rewind() do not execute getMore commands.
 *
 * Batch bookkeeping relies on command monitoring: the iterator registers
 * itself as a CommandSubscriber only for the duration of a next() call that is
 * expected to issue a getMore, and unregisters itself immediately afterwards.
 *
 * @internal
 */
class ChangeStreamIterator extends IteratorIterator implements CommandSubscriber
{
    /** @var integer Zero-based offset of the iterator within the current batch */
    private $batchPosition = 0;

    /** @var integer Number of documents in the current batch */
    private $batchSize;

    /** @var boolean Whether rewind() should return without touching the cursor */
    private $isRewindNop;

    /** @var boolean Cached validity of the current position (see onIteration()) */
    private $isValid = false;

    /** @var object|null postBatchResumeToken reported for the current batch */
    private $postBatchResumeToken;

    /** @var array|object|null Resume token for the iterator's current position */
    private $resumeToken;

    /**
     * @internal
     * @param Cursor            $cursor               Change stream cursor to wrap
     * @param integer           $firstBatchSize       Number of documents in the cursor's first batch
     * @param array|object|null $initialResumeToken   Resume token to report before any iteration
     * @param object|null       $postBatchResumeToken postBatchResumeToken associated with the first batch
     * @throws InvalidArgumentException if an argument has an unexpected type
     */
    public function __construct(Cursor $cursor, $firstBatchSize, $initialResumeToken, $postBatchResumeToken)
    {
        if (! is_integer($firstBatchSize)) {
            throw InvalidArgumentException::invalidType('$firstBatchSize', $firstBatchSize, 'integer');
        }

        if (isset($initialResumeToken) && ! is_array($initialResumeToken) && ! is_object($initialResumeToken)) {
            throw InvalidArgumentException::invalidType('$initialResumeToken', $initialResumeToken, 'array or object');
        }

        if (isset($postBatchResumeToken) && ! is_object($postBatchResumeToken)) {
            throw InvalidArgumentException::invalidType('$postBatchResumeToken', $postBatchResumeToken, 'object');
        }

        parent::__construct($cursor);

        $this->batchSize = $firstBatchSize;
        /* With an empty first batch, rewind() is made a NOP so that it does
         * not execute a getMore command (see class description). */
        $this->isRewindNop = ($firstBatchSize === 0);
        $this->postBatchResumeToken = $postBatchResumeToken;
        $this->resumeToken = $initialResumeToken;
    }

    /**
     * Intentionally empty: a failed command leaves the state that was reset by
     * commandStarted() untouched.
     *
     * @internal
     */
    final public function commandFailed(CommandFailedEvent $event)
    {
    }

    /**
     * Resets the batch bookkeeping when a getMore command is started.
     *
     * @internal
     */
    final public function commandStarted(CommandStartedEvent $event)
    {
        if ($event->getCommandName() !== 'getMore') {
            return;
        }

        $this->batchPosition = 0;
        /* Reset to zero rather than null so the property keeps its documented
         * integer type and isAtEndOfBatch() never has to compare an integer
         * against null. The real size is assigned in commandSucceeded(). */
        $this->batchSize = 0;
        $this->postBatchResumeToken = null;
    }

    /**
     * Records the size and postBatchResumeToken of the batch returned by a
     * successful getMore command.
     *
     * @internal
     * @throws UnexpectedValueException if the reply has no "cursor.nextBatch" array
     */
    final public function commandSucceeded(CommandSucceededEvent $event)
    {
        if ($event->getCommandName() !== 'getMore') {
            return;
        }

        $reply = $event->getReply();

        if (! isset($reply->cursor->nextBatch) || ! is_array($reply->cursor->nextBatch)) {
            throw new UnexpectedValueException('getMore command did not return a "cursor.nextBatch" array');
        }

        $this->batchSize = count($reply->cursor->nextBatch);

        // The token is optional in the reply; when absent, it stays null.
        if (isset($reply->cursor->postBatchResumeToken) && is_object($reply->cursor->postBatchResumeToken)) {
            $this->postBatchResumeToken = $reply->cursor->postBatchResumeToken;
        }
    }

    /**
     * Returns the current element, or null if the position is not valid.
     *
     * @see https://php.net/iteratoriterator.current
     * @return mixed
     */
    public function current()
    {
        return $this->isValid ? parent::current() : null;
    }

    /**
     * Returns the resume token for the iterator's current position.
     *
     * Null may be returned if no change documents have been iterated and the
     * server did not include a postBatchResumeToken in its aggregate or getMore
     * command response.
     *
     * @return array|object|null
     */
    public function getResumeToken()
    {
        return $this->resumeToken;
    }

    /**
     * Returns the current key, or null if the position is not valid.
     *
     * @see https://php.net/iteratoriterator.key
     * @return mixed
     */
    public function key()
    {
        return $this->isValid ? parent::key() : null;
    }

    /**
     * Advances the iterator and updates the cached resume token.
     *
     * @see https://php.net/iteratoriterator.next
     * @return void
     * @throws ResumeTokenException if the new current document has no valid resume token
     */
    public function next()
    {
        /* Determine if advancing the iterator will execute a getMore command
         * (i.e. we are already positioned at the end of the current batch). If
         * so, rely on the APM callbacks to reset $batchPosition and update
         * $batchSize. Otherwise, we can forgo APM and manually increment
         * $batchPosition after calling next(). */
        $getMore = $this->isAtEndOfBatch();

        if ($getMore) {
            addSubscriber($this);
        }

        try {
            parent::next();
            $this->onIteration(! $getMore);
        } finally {
            // Always unsubscribe, even if iteration or housekeeping threw.
            if ($getMore) {
                removeSubscriber($this);
            }
        }
    }

    /**
     * Rewinds the iterator, unless rewinding has been made a NOP because the
     * first batch was empty and no valid position has been reached yet.
     *
     * @see https://php.net/iteratoriterator.rewind
     * @return void
     * @throws ResumeTokenException if the current document has no valid resume token
     */
    public function rewind()
    {
        if ($this->isRewindNop) {
            return;
        }

        parent::rewind();
        $this->onIteration(false);
    }

    /**
     * Returns the validity cached by the most recent iteration event.
     *
     * @see https://php.net/iteratoriterator.valid
     * @return boolean
     */
    public function valid()
    {
        return $this->isValid;
    }

    /**
     * Extracts the resume token (i.e. "_id" field) from a change document.
     *
     * When the token is missing or has an invalid type, the iterator is marked
     * invalid before the exception is thrown.
     *
     * @param array|object $document Change document
     * @return array|object
     * @throws InvalidArgumentException
     * @throws ResumeTokenException if the resume token is not found or invalid
     */
    private function extractResumeToken($document)
    {
        if (! is_array($document) && ! is_object($document)) {
            throw InvalidArgumentException::invalidType('$document', $document, 'array or object');
        }

        // Serializable documents are inspected through their BSON representation.
        if ($document instanceof Serializable) {
            return $this->extractResumeToken($document->bsonSerialize());
        }

        $resumeToken = is_array($document)
            ? (isset($document['_id']) ? $document['_id'] : null)
            : (isset($document->_id) ? $document->_id : null);

        if (! isset($resumeToken)) {
            $this->isValid = false;

            throw ResumeTokenException::notFound();
        }

        if (! is_array($resumeToken) && ! is_object($resumeToken)) {
            $this->isValid = false;

            throw ResumeTokenException::invalidType($resumeToken);
        }

        return $resumeToken;
    }

    /**
     * Return whether the iterator is positioned at the end of the batch.
     *
     * $batchPosition is a zero-based offset, so the last document of a batch
     * containing N documents is at position N - 1. An empty batch is always
     * considered to be at its end.
     *
     * @return boolean
     */
    private function isAtEndOfBatch()
    {
        return $this->batchPosition + 1 >= $this->batchSize;
    }

    /**
     * Perform housekeeping after an iteration event.
     *
     * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#updating-the-cached-resume-token
     * @param boolean $incrementBatchPosition Whether to advance $batchPosition
     *                                        manually (i.e. no getMore was expected)
     * @throws ResumeTokenException if the current document has no valid resume token
     */
    private function onIteration($incrementBatchPosition)
    {
        $this->isValid = parent::valid();

        /* Disable rewind()'s NOP behavior once we advance to a valid position.
         * This will allow the driver to throw a LogicException if rewind() is
         * called after the cursor has advanced past its first element. */
        if ($this->isRewindNop && $this->isValid) {
            $this->isRewindNop = false;
        }

        if ($incrementBatchPosition && $this->isValid) {
            $this->batchPosition++;
        }

        /* If the iterator is positioned at the end of the batch, apply the
         * postBatchResumeToken if it's available. This handles both the case
         * where the current batch is empty (since onIteration() will be called
         * after a successful getMore) and when the iterator has advanced to the
         * last document in its current batch. Otherwise, extract a resume token
         * from the current document if possible. */
        if ($this->isAtEndOfBatch() && $this->postBatchResumeToken !== null) {
            $this->resumeToken = $this->postBatchResumeToken;
        } elseif ($this->isValid) {
            $this->resumeToken = $this->extractResumeToken($this->current());
        }
    }
}
title
Description
Body
title
Description
Body
title
Description
Body
title
Body