See Release Notes
Long Term Support Release
Differences Between: [Versions 310 and 401] [Versions 311 and 401] [Versions 39 and 401] [Versions 400 and 401]
<?php
/*
 * Copyright 2017-present MongoDB, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace MongoDB;

use Iterator;
use MongoDB\Driver\CursorId;
use MongoDB\Driver\Exception\ConnectionException;
use MongoDB\Driver\Exception\RuntimeException;
use MongoDB\Driver\Exception\ServerException;
use MongoDB\Exception\BadMethodCallException;
use MongoDB\Exception\ResumeTokenException;
use MongoDB\Model\ChangeStreamIterator;
use ReturnTypeWillChange;

use function call_user_func;
use function in_array;

/**
 * Iterator for a change stream.
 *
 * Wraps a ChangeStreamIterator and, when iteration fails with an error that
 * isResumableError() accepts, replaces the wrapped iterator with a new one
 * obtained from the resume callable.
 *
 * @psalm-type ResumeCallable = callable(array|object|null, bool): ChangeStreamIterator
 *
 * @api
 * @see \MongoDB\Collection::watch()
 * @see https://mongodb.com/docs/manual/reference/method/db.watch/#mongodb-method-db.watch
 */
class ChangeStream implements Iterator
{
    /**
     * @deprecated 1.4
     * @todo Remove this in 2.0 (see: PHPLIB-360)
     */
    public const CURSOR_NOT_FOUND = 43;

    /** @var int */
    private static $cursorNotFound = 43;

    /** @var int[] */
    private static $resumableErrorCodes = [
        6, // HostUnreachable
        7, // HostNotFound
        89, // NetworkTimeout
        91, // ShutdownInProgress
        189, // PrimarySteppedDown
        262, // ExceededTimeLimit
        9001, // SocketException
        10107, // NotPrimary
        11600, // InterruptedAtShutdown
        11602, // InterruptedDueToReplStateChange
        13435, // NotPrimaryNoSecondaryOk
        13436, // NotPrimaryOrSecondary
        63, // StaleShardVersion
        150, // StaleEpoch
        13388, // StaleConfig
        234, // RetryChangeStream
        133, // FailedToSatisfyReadPreference
    ];

    /** @var int */
    private static $wireVersionForResumableChangeStreamError = 9;

    /**
     * Callable used to build a replacement iterator when resuming. It is set
     * to null by onIteration() once the cursor ID is reported as 0.
     *
     * @var ResumeCallable|null
     */
    private $resumeCallable;

    /** @var ChangeStreamIterator */
    private $iterator;

    /** @var integer */
    private $key = 0;

    /**
     * Whether the change stream has advanced to its first result. This is used
     * to determine whether $key should be incremented after an iteration event.
     *
     * @var boolean
     */
    private $hasAdvanced = false;

    /**
     * @internal
     *
     * @param ChangeStreamIterator $iterator       Iterator to wrap
     * @param ResumeCallable       $resumeCallable Invoked with the current
     *                                             resume token and the
     *                                             "has advanced" flag to obtain
     *                                             a replacement iterator
     */
    public function __construct(ChangeStreamIterator $iterator, callable $resumeCallable)
    {
        $this->iterator = $iterator;
        $this->resumeCallable = $resumeCallable;
    }

    /**
     * @see https://php.net/iterator.current
     * @return mixed
     */
    #[ReturnTypeWillChange]
    public function current()
    {
        return $this->iterator->current();
    }

    /**
     * Returns the ID of the cursor underlying the wrapped iterator.
     *
     * @return CursorId
     */
    public function getCursorId()
    {
        return $this->iterator->getInnerIterator()->getId();
    }

    /**
     * Returns the resume token for the iterator's current position.
     *
     * Null may be returned if no change documents have been iterated and the
     * server did not include a postBatchResumeToken in its aggregate or getMore
     * command response.
     *
     * @return array|object|null
     */
    public function getResumeToken()
    {
        return $this->iterator->getResumeToken();
    }

    /**
     * Returns the zero-based key of the current result, or null when there is
     * no current result.
     *
     * @see https://php.net/iterator.key
     * @return mixed
     */
    #[ReturnTypeWillChange]
    public function key()
    {
        if ($this->valid()) {
            return $this->key;
        }

        return null;
    }

