Differences Between: [Versions 310 and 400] [Versions 39 and 400] [Versions 400 and 401]
<?php
/*
 * Copyright 2017 MongoDB, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace MongoDB\Operation;

use MongoDB\BSON\TimestampInterface;
use MongoDB\ChangeStream;
use MongoDB\Driver\Cursor;
use MongoDB\Driver\Exception\RuntimeException;
use MongoDB\Driver\Manager;
use MongoDB\Driver\Monitoring\CommandFailedEvent;
use MongoDB\Driver\Monitoring\CommandStartedEvent;
use MongoDB\Driver\Monitoring\CommandSubscriber;
use MongoDB\Driver\Monitoring\CommandSucceededEvent;
use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\Server;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\UnexpectedValueException;
use MongoDB\Exception\UnsupportedException;
use MongoDB\Model\ChangeStreamIterator;
use function array_intersect_key;
use function array_unshift;
use function count;
use function is_array;
use function is_object;
use function is_string;
use function MongoDB\Driver\Monitoring\addSubscriber;
use function MongoDB\Driver\Monitoring\removeSubscriber;
use function MongoDB\select_server;
use function MongoDB\server_supports_feature;

/**
 * Operation for creating a change stream with the aggregate command.
 *
 * Note: the implementation of CommandSubscriber is an internal implementation
 * detail and should not be considered part of the public API.
 *
 * @api
 * @see \MongoDB\Collection::watch()
 * @see https://docs.mongodb.com/manual/changeStreams/
 */
class Watch implements Executable, /* @internal */ CommandSubscriber
{
    const FULL_DOCUMENT_DEFAULT = 'default';
    const FULL_DOCUMENT_UPDATE_LOOKUP = 'updateLookup';

    /** @var integer */
    private static $wireVersionForStartAtOperationTime = 7;

    /** @var Aggregate */
    private $aggregate;

    /** @var array */
    private $aggregateOptions;

    /** @var array */
    private $changeStreamOptions;

    /** @var string|null */
    private $collectionName;

    /** @var string */
    private $databaseName;

    /** @var integer|null */
    private $firstBatchSize;

    /** @var boolean */
    private $hasResumed = false;

    /** @var Manager */
    private $manager;

    /**
     * Operation time to fall back on when resuming without a resume token.
     *
     * Set either from the "startAtOperationTime" option or from the reply to
     * the initial aggregate command; null if neither is available.
     *
     * @var TimestampInterface|null
     */
    private $operationTime;

    /** @var array */
    private $pipeline;

    /** @var object|null */
    private $postBatchResumeToken;

