Search moodle.org's
Developer Documentation

See Release Notes

  • Bug fixes for general core bugs in 4.0.x will end 8 May 2023 (12 months).
  • Bug fixes for security issues in 4.0.x will end 13 November 2023 (18 months).
  • PHP version: minimum PHP 7.3.0 Note: the minimum PHP version has increased since Moodle 3.10. PHP 7.4.x is also supported.

Differences Between: [Versions 310 and 400] [Versions 39 and 400] [Versions 400 and 401]

   1  <?php
   2  /*
   3   * Copyright 2017 MongoDB, Inc.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *   http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   */
  17  
  18  namespace MongoDB\Operation;
  19  
  20  use MongoDB\BSON\TimestampInterface;
  21  use MongoDB\ChangeStream;
  22  use MongoDB\Driver\Cursor;
  23  use MongoDB\Driver\Exception\RuntimeException;
  24  use MongoDB\Driver\Manager;
  25  use MongoDB\Driver\Monitoring\CommandFailedEvent;
  26  use MongoDB\Driver\Monitoring\CommandStartedEvent;
  27  use MongoDB\Driver\Monitoring\CommandSubscriber;
  28  use MongoDB\Driver\Monitoring\CommandSucceededEvent;
  29  use MongoDB\Driver\ReadPreference;
  30  use MongoDB\Driver\Server;
  31  use MongoDB\Exception\InvalidArgumentException;
  32  use MongoDB\Exception\UnexpectedValueException;
  33  use MongoDB\Exception\UnsupportedException;
  34  use MongoDB\Model\ChangeStreamIterator;
  35  use function array_intersect_key;
  36  use function array_unshift;
  37  use function count;
  38  use function is_array;
  39  use function is_object;
  40  use function is_string;
  41  use function MongoDB\Driver\Monitoring\addSubscriber;
  42  use function MongoDB\Driver\Monitoring\removeSubscriber;
  43  use function MongoDB\select_server;
  44  use function MongoDB\server_supports_feature;
  45  
  46  /**
  47   * Operation for creating a change stream with the aggregate command.
  48   *
  49   * Note: the implementation of CommandSubscriber is an internal implementation
  50   * detail and should not be considered part of the public API.
  51   *
  52   * @api
  53   * @see \MongoDB\Collection::watch()
  54   * @see https://docs.mongodb.com/manual/changeStreams/
  55   */
  56  class Watch implements Executable, /* @internal */ CommandSubscriber
  57  {
  58      const FULL_DOCUMENT_DEFAULT = 'default';
  59      const FULL_DOCUMENT_UPDATE_LOOKUP = 'updateLookup';
  60  
  61      /** @var integer */
  62      private static $wireVersionForStartAtOperationTime = 7;
  63  
  64      /** @var Aggregate */
  65      private $aggregate;
  66  
  67      /** @var array */
  68      private $aggregateOptions;
  69  
  70      /** @var array */
  71      private $changeStreamOptions;
  72  
  73      /** @var string|null */
  74      private $collectionName;
  75  
  76      /** @var string */
  77      private $databaseName;
  78  
  79      /** @var integer|null */
  80      private $firstBatchSize;
  81  
  82      /** @var boolean */
  83      private $hasResumed = false;
  84  
  85      /** @var Manager */
  86      private $manager;
  87  
  88      /** @var TimestampInterface */
  89      private $operationTime;
  90  
  91      /** @var array */
  92      private $pipeline;
  93  
  94      /** @var object|null */
  95      private $postBatchResumeToken;
  96  
  97      /**
  98       * Constructs an aggregate command for creating a change stream.
  99       *
 100       * Supported options:
 101       *
 102       *  * batchSize (integer): The number of documents to return per batch.
 103       *
 104       *  * collation (document): Specifies a collation.
 105       *
 106       *  * fullDocument (string): Determines whether the "fullDocument" field
 107       *    will be populated for update operations. By default, change streams
 108       *    only return the delta of fields during the update operation (via the
 109       *    "updateDescription" field). To additionally return the most current
 110       *    majority-committed version of the updated document, specify
 111       *    "updateLookup" for this option. Defaults to "default".
 112       *
 113       *    Insert and replace operations always include the "fullDocument" field
 114       *    and delete operations omit the field as the document no longer exists.
 115       *
 116       *  * maxAwaitTimeMS (integer): The maximum amount of time for the server to
 117       *    wait on new documents to satisfy a change stream query.
 118       *
 119       *  * readConcern (MongoDB\Driver\ReadConcern): Read concern.
 120       *
 121       *  * readPreference (MongoDB\Driver\ReadPreference): Read preference. This
 122       *    will be used to select a new server when resuming. Defaults to a
 123       *    "primary" read preference.
 124       *
 125       *  * resumeAfter (document): Specifies the logical starting point for the
 126       *    new change stream.
 127       *
 128       *    Using this option in conjunction with "startAfter" and/or
 129       *    "startAtOperationTime" will result in a server error. The options are
 130       *    mutually exclusive.
 131       *
 132       *  * session (MongoDB\Driver\Session): Client session.
 133       *
 134       *    Sessions are not supported for server versions < 3.6.
 135       *
 136       *  * startAfter (document): Specifies the logical starting point for the
 137       *    new change stream. Unlike "resumeAfter", this option can be used with
 138       *    a resume token from an "invalidate" event.
 139       *
 140       *    Using this option in conjunction with "resumeAfter" and/or
 141       *    "startAtOperationTime" will result in a server error. The options are
 142       *    mutually exclusive.
 143       *
 144       *  * startAtOperationTime (MongoDB\BSON\TimestampInterface): If specified,
 145       *    the change stream will only provide changes that occurred at or after
 146       *    the specified timestamp. Any command run against the server will
 147       *    return an operation time that can be used here. Alternatively, an
 148       *    operation time may be obtained from MongoDB\Driver\Server::getInfo().
 149       *
 150       *    Using this option in conjunction with "resumeAfter" and/or
 151       *    "startAfter" will result in a server error. The options are mutually
 152       *    exclusive.
 153       *
 154       *    This option is not supported for server versions < 4.0.
 155       *
 156       *  * typeMap (array): Type map for BSON deserialization. This will be
 157       *    applied to the returned Cursor (it is not sent to the server).
 158       *
 159       * Note: A database-level change stream may be created by specifying null
 160       * for the collection name. A cluster-level change stream may be created by
 161       * specifying null for both the database and collection name.
 162       *
 163       * @param Manager     $manager        Manager instance from the driver
 164       * @param string|null $databaseName   Database name
 165       * @param string|null $collectionName Collection name
 166       * @param array       $pipeline       List of pipeline operations
 167       * @param array       $options        Command options
 168       * @throws InvalidArgumentException for parameter/option parsing errors
 169       */
 170      public function __construct(Manager $manager, $databaseName, $collectionName, array $pipeline, array $options = [])
 171      {
 172          if (isset($collectionName) && ! isset($databaseName)) {
 173              throw new InvalidArgumentException('$collectionName should also be null if $databaseName is null');
 174          }
 175  
 176          $options += [
 177              'fullDocument' => self::FULL_DOCUMENT_DEFAULT,
 178              'readPreference' => new ReadPreference(ReadPreference::RP_PRIMARY),
 179          ];
 180  
 181          if (! is_string($options['fullDocument'])) {
 182              throw InvalidArgumentException::invalidType('"fullDocument" option', $options['fullDocument'], 'string');
 183          }
 184  
 185          if (! $options['readPreference'] instanceof ReadPreference) {
 186              throw InvalidArgumentException::invalidType('"readPreference" option', $options['readPreference'], ReadPreference::class);
 187          }
 188  
 189          if (isset($options['resumeAfter']) && ! is_array($options['resumeAfter']) && ! is_object($options['resumeAfter'])) {
 190              throw InvalidArgumentException::invalidType('"resumeAfter" option', $options['resumeAfter'], 'array or object');
 191          }
 192  
 193          if (isset($options['startAfter']) && ! is_array($options['startAfter']) && ! is_object($options['startAfter'])) {
 194              throw InvalidArgumentException::invalidType('"startAfter" option', $options['startAfter'], 'array or object');
 195          }
 196  
 197          if (isset($options['startAtOperationTime']) && ! $options['startAtOperationTime'] instanceof TimestampInterface) {
 198              throw InvalidArgumentException::invalidType('"startAtOperationTime" option', $options['startAtOperationTime'], TimestampInterface::class);
 199          }
 200  
 201          /* In the absence of an explicit session, create one to ensure that the
 202           * initial aggregation and any resume attempts can use the same session
 203           * ("implicit from the user's perspective" per PHPLIB-342). Since this
 204           * is filling in for an implicit session, we default "causalConsistency"
 205           * to false. */
 206          if (! isset($options['session'])) {
 207              try {
 208                  $options['session'] = $manager->startSession(['causalConsistency' => false]);
 209              } catch (RuntimeException $e) {
 210                  /* We can ignore the exception, as libmongoc likely cannot
 211                   * create its own session and there is no risk of a mismatch. */
 212              }
 213          }
 214  
 215          $this->aggregateOptions = array_intersect_key($options, ['batchSize' => 1, 'collation' => 1, 'maxAwaitTimeMS' => 1, 'readConcern' => 1, 'readPreference' => 1, 'session' => 1, 'typeMap' => 1]);
 216          $this->changeStreamOptions = array_intersect_key($options, ['fullDocument' => 1, 'resumeAfter' => 1, 'startAfter' => 1, 'startAtOperationTime' => 1]);
 217  
 218          // Null database name implies a cluster-wide change stream
 219          if ($databaseName === null) {
 220              $databaseName = 'admin';
 221              $this->changeStreamOptions['allChangesForCluster'] = true;
 222          }
 223  
 224          $this->manager = $manager;
 225          $this->databaseName = (string) $databaseName;
 226          $this->collectionName = isset($collectionName) ? (string) $collectionName : null;
 227          $this->pipeline = $pipeline;
 228  
 229          $this->aggregate = $this->createAggregate();
 230      }
 231  
 232      /** @internal */
 233      final public function commandFailed(CommandFailedEvent $event)
 234      {
 235      }
 236  
 237      /** @internal */
 238      final public function commandStarted(CommandStartedEvent $event)
 239      {
 240          if ($event->getCommandName() !== 'aggregate') {
 241              return;
 242          }
 243  
 244          $this->firstBatchSize = null;
 245          $this->postBatchResumeToken = null;
 246      }
 247  
 248      /** @internal */
 249      final public function commandSucceeded(CommandSucceededEvent $event)
 250      {
 251          if ($event->getCommandName() !== 'aggregate') {
 252              return;
 253          }
 254  
 255          $reply = $event->getReply();
 256  
 257          if (! isset($reply->cursor->firstBatch) || ! is_array($reply->cursor->firstBatch)) {
 258              throw new UnexpectedValueException('aggregate command did not return a "cursor.firstBatch" array');
 259          }
 260  
 261          $this->firstBatchSize = count($reply->cursor->firstBatch);
 262  
 263          if (isset($reply->cursor->postBatchResumeToken) && is_object($reply->cursor->postBatchResumeToken)) {
 264              $this->postBatchResumeToken = $reply->cursor->postBatchResumeToken;
 265          }
 266  
 267          if ($this->shouldCaptureOperationTime($event->getServer()) &&
 268              isset($reply->operationTime) && $reply->operationTime instanceof TimestampInterface) {
 269              $this->operationTime = $reply->operationTime;
 270          }
 271      }
 272  
 273      /**
 274       * Execute the operation.
 275       *
 276       * @see Executable::execute()
 277       * @param Server $server
 278       * @return ChangeStream
 279       * @throws UnsupportedException if collation or read concern is used and unsupported
 280       * @throws RuntimeException for other driver errors (e.g. connection errors)
 281       */
 282      public function execute(Server $server)
 283      {
 284          return new ChangeStream(
 285              $this->createChangeStreamIterator($server),
 286              function ($resumeToken, $hasAdvanced) {
 287                  return $this->resume($resumeToken, $hasAdvanced);
 288              }
 289          );
 290      }
 291  
 292      /**
 293       * Create the aggregate command for a change stream.
 294       *
 295       * This method is also used to recreate the aggregate command when resuming.
 296       *
 297       * @return Aggregate
 298       */
 299      private function createAggregate()
 300      {
 301          $pipeline = $this->pipeline;
 302          array_unshift($pipeline, ['$changeStream' => (object) $this->changeStreamOptions]);
 303  
 304          return new Aggregate($this->databaseName, $this->collectionName, $pipeline, $this->aggregateOptions);
 305      }
 306  
 307      /**
 308       * Create a ChangeStreamIterator by executing the aggregate command.
 309       *
 310       * @param Server $server
 311       * @return ChangeStreamIterator
 312       */
 313      private function createChangeStreamIterator(Server $server)
 314      {
 315          return new ChangeStreamIterator(
 316              $this->executeAggregate($server),
 317              $this->firstBatchSize,
 318              $this->getInitialResumeToken(),
 319              $this->postBatchResumeToken
 320          );
 321      }
 322  
 323      /**
 324       * Execute the aggregate command.
 325       *
 326       * The command will be executed using APM so that we can capture data from
 327       * its response (e.g. firstBatch size, postBatchResumeToken).
 328       *
 329       * @param Server $server
 330       * @return Cursor
 331       */
 332      private function executeAggregate(Server $server)
 333      {
 334          addSubscriber($this);
 335  
 336          try {
 337              return $this->aggregate->execute($server);
 338          } finally {
 339              removeSubscriber($this);
 340          }
 341      }
 342  
 343      /**
 344       * Return the initial resume token for creating the ChangeStreamIterator.
 345       *
 346       * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#updating-the-cached-resume-token
 347       * @return array|object|null
 348       */
 349      private function getInitialResumeToken()
 350      {
 351          if ($this->firstBatchSize === 0 && isset($this->postBatchResumeToken)) {
 352              return $this->postBatchResumeToken;
 353          }
 354  
 355          if (isset($this->changeStreamOptions['startAfter'])) {
 356              return $this->changeStreamOptions['startAfter'];
 357          }
 358  
 359          if (isset($this->changeStreamOptions['resumeAfter'])) {
 360              return $this->changeStreamOptions['resumeAfter'];
 361          }
 362  
 363          return null;
 364      }
 365  
 366      /**
 367       * Resumes a change stream.
 368       *
 369       * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#resume-process
 370       * @param array|object|null $resumeToken
 371       * @param bool              $hasAdvanced
 372       * @return ChangeStreamIterator
 373       * @throws InvalidArgumentException
 374       */
 375      private function resume($resumeToken = null, $hasAdvanced = false)
 376      {
 377          if (isset($resumeToken) && ! is_array($resumeToken) && ! is_object($resumeToken)) {
 378              throw InvalidArgumentException::invalidType('$resumeToken', $resumeToken, 'array or object');
 379          }
 380  
 381          $this->hasResumed = true;
 382  
 383          /* Select a new server using the original read preference. While watch
 384           * is not usable within transactions, we still check if there is a
 385           * pinned session. This is to avoid an ambiguous error message about
 386           * running a command on the wrong server. */
 387          $server = select_server($this->manager, $this->aggregateOptions);
 388  
 389          $resumeOption = isset($this->changeStreamOptions['startAfter']) && ! $hasAdvanced ? 'startAfter' : 'resumeAfter';
 390  
 391          unset($this->changeStreamOptions['resumeAfter']);
 392          unset($this->changeStreamOptions['startAfter']);
 393          unset($this->changeStreamOptions['startAtOperationTime']);
 394  
 395          if ($resumeToken !== null) {
 396              $this->changeStreamOptions[$resumeOption] = $resumeToken;
 397          }
 398  
 399          if ($resumeToken === null && $this->operationTime !== null) {
 400              $this->changeStreamOptions['startAtOperationTime'] = $this->operationTime;
 401          }
 402  
 403          // Recreate the aggregate command and return a new ChangeStreamIterator
 404          $this->aggregate = $this->createAggregate();
 405  
 406          return $this->createChangeStreamIterator($server);
 407      }
 408  
 409      /**
 410       * Determine whether to capture operation time from an aggregate response.
 411       *
 412       * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#startatoperationtime
 413       * @param Server $server
 414       * @return boolean
 415       */
 416      private function shouldCaptureOperationTime(Server $server)
 417      {
 418          if ($this->hasResumed) {
 419              return false;
 420          }
 421  
 422          if (isset($this->changeStreamOptions['resumeAfter']) ||
 423              isset($this->changeStreamOptions['startAfter']) ||
 424              isset($this->changeStreamOptions['startAtOperationTime'])) {
 425              return false;
 426          }
 427  
 428          if ($this->firstBatchSize > 0) {
 429              return false;
 430          }
 431  
 432          if ($this->postBatchResumeToken !== null) {
 433              return false;
 434          }
 435  
 436          if (! server_supports_feature($server, self::$wireVersionForStartAtOperationTime)) {
 437              return false;
 438          }
 439  
 440          return true;
 441      }
 442  }