Differences Between: [Versions 310 and 400] [Versions 39 and 400] [Versions 400 and 401]
1 <?php 2 /* 3 * Copyright 2017 MongoDB, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 namespace MongoDB; 19 20 use Iterator; 21 use MongoDB\Driver\CursorId; 22 use MongoDB\Driver\Exception\ConnectionException; 23 use MongoDB\Driver\Exception\RuntimeException; 24 use MongoDB\Driver\Exception\ServerException; 25 use MongoDB\Exception\ResumeTokenException; 26 use MongoDB\Model\ChangeStreamIterator; 27 use function call_user_func; 28 use function in_array; 29 30 /** 31 * Iterator for a change stream. 32 * 33 * @api 34 * @see \MongoDB\Collection::watch() 35 * @see http://docs.mongodb.org/manual/reference/command/changeStream/ 36 */ 37 class ChangeStream implements Iterator 38 { 39 /** 40 * @deprecated 1.4 41 * @todo Remove this in 2.0 (see: PHPLIB-360) 42 */ 43 const CURSOR_NOT_FOUND = 43; 44 45 /** @var int */ 46 private static $cursorNotFound = 43; 47 48 /** @var int[] */ 49 private static $resumableErrorCodes = [ 50 6, // HostUnreachable 51 7, // HostNotFound 52 89, // NetworkTimeout 53 91, // ShutdownInProgress 54 189, // PrimarySteppedDown 55 262, // ExceededTimeLimit 56 9001, // SocketException 57 10107, // NotMaster 58 11600, // InterruptedAtShutdown 59 11602, // InterruptedDueToReplStateChange 60 13435, // NotMasterNoSlaveOk 61 13436, // NotMasterOrSecondary 62 63, // StaleShardVersion 63 150, // StaleEpoch 64 13388, // StaleConfig 65 234, // RetryChangeStream 66 133, // FailedToSatisfyReadPreference 67 ]; 68 69 /** @var int */ 70 private static $wireVersionForResumableChangeStreamError = 9; 71 72 /** @var callable */ 73 private $resumeCallable; 74 75 /** @var ChangeStreamIterator */ 76 private $iterator; 77 78 /** @var integer */ 79 private $key = 0; 80 81 /** 82 * Whether the change stream has advanced to its first result. This is used 83 * to determine whether $key should be incremented after an iteration event. 84 * 85 * @var boolean 86 */ 87 private $hasAdvanced = false; 88 89 /** 90 * @internal 91 * @param ChangeStreamIterator $iterator 92 * @param callable $resumeCallable 93 */ 94 public function __construct(ChangeStreamIterator $iterator, callable $resumeCallable) 95 { 96 $this->iterator = $iterator; 97 $this->resumeCallable = $resumeCallable; 98 } 99 100 /** 101 * @see http://php.net/iterator.current 102 * @return mixed 103 */ 104 public function current() 105 { 106 return $this->iterator->current(); 107 } 108 109 /** 110 * @return CursorId 111 */ 112 public function getCursorId() 113 { 114 return $this->iterator->getInnerIterator()->getId(); 115 } 116 117 /** 118 * Returns the resume token for the iterator's current position. 119 * 120 * Null may be returned if no change documents have been iterated and the 121 * server did not include a postBatchResumeToken in its aggregate or getMore 122 * command response. 123 * 124 * @return array|object|null 125 */ 126 public function getResumeToken() 127 { 128 return $this->iterator->getResumeToken(); 129 } 130 131 /** 132 * @see http://php.net/iterator.key 133 * @return mixed 134 */ 135 public function key() 136 { 137 if ($this->valid()) { 138 return $this->key; 139 } 140 141 return null; 142 } 143 144 /** 145 * @see http://php.net/iterator.next 146 * @return void 147 * @throws ResumeTokenException 148 */ 149 public function next() 150 { 151 try { 152 $this->iterator->next(); 153 $this->onIteration($this->hasAdvanced); 154 } catch (RuntimeException $e) { 155 $this->resumeOrThrow($e); 156 } 157 } 158 159 /** 160 * @see http://php.net/iterator.rewind 161 * @return void 162 * @throws ResumeTokenException 163 */ 164 public function rewind() 165 { 166 try { 167 $this->iterator->rewind(); 168 /* Unlike next() and resume(), the decision to increment the key 169 * does not depend on whether the change stream has advanced. This 170 * ensures that multiple calls to rewind() do not alter state. */ 171 $this->onIteration(false); 172 } catch (RuntimeException $e) { 173 $this->resumeOrThrow($e); 174 } 175 } 176 177 /** 178 * @see http://php.net/iterator.valid 179 * @return boolean 180 */ 181 public function valid() 182 { 183 return $this->iterator->valid(); 184 } 185 186 /** 187 * Determines if an exception is a resumable error. 188 * 189 * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#resumable-error 190 * @param RuntimeException $exception 191 * @return boolean 192 */ 193 private function isResumableError(RuntimeException $exception) 194 { 195 if ($exception instanceof ConnectionException) { 196 return true; 197 } 198 199 if (! $exception instanceof ServerException) { 200 return false; 201 } 202 203 if ($exception->getCode() === self::$cursorNotFound) { 204 return true; 205 } 206 207 if (server_supports_feature($this->iterator->getServer(), self::$wireVersionForResumableChangeStreamError)) { 208 return $exception->hasErrorLabel('ResumableChangeStreamError'); 209 } 210 211 return in_array($exception->getCode(), self::$resumableErrorCodes); 212 } 213 214 /** 215 * Perform housekeeping after an iteration event. 216 * 217 * @param boolean $incrementKey Increment $key if there is a current result 218 * @throws ResumeTokenException 219 */ 220 private function onIteration($incrementKey) 221 { 222 /* If the cursorId is 0, the server has invalidated the cursor and we 223 * will never perform another getMore nor need to resume since any 224 * remaining results (up to and including the invalidate event) will 225 * have been received in the last response. Therefore, we can unset the 226 * resumeCallable. This will free any reference to Watch as well as the 227 * only reference to any implicit session created therein. */ 228 if ((string) $this->getCursorId() === '0') { 229 $this->resumeCallable = null; 230 } 231 232 /* Return early if there is not a current result. Avoid any attempt to 233 * increment the iterator's key. */ 234 if (! $this->valid()) { 235 return; 236 } 237 238 if ($incrementKey) { 239 $this->key++; 240 } 241 242 $this->hasAdvanced = true; 243 } 244 245 /** 246 * Recreates the ChangeStreamIterator after a resumable server error. 247 * 248 * @return void 249 */ 250 private function resume() 251 { 252 $this->iterator = call_user_func($this->resumeCallable, $this->getResumeToken(), $this->hasAdvanced); 253 $this->iterator->rewind(); 254 255 $this->onIteration($this->hasAdvanced); 256 } 257 258 /** 259 * Either resumes after a resumable error or re-throws the exception. 260 * 261 * @param RuntimeException $exception 262 * @throws RuntimeException 263 */ 264 private function resumeOrThrow(RuntimeException $exception) 265 { 266 if ($this->isResumableError($exception)) { 267 $this->resume(); 268 269 return; 270 } 271 272 throw $exception; 273 } 274 }
title
Description
Body
title
Description
Body
title
Description
Body
title
Body