    /**
     * Constructs an aggregate command for creating a change stream.
     *
     * Supported options:
     *
     *  * batchSize (integer): The number of documents to return per batch.
     *
     *  * collation (document): Specifies a collation.
     *
     *  * fullDocument (string): Determines whether the "fullDocument" field
     *    will be populated for update operations. By default, change streams
     *    only return the delta of fields during the update operation (via the
     *    "updateDescription" field). To additionally return the most current
     *    majority-committed version of the updated document, specify
     *    "updateLookup" for this option. Defaults to "default".
     *
     *    Insert and replace operations always include the "fullDocument" field
     *    and delete operations omit the field as the document no longer exists.
     *
     *  * maxAwaitTimeMS (integer): The maximum amount of time for the server to
     *    wait on new documents to satisfy a change stream query.
     *
     *  * readConcern (MongoDB\Driver\ReadConcern): Read concern.
     *
     *  * readPreference (MongoDB\Driver\ReadPreference): Read preference. This
     *    will be used to select a new server when resuming. Defaults to a
     *    "primary" read preference.
     *
     *  * resumeAfter (document): Specifies the logical starting point for the
     *    new change stream.
     *
     *    Using this option in conjunction with "startAfter" and/or
     *    "startAtOperationTime" will result in a server error. The options are
     *    mutually exclusive.
     *
     *  * session (MongoDB\Driver\Session): Client session.
     *
     *    Sessions are not supported for server versions < 3.6.
     *
     *  * startAfter (document): Specifies the logical starting point for the
     *    new change stream. Unlike "resumeAfter", this option can be used with
     *    a resume token from an "invalidate" event.
     *
     *    Using this option in conjunction with "resumeAfter" and/or
     *    "startAtOperationTime" will result in a server error. The options are
     *    mutually exclusive.
     *
     *  * startAtOperationTime (MongoDB\BSON\TimestampInterface): If specified,
     *    the change stream will only provide changes that occurred at or after
     *    the specified timestamp. Any command run against the server will
     *    return an operation time that can be used here. Alternatively, an
     *    operation time may be obtained from MongoDB\Driver\Server::getInfo().
     *
     *    Using this option in conjunction with "resumeAfter" and/or
     *    "startAfter" will result in a server error. The options are mutually
     *    exclusive.
     *
     *    This option is not supported for server versions < 4.0.
     *
     *  * typeMap (array): Type map for BSON deserialization. This will be
     *    applied to the returned Cursor (it is not sent to the server).
     *
     * Note: A database-level change stream may be created by specifying null
     * for the collection name. A cluster-level change stream may be created by
     * specifying null for both the database and collection name.
     *
     * @param Manager     $manager        Manager instance from the driver
     * @param string|null $databaseName   Database name
     * @param string|null $collectionName Collection name
     * @param array       $pipeline       List of pipeline operations
     * @param array       $options        Command options
     * @throws InvalidArgumentException for parameter/option parsing errors
     */
    public function __construct(Manager $manager, $databaseName, $collectionName, array $pipeline, array $options = [])
    {
        if (isset($collectionName) && ! isset($databaseName)) {
            throw new InvalidArgumentException('$collectionName should also be null if $databaseName is null');
        }

        $options += [
            'fullDocument' => self::FULL_DOCUMENT_DEFAULT,
            'readPreference' => new ReadPreference(ReadPreference::RP_PRIMARY),
        ];

        if (! is_string($options['fullDocument'])) {
            throw InvalidArgumentException::invalidType('"fullDocument" option', $options['fullDocument'], 'string');
        }

        if (! $options['readPreference'] instanceof ReadPreference) {
            throw InvalidArgumentException::invalidType('"readPreference" option', $options['readPreference'], ReadPreference::class);
        }

        if (isset($options['resumeAfter']) && ! is_array($options['resumeAfter']) && ! is_object($options['resumeAfter'])) {
            throw InvalidArgumentException::invalidType('"resumeAfter" option', $options['resumeAfter'], 'array or object');
        }

        if (isset($options['startAfter']) && ! is_array($options['startAfter']) && ! is_object($options['startAfter'])) {
            throw InvalidArgumentException::invalidType('"startAfter" option', $options['startAfter'], 'array or object');
        }

        if (isset($options['startAtOperationTime']) && ! $options['startAtOperationTime'] instanceof TimestampInterface) {
            throw InvalidArgumentException::invalidType('"startAtOperationTime" option', $options['startAtOperationTime'], TimestampInterface::class);
        }

        /* In the absence of an explicit session, create one to ensure that the
         * initial aggregation and any resume attempts can use the same session
         * ("implicit from the user's perspective" per PHPLIB-342). Since this
         * is filling in for an implicit session, we default "causalConsistency"
         * to false. */
        if (! isset($options['session'])) {
            try {
                $options['session'] = $manager->startSession(['causalConsistency' => false]);
            } catch (RuntimeException $e) {
                /* We can ignore the exception, as libmongoc likely cannot
                 * create its own session and there is no risk of a mismatch. */
            }
        }

        // Split the options into those for the aggregate command itself and
        // those that belong inside the $changeStream pipeline stage.
        $this->aggregateOptions = array_intersect_key($options, ['batchSize' => 1, 'collation' => 1, 'maxAwaitTimeMS' => 1, 'readConcern' => 1, 'readPreference' => 1, 'session' => 1, 'typeMap' => 1]);
        $this->changeStreamOptions = array_intersect_key($options, ['fullDocument' => 1, 'resumeAfter' => 1, 'startAfter' => 1, 'startAtOperationTime' => 1]);

        /* Remember a user-supplied "startAtOperationTime" so that resume() can
         * re-apply it when it is called without a resume token. resume()
         * always unsets the option and only restores it from this property,
         * and shouldCaptureOperationTime() declines to capture a server
         * operation time whenever the option was specified. Without this
         * assignment the recreated aggregate would carry no starting point. */
        if (isset($options['startAtOperationTime'])) {
            $this->operationTime = $options['startAtOperationTime'];
        }

        // Null database name implies a cluster-wide change stream
        if ($databaseName === null) {
            $databaseName = 'admin';
            $this->changeStreamOptions['allChangesForCluster'] = true;
        }

        $this->manager = $manager;
        $this->databaseName = (string) $databaseName;
        $this->collectionName = isset($collectionName) ? (string) $collectionName : null;
        $this->pipeline = $pipeline;

        $this->aggregate = $this->createAggregate();
    }

