See Release Notes
Long Term Support Release
Differences Between: [Versions 310 and 401] [Versions 311 and 401] [Versions 39 and 401] [Versions 400 and 401]
1 <?php 2 /* 3 * Copyright 2017-present MongoDB, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * https://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 namespace MongoDB\Operation; 19 20 use MongoDB\BSON\TimestampInterface; 21 use MongoDB\ChangeStream; 22 use MongoDB\Driver\Cursor; 23 use MongoDB\Driver\Exception\RuntimeException; 24 use MongoDB\Driver\Manager; 25 use MongoDB\Driver\Monitoring\CommandFailedEvent; 26 use MongoDB\Driver\Monitoring\CommandStartedEvent; 27 use MongoDB\Driver\Monitoring\CommandSubscriber; 28 use MongoDB\Driver\Monitoring\CommandSucceededEvent; 29 use MongoDB\Driver\ReadPreference; 30 use MongoDB\Driver\Server; 31 use MongoDB\Exception\InvalidArgumentException; 32 use MongoDB\Exception\UnexpectedValueException; 33 use MongoDB\Exception\UnsupportedException; 34 use MongoDB\Model\ChangeStreamIterator; 35 36 use function array_intersect_key; 37 use function array_key_exists; 38 use function array_unshift; 39 use function assert; 40 use function count; 41 use function is_array; 42 use function is_bool; 43 use function is_object; 44 use function is_string; 45 use function MongoDB\Driver\Monitoring\addSubscriber; 46 use function MongoDB\Driver\Monitoring\removeSubscriber; 47 use function MongoDB\select_server; 48 use function MongoDB\server_supports_feature; 49 50 /** 51 * Operation for creating a change stream with the aggregate command. 52 * 53 * Note: the implementation of CommandSubscriber is an internal implementation 54 * detail and should not be considered part of the public API. 55 * 56 * @api 57 * @see \MongoDB\Collection::watch() 58 * @see https://mongodb.com/docs/manual/changeStreams/ 59 */ 60 class Watch implements Executable, /* @internal */ CommandSubscriber 61 { 62 public const FULL_DOCUMENT_DEFAULT = 'default'; 63 public const FULL_DOCUMENT_UPDATE_LOOKUP = 'updateLookup'; 64 public const FULL_DOCUMENT_WHEN_AVAILABLE = 'whenAvailable'; 65 public const FULL_DOCUMENT_REQUIRED = 'required'; 66 67 public const FULL_DOCUMENT_BEFORE_CHANGE_OFF = 'off'; 68 public const FULL_DOCUMENT_BEFORE_CHANGE_WHEN_AVAILABLE = 'whenAvailable'; 69 public const FULL_DOCUMENT_BEFORE_CHANGE_REQUIRED = 'required'; 70 71 /** @var integer */ 72 private static $wireVersionForStartAtOperationTime = 7; 73 74 /** @var Aggregate */ 75 private $aggregate; 76 77 /** @var array */ 78 private $aggregateOptions; 79 80 /** @var array */ 81 private $changeStreamOptions; 82 83 /** @var string|null */ 84 private $collectionName; 85 86 /** @var string */ 87 private $databaseName; 88 89 /** @var integer */ 90 private $firstBatchSize; 91 92 /** @var boolean */ 93 private $hasResumed = false; 94 95 /** @var Manager */ 96 private $manager; 97 98 /** @var TimestampInterface */ 99 private $operationTime; 100 101 /** @var array */ 102 private $pipeline; 103 104 /** @var object|null */ 105 private $postBatchResumeToken; 106 107 /** 108 * Constructs an aggregate command for creating a change stream. 109 * 110 * Supported options: 111 * 112 * * batchSize (integer): The number of documents to return per batch. 113 * 114 * * collation (document): Specifies a collation. 115 * 116 * * comment (mixed): BSON value to attach as a comment to this command. 117 * 118 * Only string values are supported for server versions < 4.4. 119 * 120 * * fullDocument (string): Determines how the "fullDocument" response 121 * field will be populated for update operations. 122 * 123 * By default, change streams only return the delta of fields (via an 124 * "updateDescription" field) for update operations and "fullDocument" is 125 * omitted. Insert and replace operations always include the 126 * "fullDocument" field. Delete operations omit the field as the document 127 * no longer exists. 128 * 129 * Specify "updateLookup" to return the current majority-committed 130 * version of the updated document. 131 * 132 * MongoDB 6.0+ allows returning the post-image of the modified document 133 * if the collection has changeStreamPreAndPostImages enabled. Specify 134 * "whenAvailable" to return the post-image if available or a null value 135 * if not. Specify "required" to return the post-image if available or 136 * raise an error if not. 137 * 138 * * fullDocumentBeforeChange (string): Determines how the 139 * "fullDocumentBeforeChange" response field will be populated. By 140 * default, the field is omitted. 141 * 142 * MongoDB 6.0+ allows returning the pre-image of the modified document 143 * if the collection has changeStreamPreAndPostImages enabled. Specify 144 * "whenAvailable" to return the pre-image if available or a null value 145 * if not. Specify "required" to return the pre-image if available or 146 * raise an error if not. 147 * 148 * * maxAwaitTimeMS (integer): The maximum amount of time for the server to 149 * wait on new documents to satisfy a change stream query. 150 * 151 * * readConcern (MongoDB\Driver\ReadConcern): Read concern. 152 * 153 * * readPreference (MongoDB\Driver\ReadPreference): Read preference. This 154 * will be used to select a new server when resuming. Defaults to a 155 * "primary" read preference. 156 * 157 * * resumeAfter (document): Specifies the logical starting point for the 158 * new change stream. 159 * 160 * Using this option in conjunction with "startAfter" and/or 161 * "startAtOperationTime" will result in a server error. The options are 162 * mutually exclusive. 163 * 164 * * session (MongoDB\Driver\Session): Client session. 165 * 166 * * showExpandedEvents (boolean): Enables the server to send the expanded 167 * list of change stream events. 168 * 169 * This option is not supported for server versions < 6.0. 170 * 171 * * startAfter (document): Specifies the logical starting point for the 172 * new change stream. Unlike "resumeAfter", this option can be used with 173 * a resume token from an "invalidate" event. 174 * 175 * Using this option in conjunction with "resumeAfter" and/or 176 * "startAtOperationTime" will result in a server error. The options are 177 * mutually exclusive. 178 * 179 * * startAtOperationTime (MongoDB\BSON\TimestampInterface): If specified, 180 * the change stream will only provide changes that occurred at or after 181 * the specified timestamp. Any command run against the server will 182 * return an operation time that can be used here. Alternatively, an 183 * operation time may be obtained from MongoDB\Driver\Server::getInfo(). 184 * 185 * Using this option in conjunction with "resumeAfter" and/or 186 * "startAfter" will result in a server error. The options are mutually 187 * exclusive. 188 * 189 * This option is not supported for server versions < 4.0. 190 * 191 * * typeMap (array): Type map for BSON deserialization. This will be 192 * applied to the returned Cursor (it is not sent to the server). 193 * 194 * Note: A database-level change stream may be created by specifying null 195 * for the collection name. A cluster-level change stream may be created by 196 * specifying null for both the database and collection name. 197 * 198 * @param Manager $manager Manager instance from the driver 199 * @param string|null $databaseName Database name 200 * @param string|null $collectionName Collection name 201 * @param array $pipeline List of pipeline operations 202 * @param array $options Command options 203 * @throws InvalidArgumentException for parameter/option parsing errors 204 */ 205 public function __construct(Manager $manager, ?string $databaseName, ?string $collectionName, array $pipeline, array $options = []) 206 { 207 if (isset($collectionName) && ! isset($databaseName)) { 208 throw new InvalidArgumentException('$collectionName should also be null if $databaseName is null'); 209 } 210 211 $options += [ 212 'readPreference' => new ReadPreference(ReadPreference::RP_PRIMARY), 213 ]; 214 215 if (array_key_exists('fullDocument', $options) && ! is_string($options['fullDocument'])) { 216 throw InvalidArgumentException::invalidType('"fullDocument" option', $options['fullDocument'], 'string'); 217 } 218 219 if (isset($options['fullDocumentBeforeChange']) && ! is_string($options['fullDocumentBeforeChange'])) { 220 throw InvalidArgumentException::invalidType('"fullDocumentBeforeChange" option', $options['fullDocumentBeforeChange'], 'string'); 221 } 222 223 if (! $options['readPreference'] instanceof ReadPreference) { 224 throw InvalidArgumentException::invalidType('"readPreference" option', $options['readPreference'], ReadPreference::class); 225 } 226 227 if (isset($options['resumeAfter']) && ! is_array($options['resumeAfter']) && ! is_object($options['resumeAfter'])) { 228 throw InvalidArgumentException::invalidType('"resumeAfter" option', $options['resumeAfter'], 'array or object'); 229 } 230 231 if (isset($options['startAfter']) && ! is_array($options['startAfter']) && ! is_object($options['startAfter'])) { 232 throw InvalidArgumentException::invalidType('"startAfter" option', $options['startAfter'], 'array or object'); 233 } 234 235 if (isset($options['startAtOperationTime']) && ! $options['startAtOperationTime'] instanceof TimestampInterface) { 236 throw InvalidArgumentException::invalidType('"startAtOperationTime" option', $options['startAtOperationTime'], TimestampInterface::class); 237 } 238 239 if (isset($options['showExpandedEvents']) && ! is_bool($options['showExpandedEvents'])) { 240 throw InvalidArgumentException::invalidType('"showExpandedEvents" option', $options['showExpandedEvents'], 'bool'); 241 } 242 243 /* In the absence of an explicit session, create one to ensure that the 244 * initial aggregation and any resume attempts can use the same session 245 * ("implicit from the user's perspective" per PHPLIB-342). Since this 246 * is filling in for an implicit session, we default "causalConsistency" 247 * to false. */ 248 if (! isset($options['session'])) { 249 try { 250 $options['session'] = $manager->startSession(['causalConsistency' => false]); 251 } catch (RuntimeException $e) { 252 /* We can ignore the exception, as libmongoc likely cannot 253 * create its own session and there is no risk of a mismatch. */ 254 } 255 } 256 257 $this->aggregateOptions = array_intersect_key($options, ['batchSize' => 1, 'collation' => 1, 'comment' => 1, 'maxAwaitTimeMS' => 1, 'readConcern' => 1, 'readPreference' => 1, 'session' => 1, 'typeMap' => 1]); 258 $this->changeStreamOptions = array_intersect_key($options, ['fullDocument' => 1, 'fullDocumentBeforeChange' => 1, 'resumeAfter' => 1, 'showExpandedEvents' => 1, 'startAfter' => 1, 'startAtOperationTime' => 1]); 259 260 // Null database name implies a cluster-wide change stream 261 if ($databaseName === null) { 262 $databaseName = 'admin'; 263 $this->changeStreamOptions['allChangesForCluster'] = true; 264 } 265 266 $this->manager = $manager; 267 $this->databaseName = $databaseName; 268 $this->collectionName = $collectionName; 269 $this->pipeline = $pipeline; 270 271 $this->aggregate = $this->createAggregate(); 272 } 273 274 /** @internal */ 275 final public function commandFailed(CommandFailedEvent $event): void 276 { 277 } 278 279 /** @internal */ 280 final public function commandStarted(CommandStartedEvent $event): void 281 { 282 if ($event->getCommandName() !== 'aggregate') { 283 return; 284 } 285 286 $this->firstBatchSize = 0; 287 $this->postBatchResumeToken = null; 288 } 289 290 /** @internal */ 291 final public function commandSucceeded(CommandSucceededEvent $event): void 292 { 293 if ($event->getCommandName() !== 'aggregate') { 294 return; 295 } 296 297 $reply = $event->getReply(); 298 299 if (! isset($reply->cursor->firstBatch) || ! is_array($reply->cursor->firstBatch)) { 300 throw new UnexpectedValueException('aggregate command did not return a "cursor.firstBatch" array'); 301 } 302 303 $this->firstBatchSize = count($reply->cursor->firstBatch); 304 305 if (isset($reply->cursor->postBatchResumeToken) && is_object($reply->cursor->postBatchResumeToken)) { 306 $this->postBatchResumeToken = $reply->cursor->postBatchResumeToken; 307 } 308 309 if ( 310 $this->shouldCaptureOperationTime($event->getServer()) && 311 isset($reply->operationTime) && $reply->operationTime instanceof TimestampInterface 312 ) { 313 $this->operationTime = $reply->operationTime; 314 } 315 } 316 317 /** 318 * Execute the operation. 319 * 320 * @see Executable::execute() 321 * @return ChangeStream 322 * @throws UnsupportedException if collation or read concern is used and unsupported 323 * @throws RuntimeException for other driver errors (e.g. connection errors) 324 */ 325 public function execute(Server $server) 326 { 327 return new ChangeStream( 328 $this->createChangeStreamIterator($server), 329 function ($resumeToken, $hasAdvanced): ChangeStreamIterator { 330 return $this->resume($resumeToken, $hasAdvanced); 331 } 332 ); 333 } 334 335 /** 336 * Create the aggregate command for a change stream. 337 * 338 * This method is also used to recreate the aggregate command when resuming. 339 */ 340 private function createAggregate(): Aggregate 341 { 342 $pipeline = $this->pipeline; 343 array_unshift($pipeline, ['$changeStream' => (object) $this->changeStreamOptions]); 344 345 return new Aggregate($this->databaseName, $this->collectionName, $pipeline, $this->aggregateOptions); 346 } 347 348 /** 349 * Create a ChangeStreamIterator by executing the aggregate command. 350 */ 351 private function createChangeStreamIterator(Server $server): ChangeStreamIterator 352 { 353 return new ChangeStreamIterator( 354 $this->executeAggregate($server), 355 $this->firstBatchSize, 356 $this->getInitialResumeToken(), 357 $this->postBatchResumeToken 358 ); 359 } 360 361 /** 362 * Execute the aggregate command. 363 * 364 * The command will be executed using APM so that we can capture data from 365 * its response (e.g. firstBatch size, postBatchResumeToken). 366 */ 367 private function executeAggregate(Server $server): Cursor 368 { 369 addSubscriber($this); 370 371 try { 372 $cursor = $this->aggregate->execute($server); 373 assert($cursor instanceof Cursor); 374 375 return $cursor; 376 } finally { 377 removeSubscriber($this); 378 } 379 } 380 381 /** 382 * Return the initial resume token for creating the ChangeStreamIterator. 383 * 384 * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#updating-the-cached-resume-token 385 * @return array|object|null 386 */ 387 private function getInitialResumeToken() 388 { 389 if ($this->firstBatchSize === 0 && isset($this->postBatchResumeToken)) { 390 return $this->postBatchResumeToken; 391 } 392 393 if (isset($this->changeStreamOptions['startAfter'])) { 394 return $this->changeStreamOptions['startAfter']; 395 } 396 397 if (isset($this->changeStreamOptions['resumeAfter'])) { 398 return $this->changeStreamOptions['resumeAfter']; 399 } 400 401 return null; 402 } 403 404 /** 405 * Resumes a change stream. 406 * 407 * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#resume-process 408 * @param array|object|null $resumeToken 409 * @throws InvalidArgumentException 410 */ 411 private function resume($resumeToken = null, bool $hasAdvanced = false): ChangeStreamIterator 412 { 413 if (isset($resumeToken) && ! is_array($resumeToken) && ! is_object($resumeToken)) { 414 throw InvalidArgumentException::invalidType('$resumeToken', $resumeToken, 'array or object'); 415 } 416 417 $this->hasResumed = true; 418 419 /* Select a new server using the original read preference. While watch 420 * is not usable within transactions, we still check if there is a 421 * pinned session. This is to avoid an ambiguous error message about 422 * running a command on the wrong server. */ 423 $server = select_server($this->manager, $this->aggregateOptions); 424 425 $resumeOption = isset($this->changeStreamOptions['startAfter']) && ! $hasAdvanced ? 'startAfter' : 'resumeAfter'; 426 427 unset($this->changeStreamOptions['resumeAfter']); 428 unset($this->changeStreamOptions['startAfter']); 429 unset($this->changeStreamOptions['startAtOperationTime']); 430 431 if ($resumeToken !== null) { 432 $this->changeStreamOptions[$resumeOption] = $resumeToken; 433 } 434 435 if ($resumeToken === null && $this->operationTime !== null) { 436 $this->changeStreamOptions['startAtOperationTime'] = $this->operationTime; 437 } 438 439 // Recreate the aggregate command and return a new ChangeStreamIterator 440 $this->aggregate = $this->createAggregate(); 441 442 return $this->createChangeStreamIterator($server); 443 } 444 445 /** 446 * Determine whether to capture operation time from an aggregate response. 447 * 448 * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#startatoperationtime 449 */ 450 private function shouldCaptureOperationTime(Server $server): bool 451 { 452 if ($this->hasResumed) { 453 return false; 454 } 455 456 if ( 457 isset($this->changeStreamOptions['resumeAfter']) || 458 isset($this->changeStreamOptions['startAfter']) || 459 isset($this->changeStreamOptions['startAtOperationTime']) 460 ) { 461 return false; 462 } 463 464 if ($this->firstBatchSize > 0) { 465 return false; 466 } 467 468 if ($this->postBatchResumeToken !== null) { 469 return false; 470 } 471 472 if (! server_supports_feature($server, self::$wireVersionForStartAtOperationTime)) { 473 return false; 474 } 475 476 return true; 477 } 478 }
title
Description
Body
title
Description
Body
title
Description
Body
title
Body