    /**
     * @see https://php.net/iterator.next
     * @return void
     * @throws ResumeTokenException
     */
    #[ReturnTypeWillChange]
    public function next()
    {
        try {
            $this->iterator->next();
            $this->onIteration($this->hasAdvanced);
        } catch (RuntimeException $e) {
            $this->resumeOrThrow($e);
        }
    }

    /**
     * @see https://php.net/iterator.rewind
     * @return void
     * @throws ResumeTokenException
     */
    #[ReturnTypeWillChange]
    public function rewind()
    {
        try {
            $this->iterator->rewind();
            /* Unlike next() and resume(), the decision to increment the key
             * does not depend on whether the change stream has advanced. This
             * ensures that multiple calls to rewind() do not alter state. */
            $this->onIteration(false);
        } catch (RuntimeException $e) {
            $this->resumeOrThrow($e);
        }
    }

    /**
     * @see https://php.net/iterator.valid
     * @return boolean
     */
    #[ReturnTypeWillChange]
    public function valid()
    {
        return $this->iterator->valid();
    }

    /**
     * Determines if an exception is a resumable error.
     *
     * Connection exceptions are always resumable; anything that is not a
     * server exception never is. For server exceptions, the cursor-not-found
     * code is always resumable. Otherwise the decision is based on the
     * "ResumableChangeStreamError" label when server_supports_feature()
     * reports support for self::$wireVersionForResumableChangeStreamError,
     * and on the fixed list of error codes when it does not.
     *
     * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#resumable-error
     */
    private function isResumableError(RuntimeException $exception): bool
    {
        if ($exception instanceof ConnectionException) {
            return true;
        }

        if (! $exception instanceof ServerException) {
            return false;
        }

        if ($exception->getCode() === self::$cursorNotFound) {
            return true;
        }

        if (server_supports_feature($this->iterator->getServer(), self::$wireVersionForResumableChangeStreamError)) {
            return $exception->hasErrorLabel('ResumableChangeStreamError');
        }

        /* Compare strictly: the list holds integers only, so loose comparison
         * could only ever add accidental matches through type juggling. */
        return in_array($exception->getCode(), self::$resumableErrorCodes, true);
    }

    /**
     * Perform housekeeping after an iteration event.
     *
     * @param boolean $incrementKey Increment $key if there is a current result
     * @throws ResumeTokenException
     */
    private function onIteration(bool $incrementKey): void
    {
        /* If the cursorId is 0, the server has invalidated the cursor and we
         * will never perform another getMore nor need to resume since any
         * remaining results (up to and including the invalidate event) will
         * have been received in the last response. Therefore, we can unset the
         * resumeCallable. This will free any reference to Watch as well as the
         * only reference to any implicit session created therein. */
        if ((string) $this->getCursorId() === '0') {
            $this->resumeCallable = null;
        }

        /* Return early if there is not a current result. Avoid any attempt to
         * increment the iterator's key. */
        if (! $this->valid()) {
            return;
        }

        if ($incrementKey) {
            $this->key++;
        }

        $this->hasAdvanced = true;
    }

    /**
     * Recreates the ChangeStreamIterator after a resumable server error.
     *
     * The resume callable receives the current resume token and whether the
     * stream has already advanced; the iterator it returns replaces the
     * current one and is rewound before the usual post-iteration housekeeping.
     *
     * @throws BadMethodCallException if the resume callable has been released
     */
    private function resume(): void
    {
        // Explicit null check: the callable is released by onIteration().
        if ($this->resumeCallable === null) {
            throw new BadMethodCallException('Cannot resume a closed change stream.');
        }

        $this->iterator = call_user_func($this->resumeCallable, $this->getResumeToken(), $this->hasAdvanced);

        $this->iterator->rewind();

        $this->onIteration($this->hasAdvanced);
    }

    /**
     * Either resumes after a resumable error or re-throws the exception.
     *
     * @param RuntimeException $exception Exception caught during iteration
     * @throws RuntimeException if the exception is not resumable
     */
    private function resumeOrThrow(RuntimeException $exception): void
    {
        if ($this->isResumableError($exception)) {
            $this->resume();

            return;
        }

        throw $exception;
    }
}
title
Description
Body
title
Description
Body
title
Description
Body
title
Body