Search moodle.org's
Developer Documentation

See Release Notes
Long Term Support Release

  • Bug fixes for general core bugs in 4.1.x will end 13 November 2023 (12 months).
  • Bug fixes for security issues in 4.1.x will end 10 November 2025 (36 months).
  • PHP version: minimum PHP 7.4.0. Note: the minimum PHP version has increased since Moodle 4.0. PHP 8.0.x is also supported.

Differences Between: [Versions 310 and 401] [Versions 311 and 401] [Versions 39 and 401] [Versions 400 and 401]

   1  <?php
   2  /*
   3   * Copyright 2017-present MongoDB, Inc.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *   https://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   */
  17  
  18  namespace MongoDB\Operation;
  19  
  20  use MongoDB\BSON\TimestampInterface;
  21  use MongoDB\ChangeStream;
  22  use MongoDB\Driver\Cursor;
  23  use MongoDB\Driver\Exception\RuntimeException;
  24  use MongoDB\Driver\Manager;
  25  use MongoDB\Driver\Monitoring\CommandFailedEvent;
  26  use MongoDB\Driver\Monitoring\CommandStartedEvent;
  27  use MongoDB\Driver\Monitoring\CommandSubscriber;
  28  use MongoDB\Driver\Monitoring\CommandSucceededEvent;
  29  use MongoDB\Driver\ReadPreference;
  30  use MongoDB\Driver\Server;
  31  use MongoDB\Exception\InvalidArgumentException;
  32  use MongoDB\Exception\UnexpectedValueException;
  33  use MongoDB\Exception\UnsupportedException;
  34  use MongoDB\Model\ChangeStreamIterator;
  35  
  36  use function array_intersect_key;
  37  use function array_key_exists;
  38  use function array_unshift;
  39  use function assert;
  40  use function count;
  41  use function is_array;
  42  use function is_bool;
  43  use function is_object;
  44  use function is_string;
  45  use function MongoDB\Driver\Monitoring\addSubscriber;
  46  use function MongoDB\Driver\Monitoring\removeSubscriber;
  47  use function MongoDB\select_server;
  48  use function MongoDB\server_supports_feature;
  49  
  50  /**
  51   * Operation for creating a change stream with the aggregate command.
  52   *
  53   * Note: the implementation of CommandSubscriber is an internal implementation
  54   * detail and should not be considered part of the public API.
  55   *
  56   * @api
  57   * @see \MongoDB\Collection::watch()
  58   * @see https://mongodb.com/docs/manual/changeStreams/
  59   */
  60  class Watch implements Executable, /* @internal */ CommandSubscriber
  61  {
  62      public const FULL_DOCUMENT_DEFAULT = 'default';
  63      public const FULL_DOCUMENT_UPDATE_LOOKUP = 'updateLookup';
  64      public const FULL_DOCUMENT_WHEN_AVAILABLE = 'whenAvailable';
  65      public const FULL_DOCUMENT_REQUIRED = 'required';
  66  
  67      public const FULL_DOCUMENT_BEFORE_CHANGE_OFF = 'off';
  68      public const FULL_DOCUMENT_BEFORE_CHANGE_WHEN_AVAILABLE = 'whenAvailable';
  69      public const FULL_DOCUMENT_BEFORE_CHANGE_REQUIRED = 'required';
  70  
  71      /** @var integer */
  72      private static $wireVersionForStartAtOperationTime = 7;
  73  
  74      /** @var Aggregate */
  75      private $aggregate;
  76  
  77      /** @var array */
  78      private $aggregateOptions;
  79  
  80      /** @var array */
  81      private $changeStreamOptions;
  82  
  83      /** @var string|null */
  84      private $collectionName;
  85  
  86      /** @var string */
  87      private $databaseName;
  88  
  89      /** @var integer */
  90      private $firstBatchSize;
  91  
  92      /** @var boolean */
  93      private $hasResumed = false;
  94  
  95      /** @var Manager */
  96      private $manager;
  97  
  98      /** @var TimestampInterface */
  99      private $operationTime;
 100  
 101      /** @var array */
 102      private $pipeline;
 103  
 104      /** @var object|null */
 105      private $postBatchResumeToken;
 106  
 107      /**
 108       * Constructs an aggregate command for creating a change stream.
 109       *
 110       * Supported options:
 111       *
 112       *  * batchSize (integer): The number of documents to return per batch.
 113       *
 114       *  * collation (document): Specifies a collation.
 115       *
 116       *  * comment (mixed): BSON value to attach as a comment to this command.
 117       *
 118       *    Only string values are supported for server versions < 4.4.
 119       *
 120       *  * fullDocument (string): Determines how the "fullDocument" response
 121       *    field will be populated for update operations.
 122       *
 123       *    By default, change streams only return the delta of fields (via an
 124       *    "updateDescription" field) for update operations and "fullDocument" is
 125       *    omitted. Insert and replace operations always include the
 126       *    "fullDocument" field. Delete operations omit the field as the document
 127       *    no longer exists.
 128       *
 129       *    Specify "updateLookup" to return the current majority-committed
 130       *    version of the updated document.
 131       *
 132       *    MongoDB 6.0+ allows returning the post-image of the modified document
 133       *    if the collection has changeStreamPreAndPostImages enabled. Specify
 134       *    "whenAvailable" to return the post-image if available or a null value
 135       *    if not. Specify "required" to return the post-image if available or
 136       *    raise an error if not.
 137       *
 138       *  * fullDocumentBeforeChange (string): Determines how the
 139       *    "fullDocumentBeforeChange" response field will be populated. By
 140       *    default, the field is omitted.
 141       *
 142       *    MongoDB 6.0+ allows returning the pre-image of the modified document
 143       *    if the collection has changeStreamPreAndPostImages enabled. Specify
 144       *    "whenAvailable" to return the pre-image if available or a null value
 145       *    if not. Specify "required" to return the pre-image if available or
 146       *    raise an error if not.
 147       *
 148       *  * maxAwaitTimeMS (integer): The maximum amount of time for the server to
 149       *    wait on new documents to satisfy a change stream query.
 150       *
 151       *  * readConcern (MongoDB\Driver\ReadConcern): Read concern.
 152       *
 153       *  * readPreference (MongoDB\Driver\ReadPreference): Read preference. This
 154       *    will be used to select a new server when resuming. Defaults to a
 155       *    "primary" read preference.
 156       *
 157       *  * resumeAfter (document): Specifies the logical starting point for the
 158       *    new change stream.
 159       *
 160       *    Using this option in conjunction with "startAfter" and/or
 161       *    "startAtOperationTime" will result in a server error. The options are
 162       *    mutually exclusive.
 163       *
 164       *  * session (MongoDB\Driver\Session): Client session.
 165       *
 166       *  * showExpandedEvents (boolean): Enables the server to send the expanded
 167       *    list of change stream events.
 168       *
 169       *    This option is not supported for server versions < 6.0.
 170       *
 171       *  * startAfter (document): Specifies the logical starting point for the
 172       *    new change stream. Unlike "resumeAfter", this option can be used with
 173       *    a resume token from an "invalidate" event.
 174       *
 175       *    Using this option in conjunction with "resumeAfter" and/or
 176       *    "startAtOperationTime" will result in a server error. The options are
 177       *    mutually exclusive.
 178       *
 179       *  * startAtOperationTime (MongoDB\BSON\TimestampInterface): If specified,
 180       *    the change stream will only provide changes that occurred at or after
 181       *    the specified timestamp. Any command run against the server will
 182       *    return an operation time that can be used here. Alternatively, an
 183       *    operation time may be obtained from MongoDB\Driver\Server::getInfo().
 184       *
 185       *    Using this option in conjunction with "resumeAfter" and/or
 186       *    "startAfter" will result in a server error. The options are mutually
 187       *    exclusive.
 188       *
 189       *    This option is not supported for server versions < 4.0.
 190       *
 191       *  * typeMap (array): Type map for BSON deserialization. This will be
 192       *    applied to the returned Cursor (it is not sent to the server).
 193       *
 194       * Note: A database-level change stream may be created by specifying null
 195       * for the collection name. A cluster-level change stream may be created by
 196       * specifying null for both the database and collection name.
 197       *
 198       * @param Manager     $manager        Manager instance from the driver
 199       * @param string|null $databaseName   Database name
 200       * @param string|null $collectionName Collection name
 201       * @param array       $pipeline       List of pipeline operations
 202       * @param array       $options        Command options
 203       * @throws InvalidArgumentException for parameter/option parsing errors
 204       */
 205      public function __construct(Manager $manager, ?string $databaseName, ?string $collectionName, array $pipeline, array $options = [])
 206      {
 207          if (isset($collectionName) && ! isset($databaseName)) {
 208              throw new InvalidArgumentException('$collectionName should also be null if $databaseName is null');
 209          }
 210  
 211          $options += [
 212              'readPreference' => new ReadPreference(ReadPreference::RP_PRIMARY),
 213          ];
 214  
 215          if (array_key_exists('fullDocument', $options) && ! is_string($options['fullDocument'])) {
 216              throw InvalidArgumentException::invalidType('"fullDocument" option', $options['fullDocument'], 'string');
 217          }
 218  
 219          if (isset($options['fullDocumentBeforeChange']) && ! is_string($options['fullDocumentBeforeChange'])) {
 220              throw InvalidArgumentException::invalidType('"fullDocumentBeforeChange" option', $options['fullDocumentBeforeChange'], 'string');
 221          }
 222  
 223          if (! $options['readPreference'] instanceof ReadPreference) {
 224              throw InvalidArgumentException::invalidType('"readPreference" option', $options['readPreference'], ReadPreference::class);
 225          }
 226  
 227          if (isset($options['resumeAfter']) && ! is_array($options['resumeAfter']) && ! is_object($options['resumeAfter'])) {
 228              throw InvalidArgumentException::invalidType('"resumeAfter" option', $options['resumeAfter'], 'array or object');
 229          }
 230  
 231          if (isset($options['startAfter']) && ! is_array($options['startAfter']) && ! is_object($options['startAfter'])) {
 232              throw InvalidArgumentException::invalidType('"startAfter" option', $options['startAfter'], 'array or object');
 233          }
 234  
 235          if (isset($options['startAtOperationTime']) && ! $options['startAtOperationTime'] instanceof TimestampInterface) {
 236              throw InvalidArgumentException::invalidType('"startAtOperationTime" option', $options['startAtOperationTime'], TimestampInterface::class);
 237          }
 238  
 239          if (isset($options['showExpandedEvents']) && ! is_bool($options['showExpandedEvents'])) {
 240              throw InvalidArgumentException::invalidType('"showExpandedEvents" option', $options['showExpandedEvents'], 'bool');
 241          }
 242  
 243          /* In the absence of an explicit session, create one to ensure that the
 244           * initial aggregation and any resume attempts can use the same session
 245           * ("implicit from the user's perspective" per PHPLIB-342). Since this
 246           * is filling in for an implicit session, we default "causalConsistency"
 247           * to false. */
 248          if (! isset($options['session'])) {
 249              try {
 250                  $options['session'] = $manager->startSession(['causalConsistency' => false]);
 251              } catch (RuntimeException $e) {
 252                  /* We can ignore the exception, as libmongoc likely cannot
 253                   * create its own session and there is no risk of a mismatch. */
 254              }
 255          }
 256  
 257          $this->aggregateOptions = array_intersect_key($options, ['batchSize' => 1, 'collation' => 1, 'comment' => 1, 'maxAwaitTimeMS' => 1, 'readConcern' => 1, 'readPreference' => 1, 'session' => 1, 'typeMap' => 1]);
 258          $this->changeStreamOptions = array_intersect_key($options, ['fullDocument' => 1, 'fullDocumentBeforeChange' => 1, 'resumeAfter' => 1, 'showExpandedEvents' => 1, 'startAfter' => 1, 'startAtOperationTime' => 1]);
 259  
 260          // Null database name implies a cluster-wide change stream
 261          if ($databaseName === null) {
 262              $databaseName = 'admin';
 263              $this->changeStreamOptions['allChangesForCluster'] = true;
 264          }
 265  
 266          $this->manager = $manager;
 267          $this->databaseName = $databaseName;
 268          $this->collectionName = $collectionName;
 269          $this->pipeline = $pipeline;
 270  
 271          $this->aggregate = $this->createAggregate();
 272      }
 273  
 274      /** @internal */
 275      final public function commandFailed(CommandFailedEvent $event): void
 276      {
 277      }
 278  
 279      /** @internal */
 280      final public function commandStarted(CommandStartedEvent $event): void
 281      {
 282          if ($event->getCommandName() !== 'aggregate') {
 283              return;
 284          }
 285  
 286          $this->firstBatchSize = 0;
 287          $this->postBatchResumeToken = null;
 288      }
 289  
 290      /** @internal */
 291      final public function commandSucceeded(CommandSucceededEvent $event): void
 292      {
 293          if ($event->getCommandName() !== 'aggregate') {
 294              return;
 295          }
 296  
 297          $reply = $event->getReply();
 298  
 299          if (! isset($reply->cursor->firstBatch) || ! is_array($reply->cursor->firstBatch)) {
 300              throw new UnexpectedValueException('aggregate command did not return a "cursor.firstBatch" array');
 301          }
 302  
 303          $this->firstBatchSize = count($reply->cursor->firstBatch);
 304  
 305          if (isset($reply->cursor->postBatchResumeToken) && is_object($reply->cursor->postBatchResumeToken)) {
 306              $this->postBatchResumeToken = $reply->cursor->postBatchResumeToken;
 307          }
 308  
 309          if (
 310              $this->shouldCaptureOperationTime($event->getServer()) &&
 311              isset($reply->operationTime) && $reply->operationTime instanceof TimestampInterface
 312          ) {
 313              $this->operationTime = $reply->operationTime;
 314          }
 315      }
 316  
 317      /**
 318       * Execute the operation.
 319       *
 320       * @see Executable::execute()
 321       * @return ChangeStream
 322       * @throws UnsupportedException if collation or read concern is used and unsupported
 323       * @throws RuntimeException for other driver errors (e.g. connection errors)
 324       */
 325      public function execute(Server $server)
 326      {
 327          return new ChangeStream(
 328              $this->createChangeStreamIterator($server),
 329              function ($resumeToken, $hasAdvanced): ChangeStreamIterator {
 330                  return $this->resume($resumeToken, $hasAdvanced);
 331              }
 332          );
 333      }
 334  
 335      /**
 336       * Create the aggregate command for a change stream.
 337       *
 338       * This method is also used to recreate the aggregate command when resuming.
 339       */
 340      private function createAggregate(): Aggregate
 341      {
 342          $pipeline = $this->pipeline;
 343          array_unshift($pipeline, ['$changeStream' => (object) $this->changeStreamOptions]);
 344  
 345          return new Aggregate($this->databaseName, $this->collectionName, $pipeline, $this->aggregateOptions);
 346      }
 347  
 348      /**
 349       * Create a ChangeStreamIterator by executing the aggregate command.
 350       */
 351      private function createChangeStreamIterator(Server $server): ChangeStreamIterator
 352      {
 353          return new ChangeStreamIterator(
 354              $this->executeAggregate($server),
 355              $this->firstBatchSize,
 356              $this->getInitialResumeToken(),
 357              $this->postBatchResumeToken
 358          );
 359      }
 360  
 361      /**
 362       * Execute the aggregate command.
 363       *
 364       * The command will be executed using APM so that we can capture data from
 365       * its response (e.g. firstBatch size, postBatchResumeToken).
 366       */
 367      private function executeAggregate(Server $server): Cursor
 368      {
 369          addSubscriber($this);
 370  
 371          try {
 372              $cursor = $this->aggregate->execute($server);
 373              assert($cursor instanceof Cursor);
 374  
 375              return $cursor;
 376          } finally {
 377              removeSubscriber($this);
 378          }
 379      }
 380  
 381      /**
 382       * Return the initial resume token for creating the ChangeStreamIterator.
 383       *
 384       * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#updating-the-cached-resume-token
 385       * @return array|object|null
 386       */
 387      private function getInitialResumeToken()
 388      {
 389          if ($this->firstBatchSize === 0 && isset($this->postBatchResumeToken)) {
 390              return $this->postBatchResumeToken;
 391          }
 392  
 393          if (isset($this->changeStreamOptions['startAfter'])) {
 394              return $this->changeStreamOptions['startAfter'];
 395          }
 396  
 397          if (isset($this->changeStreamOptions['resumeAfter'])) {
 398              return $this->changeStreamOptions['resumeAfter'];
 399          }
 400  
 401          return null;
 402      }
 403  
 404      /**
 405       * Resumes a change stream.
 406       *
 407       * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#resume-process
 408       * @param array|object|null $resumeToken
 409       * @throws InvalidArgumentException
 410       */
 411      private function resume($resumeToken = null, bool $hasAdvanced = false): ChangeStreamIterator
 412      {
 413          if (isset($resumeToken) && ! is_array($resumeToken) && ! is_object($resumeToken)) {
 414              throw InvalidArgumentException::invalidType('$resumeToken', $resumeToken, 'array or object');
 415          }
 416  
 417          $this->hasResumed = true;
 418  
 419          /* Select a new server using the original read preference. While watch
 420           * is not usable within transactions, we still check if there is a
 421           * pinned session. This is to avoid an ambiguous error message about
 422           * running a command on the wrong server. */
 423          $server = select_server($this->manager, $this->aggregateOptions);
 424  
 425          $resumeOption = isset($this->changeStreamOptions['startAfter']) && ! $hasAdvanced ? 'startAfter' : 'resumeAfter';
 426  
 427          unset($this->changeStreamOptions['resumeAfter']);
 428          unset($this->changeStreamOptions['startAfter']);
 429          unset($this->changeStreamOptions['startAtOperationTime']);
 430  
 431          if ($resumeToken !== null) {
 432              $this->changeStreamOptions[$resumeOption] = $resumeToken;
 433          }
 434  
 435          if ($resumeToken === null && $this->operationTime !== null) {
 436              $this->changeStreamOptions['startAtOperationTime'] = $this->operationTime;
 437          }
 438  
 439          // Recreate the aggregate command and return a new ChangeStreamIterator
 440          $this->aggregate = $this->createAggregate();
 441  
 442          return $this->createChangeStreamIterator($server);
 443      }
 444  
 445      /**
 446       * Determine whether to capture operation time from an aggregate response.
 447       *
 448       * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#startatoperationtime
 449       */
 450      private function shouldCaptureOperationTime(Server $server): bool
 451      {
 452          if ($this->hasResumed) {
 453              return false;
 454          }
 455  
 456          if (
 457              isset($this->changeStreamOptions['resumeAfter']) ||
 458              isset($this->changeStreamOptions['startAfter']) ||
 459              isset($this->changeStreamOptions['startAtOperationTime'])
 460          ) {
 461              return false;
 462          }
 463  
 464          if ($this->firstBatchSize > 0) {
 465              return false;
 466          }
 467  
 468          if ($this->postBatchResumeToken !== null) {
 469              return false;
 470          }
 471  
 472          if (! server_supports_feature($server, self::$wireVersionForStartAtOperationTime)) {
 473              return false;
 474          }
 475  
 476          return true;
 477      }
 478  }