    /**
     * Command failure callback; intentionally does nothing.
     *
     * @internal
     * @param CommandFailedEvent $event
     */
    final public function commandFailed(CommandFailedEvent $event)
    {
    }

    /**
     * Command start callback.
     *
     * Clears the values captured from a previous aggregate reply whenever a
     * new aggregate command starts. Other commands are ignored.
     *
     * @internal
     * @param CommandStartedEvent $event
     */
    final public function commandStarted(CommandStartedEvent $event)
    {
        if ($event->getCommandName() !== 'aggregate') {
            return;
        }

        $this->firstBatchSize = null;
        $this->postBatchResumeToken = null;
    }

    /**
     * Command success callback.
     *
     * Captures the first batch size, the postBatchResumeToken (if any) and,
     * when shouldCaptureOperationTime() allows it, the operation time from an
     * aggregate reply. Other commands are ignored.
     *
     * @internal
     * @param CommandSucceededEvent $event
     * @throws UnexpectedValueException if the reply has no "cursor.firstBatch" array
     */
    final public function commandSucceeded(CommandSucceededEvent $event)
    {
        if ($event->getCommandName() !== 'aggregate') {
            return;
        }

        $reply = $event->getReply();

        if (! isset($reply->cursor->firstBatch) || ! is_array($reply->cursor->firstBatch)) {
            throw new UnexpectedValueException('aggregate command did not return a "cursor.firstBatch" array');
        }

        $this->firstBatchSize = count($reply->cursor->firstBatch);

        if (isset($reply->cursor->postBatchResumeToken) && is_object($reply->cursor->postBatchResumeToken)) {
            $this->postBatchResumeToken = $reply->cursor->postBatchResumeToken;
        }

        // Must run after the two assignments above: the check reads both
        // $firstBatchSize and $postBatchResumeToken.
        if ($this->shouldCaptureOperationTime($event->getServer()) &&
            isset($reply->operationTime) && $reply->operationTime instanceof TimestampInterface) {
            $this->operationTime = $reply->operationTime;
        }
    }

    /**
     * Execute the operation.
     *
     * @see Executable::execute()
     * @param Server $server
     * @return ChangeStream
     * @throws UnsupportedException if collation or read concern is used and unsupported
     * @throws RuntimeException for other driver errors (e.g. connection errors)
     */
    public function execute(Server $server)
    {
        return new ChangeStream(
            $this->createChangeStreamIterator($server),
            function ($resumeToken, $hasAdvanced) {
                return $this->resume($resumeToken, $hasAdvanced);
            }
        );
    }

    /**
     * Create the aggregate command for a change stream.
     *
     * This method is also used to recreate the aggregate command when resuming.
     *
     * @return Aggregate
     */
    private function createAggregate()
    {
        $pipeline = $this->pipeline;
        array_unshift($pipeline, ['$changeStream' => (object) $this->changeStreamOptions]);

        return new Aggregate($this->databaseName, $this->collectionName, $pipeline, $this->aggregateOptions);
    }

    /**
     * Create a ChangeStreamIterator by executing the aggregate command.
     *
     * Note: the aggregate is executed while evaluating the first constructor
     * argument, so the remaining arguments see the values captured from its
     * reply by the command monitoring callbacks.
     *
     * @param Server $server
     * @return ChangeStreamIterator
     */
    private function createChangeStreamIterator(Server $server)
    {
        return new ChangeStreamIterator(
            $this->executeAggregate($server),
            $this->firstBatchSize,
            $this->getInitialResumeToken(),
            $this->postBatchResumeToken
        );
    }

