See Release Notes
Long Term Support Release
Differences Between: [Versions 39 and 311] [Versions 39 and 400] [Versions 39 and 401]
1 <?php 2 /* 3 * Copyright 2017 MongoDB, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 namespace MongoDB; 19 20 use Iterator; 21 use MongoDB\Driver\CursorId; 22 use MongoDB\Driver\Exception\ConnectionException; 23 use MongoDB\Driver\Exception\RuntimeException; 24 use MongoDB\Driver\Exception\ServerException; 25 use MongoDB\Exception\ResumeTokenException; 26 use MongoDB\Model\ChangeStreamIterator; 27 use function call_user_func; 28 use function in_array; 29 30 /** 31 * Iterator for a change stream. 32 * 33 * @api 34 * @see \MongoDB\Collection::watch() 35 * @see http://docs.mongodb.org/manual/reference/command/changeStream/ 36 */ 37 class ChangeStream implements Iterator 38 { 39 /** 40 * @deprecated 1.4 41 * @todo Remove this in 2.0 (see: PHPLIB-360) 42 */ 43 const CURSOR_NOT_FOUND = 43; 44 45 /** @var array */ 46 private static $nonResumableErrorCodes = [ 47 136, // CappedPositionLost 48 237, // CursorKilled 49 11601, // Interrupted 50 ]; 51 52 /** @var callable */ 53 private $resumeCallable; 54 55 /** @var ChangeStreamIterator */ 56 private $iterator; 57 58 /** @var integer */ 59 private $key = 0; 60 61 /** 62 * Whether the change stream has advanced to its first result. This is used 63 * to determine whether $key should be incremented after an iteration event. 64 * 65 * @var boolean 66 */ 67 private $hasAdvanced = false; 68 69 /** 70 * @internal 71 * @param ChangeStreamIterator $iterator 72 * @param callable $resumeCallable 73 */ 74 public function __construct(ChangeStreamIterator $iterator, callable $resumeCallable) 75 { 76 $this->iterator = $iterator; 77 $this->resumeCallable = $resumeCallable; 78 } 79 80 /** 81 * @see http://php.net/iterator.current 82 * @return mixed 83 */ 84 public function current() 85 { 86 return $this->iterator->current(); 87 } 88 89 /** 90 * @return CursorId 91 */ 92 public function getCursorId() 93 { 94 return $this->iterator->getInnerIterator()->getId(); 95 } 96 97 /** 98 * Returns the resume token for the iterator's current position. 99 * 100 * Null may be returned if no change documents have been iterated and the 101 * server did not include a postBatchResumeToken in its aggregate or getMore 102 * command response. 103 * 104 * @return array|object|null 105 */ 106 public function getResumeToken() 107 { 108 return $this->iterator->getResumeToken(); 109 } 110 111 /** 112 * @see http://php.net/iterator.key 113 * @return mixed 114 */ 115 public function key() 116 { 117 if ($this->valid()) { 118 return $this->key; 119 } 120 121 return null; 122 } 123 124 /** 125 * @see http://php.net/iterator.next 126 * @return void 127 * @throws ResumeTokenException 128 */ 129 public function next() 130 { 131 try { 132 $this->iterator->next(); 133 $this->onIteration($this->hasAdvanced); 134 } catch (RuntimeException $e) { 135 $this->resumeOrThrow($e); 136 } 137 } 138 139 /** 140 * @see http://php.net/iterator.rewind 141 * @return void 142 * @throws ResumeTokenException 143 */ 144 public function rewind() 145 { 146 try { 147 $this->iterator->rewind(); 148 /* Unlike next() and resume(), the decision to increment the key 149 * does not depend on whether the change stream has advanced. This 150 * ensures that multiple calls to rewind() do not alter state. */ 151 $this->onIteration(false); 152 } catch (RuntimeException $e) { 153 $this->resumeOrThrow($e); 154 } 155 } 156 157 /** 158 * @see http://php.net/iterator.valid 159 * @return boolean 160 */ 161 public function valid() 162 { 163 return $this->iterator->valid(); 164 } 165 166 /** 167 * Determines if an exception is a resumable error. 168 * 169 * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#resumable-error 170 * @param RuntimeException $exception 171 * @return boolean 172 */ 173 private function isResumableError(RuntimeException $exception) 174 { 175 if ($exception instanceof ConnectionException) { 176 return true; 177 } 178 179 if (! $exception instanceof ServerException) { 180 return false; 181 } 182 183 if ($exception->hasErrorLabel('NonResumableChangeStreamError')) { 184 return false; 185 } 186 187 if (in_array($exception->getCode(), self::$nonResumableErrorCodes)) { 188 return false; 189 } 190 191 return true; 192 } 193 194 /** 195 * Perform housekeeping after an iteration event. 196 * 197 * @param boolean $incrementKey Increment $key if there is a current result 198 * @throws ResumeTokenException 199 */ 200 private function onIteration($incrementKey) 201 { 202 /* If the cursorId is 0, the server has invalidated the cursor and we 203 * will never perform another getMore nor need to resume since any 204 * remaining results (up to and including the invalidate event) will 205 * have been received in the last response. Therefore, we can unset the 206 * resumeCallable. This will free any reference to Watch as well as the 207 * only reference to any implicit session created therein. */ 208 if ((string) $this->getCursorId() === '0') { 209 $this->resumeCallable = null; 210 } 211 212 /* Return early if there is not a current result. Avoid any attempt to 213 * increment the iterator's key. */ 214 if (! $this->valid()) { 215 return; 216 } 217 218 if ($incrementKey) { 219 $this->key++; 220 } 221 222 $this->hasAdvanced = true; 223 } 224 225 /** 226 * Recreates the ChangeStreamIterator after a resumable server error. 227 * 228 * @return void 229 */ 230 private function resume() 231 { 232 $this->iterator = call_user_func($this->resumeCallable, $this->getResumeToken(), $this->hasAdvanced); 233 $this->iterator->rewind(); 234 235 $this->onIteration($this->hasAdvanced); 236 } 237 238 /** 239 * Either resumes after a resumable error or re-throws the exception. 240 * 241 * @param RuntimeException $exception 242 * @throws RuntimeException 243 */ 244 private function resumeOrThrow(RuntimeException $exception) 245 { 246 if ($this->isResumableError($exception)) { 247 $this->resume(); 248 249 return; 250 } 251 252 throw $exception; 253 } 254 }
title
Description
Body
title
Description
Body
title
Description
Body
title
Body