    /**
     * Execute the aggregate command.
     *
     * The command will be executed using APM so that we can capture data from
     * its response (e.g. firstBatch size, postBatchResumeToken).
     *
     * @param Server $server
     * @return Cursor
     */
    private function executeAggregate(Server $server)
    {
        addSubscriber($this);

        try {
            return $this->aggregate->execute($server);
        } finally {
            // Always unsubscribe, even if the aggregate throws.
            removeSubscriber($this);
        }
    }

    /**
     * Return the initial resume token for creating the ChangeStreamIterator.
     *
     * Preference order: the postBatchResumeToken when the first batch is
     * empty, then the "startAfter" option, then the "resumeAfter" option.
     *
     * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#updating-the-cached-resume-token
     * @return array|object|null
     */
    private function getInitialResumeToken()
    {
        if ($this->firstBatchSize === 0 && isset($this->postBatchResumeToken)) {
            return $this->postBatchResumeToken;
        }

        if (isset($this->changeStreamOptions['startAfter'])) {
            return $this->changeStreamOptions['startAfter'];
        }

        if (isset($this->changeStreamOptions['resumeAfter'])) {
            return $this->changeStreamOptions['resumeAfter'];
        }

        return null;
    }

    /**
     * Resumes a change stream.
     *
     * The starting point of the recreated aggregate is the given resume token
     * if there is one; otherwise it is the saved operation time (the original
     * "startAtOperationTime" option or the time captured from the initial
     * aggregate reply), if any.
     *
     * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#resume-process
     * @param array|object|null $resumeToken
     * @param bool              $hasAdvanced
     * @return ChangeStreamIterator
     * @throws InvalidArgumentException
     */
    private function resume($resumeToken = null, $hasAdvanced = false)
    {
        if (isset($resumeToken) && ! is_array($resumeToken) && ! is_object($resumeToken)) {
            throw InvalidArgumentException::invalidType('$resumeToken', $resumeToken, 'array or object');
        }

        $this->hasResumed = true;

        /* Select a new server using the original read preference. While watch
         * is not usable within transactions, we still check if there is a
         * pinned session. This is to avoid an ambiguous error message about
         * running a command on the wrong server. */
        $server = select_server($this->manager, $this->aggregateOptions);

        // Keep using "startAfter" only if it was the original option and the
        // stream has not advanced; this must be decided before the unsets.
        $resumeOption = isset($this->changeStreamOptions['startAfter']) && ! $hasAdvanced ? 'startAfter' : 'resumeAfter';

        unset($this->changeStreamOptions['resumeAfter']);
        unset($this->changeStreamOptions['startAfter']);
        unset($this->changeStreamOptions['startAtOperationTime']);

        if ($resumeToken !== null) {
            $this->changeStreamOptions[$resumeOption] = $resumeToken;
        }

        if ($resumeToken === null && $this->operationTime !== null) {
            $this->changeStreamOptions['startAtOperationTime'] = $this->operationTime;
        }

        // Recreate the aggregate command and return a new ChangeStreamIterator
        $this->aggregate = $this->createAggregate();

        return $this->createChangeStreamIterator($server);
    }

    /**
     * Determine whether to capture operation time from an aggregate response.
     *
     * Capturing is declined after a resume, when any explicit starting point
     * option is set, when the first batch is non-empty, when a
     * postBatchResumeToken was returned, or when the server does not support
     * the required wire version.
     *
     * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#startatoperationtime
     * @param Server $server
     * @return boolean
     */
    private function shouldCaptureOperationTime(Server $server)
    {
        if ($this->hasResumed) {
            return false;
        }

        if (isset($this->changeStreamOptions['resumeAfter']) ||
            isset($this->changeStreamOptions['startAfter']) ||
            isset($this->changeStreamOptions['startAtOperationTime'])) {
            return false;
        }

        if ($this->firstBatchSize > 0) {
            return false;
        }

        if ($this->postBatchResumeToken !== null) {
            return false;
        }

        if (! server_supports_feature($server, self::$wireVersionForStartAtOperationTime)) {
            return false;
        }

        return true;
    }
}
title
Description
Body
title
Description
Body
title
Description
Body
title
Body