Search moodle.org's
Developer Documentation

See Release Notes
Long Term Support Release

  • Bug fixes for general core bugs in 4.1.x will end 13 November 2023 (12 months).
  • Bug fixes for security issues in 4.1.x will end 10 November 2025 (36 months).
  • PHP version: minimum PHP 7.4.0 Note: minimum PHP version has increased since Moodle 4.0. PHP 8.0.x is supported too.

Differences Between: [Versions 310 and 401] [Versions 311 and 401] [Versions 39 and 401] [Versions 400 and 401]

   1  <?php
   2  /*
   3   * Copyright 2015-present MongoDB, Inc.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *   https://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   */
  17  
  18  namespace MongoDB\Operation;
  19  
  20  use ArrayIterator;
  21  use MongoDB\Driver\Command;
  22  use MongoDB\Driver\Cursor;
  23  use MongoDB\Driver\Exception\RuntimeException as DriverRuntimeException;
  24  use MongoDB\Driver\ReadConcern;
  25  use MongoDB\Driver\ReadPreference;
  26  use MongoDB\Driver\Server;
  27  use MongoDB\Driver\Session;
  28  use MongoDB\Driver\WriteConcern;
  29  use MongoDB\Exception\InvalidArgumentException;
  30  use MongoDB\Exception\UnexpectedValueException;
  31  use MongoDB\Exception\UnsupportedException;
  32  use stdClass;
  33  
  34  use function current;
  35  use function is_array;
  36  use function is_bool;
  37  use function is_integer;
  38  use function is_object;
  39  use function is_string;
  40  use function MongoDB\create_field_path_type_map;
  41  use function MongoDB\is_last_pipeline_operator_write;
  42  use function sprintf;
  43  
  44  /**
  45   * Operation for the aggregate command.
  46   *
  47   * @api
  48   * @see \MongoDB\Collection::aggregate()
  49   * @see https://mongodb.com/docs/manual/reference/command/aggregate/
  50   */
  51  class Aggregate implements Executable, Explainable
  52  {
  53      /** @var string */
  54      private $databaseName;
  55  
  56      /** @var string|null */
  57      private $collectionName;
  58  
  59      /** @var array */
  60      private $pipeline;
  61  
  62      /** @var array */
  63      private $options;
  64  
  65      /** @var bool */
  66      private $isExplain;
  67  
  68      /** @var bool */
  69      private $isWrite;
  70  
  71      /**
  72       * Constructs an aggregate command.
  73       *
  74       * Supported options:
  75       *
  76       *  * allowDiskUse (boolean): Enables writing to temporary files. When set
  77       *    to true, aggregation stages can write data to the _tmp sub-directory
  78       *    in the dbPath directory.
  79       *
  80       *  * batchSize (integer): The number of documents to return per batch.
  81       *
  82       *  * bypassDocumentValidation (boolean): If true, allows the write to
  83       *    circumvent document level validation. This only applies when an $out
  84       *    or $merge stage is specified.
  85       *
  86       *  * collation (document): Collation specification.
  87       *
  88       *  * comment (mixed): BSON value to attach as a comment to this command.
  89       *
  90       *    Only string values are supported for server versions < 4.4.
  91       *
  92       *  * explain (boolean): Specifies whether or not to return the information
  93       *    on the processing of the pipeline.
  94       *
  95       *  * hint (string|document): The index to use. Specify either the index
  96       *    name as a string or the index key pattern as a document. If specified,
  97       *    then the query system will only consider plans using the hinted index.
  98       *
  99       *  * let (document): Map of parameter names and values. Values must be
 100       *    constant or closed expressions that do not reference document fields.
 101       *    Parameters can then be accessed as variables in an aggregate
 102       *    expression context (e.g. "$$var").
 103       *
 104       *    This is not supported for server versions < 5.0 and will result in an
 105       *    exception at execution time if used.
 106       *
 107       *  * maxTimeMS (integer): The maximum amount of time to allow the query to
 108       *    run.
 109       *
 110       *  * readConcern (MongoDB\Driver\ReadConcern): Read concern.
 111       *
 112       *  * readPreference (MongoDB\Driver\ReadPreference): Read preference.
 113       *
 114       *    This option is ignored if an $out or $merge stage is specified.
 115       *
 116       *  * session (MongoDB\Driver\Session): Client session.
 117       *
 118       *  * typeMap (array): Type map for BSON deserialization. This will be
 119       *    applied to the returned Cursor (it is not sent to the server).
 120       *
 121       *  * useCursor (boolean): Indicates whether the command will request that
 122       *    the server provide results using a cursor. The default is true.
 123       *
 124       *    This option allows users to turn off cursors if necessary to aid in
 125       *    mongod/mongos upgrades.
 126       *
 127       *  * writeConcern (MongoDB\Driver\WriteConcern): Write concern. This only
 128       *    applies when an $out or $merge stage is specified.
 129       *
 130       * Note: Collection-agnostic commands (e.g. $currentOp) may be executed by
 131       * specifying null for the collection name.
 132       *
 133       * @param string      $databaseName   Database name
 134       * @param string|null $collectionName Collection name
 135       * @param array       $pipeline       List of pipeline operations
 136       * @param array       $options        Command options
 137       * @throws InvalidArgumentException for parameter/option parsing errors
 138       */
 139      public function __construct(string $databaseName, ?string $collectionName, array $pipeline, array $options = [])
 140      {
 141          $expectedIndex = 0;
 142  
 143          foreach ($pipeline as $i => $operation) {
 144              if ($i !== $expectedIndex) {
 145                  throw new InvalidArgumentException(sprintf('$pipeline is not a list (unexpected index: "%s")', $i));
 146              }
 147  
 148              if (! is_array($operation) && ! is_object($operation)) {
 149                  throw InvalidArgumentException::invalidType(sprintf('$pipeline[%d]', $i), $operation, 'array or object');
 150              }
 151  
 152              $expectedIndex += 1;
 153          }
 154  
 155          $options += ['useCursor' => true];
 156  
 157          if (isset($options['allowDiskUse']) && ! is_bool($options['allowDiskUse'])) {
 158              throw InvalidArgumentException::invalidType('"allowDiskUse" option', $options['allowDiskUse'], 'boolean');
 159          }
 160  
 161          if (isset($options['batchSize']) && ! is_integer($options['batchSize'])) {
 162              throw InvalidArgumentException::invalidType('"batchSize" option', $options['batchSize'], 'integer');
 163          }
 164  
 165          if (isset($options['bypassDocumentValidation']) && ! is_bool($options['bypassDocumentValidation'])) {
 166              throw InvalidArgumentException::invalidType('"bypassDocumentValidation" option', $options['bypassDocumentValidation'], 'boolean');
 167          }
 168  
 169          if (isset($options['collation']) && ! is_array($options['collation']) && ! is_object($options['collation'])) {
 170              throw InvalidArgumentException::invalidType('"collation" option', $options['collation'], 'array or object');
 171          }
 172  
 173          if (isset($options['explain']) && ! is_bool($options['explain'])) {
 174              throw InvalidArgumentException::invalidType('"explain" option', $options['explain'], 'boolean');
 175          }
 176  
 177          if (isset($options['hint']) && ! is_string($options['hint']) && ! is_array($options['hint']) && ! is_object($options['hint'])) {
 178              throw InvalidArgumentException::invalidType('"hint" option', $options['hint'], 'string or array or object');
 179          }
 180  
 181          if (isset($options['let']) && ! is_array($options['let']) && ! is_object($options['let'])) {
 182              throw InvalidArgumentException::invalidType('"let" option', $options['let'], ['array', 'object']);
 183          }
 184  
 185          if (isset($options['maxAwaitTimeMS']) && ! is_integer($options['maxAwaitTimeMS'])) {
 186              throw InvalidArgumentException::invalidType('"maxAwaitTimeMS" option', $options['maxAwaitTimeMS'], 'integer');
 187          }
 188  
 189          if (isset($options['maxTimeMS']) && ! is_integer($options['maxTimeMS'])) {
 190              throw InvalidArgumentException::invalidType('"maxTimeMS" option', $options['maxTimeMS'], 'integer');
 191          }
 192  
 193          if (isset($options['readConcern']) && ! $options['readConcern'] instanceof ReadConcern) {
 194              throw InvalidArgumentException::invalidType('"readConcern" option', $options['readConcern'], ReadConcern::class);
 195          }
 196  
 197          if (isset($options['readPreference']) && ! $options['readPreference'] instanceof ReadPreference) {
 198              throw InvalidArgumentException::invalidType('"readPreference" option', $options['readPreference'], ReadPreference::class);
 199          }
 200  
 201          if (isset($options['session']) && ! $options['session'] instanceof Session) {
 202              throw InvalidArgumentException::invalidType('"session" option', $options['session'], Session::class);
 203          }
 204  
 205          if (isset($options['typeMap']) && ! is_array($options['typeMap'])) {
 206              throw InvalidArgumentException::invalidType('"typeMap" option', $options['typeMap'], 'array');
 207          }
 208  
 209          if (! is_bool($options['useCursor'])) {
 210              throw InvalidArgumentException::invalidType('"useCursor" option', $options['useCursor'], 'boolean');
 211          }
 212  
 213          if (isset($options['writeConcern']) && ! $options['writeConcern'] instanceof WriteConcern) {
 214              throw InvalidArgumentException::invalidType('"writeConcern" option', $options['writeConcern'], WriteConcern::class);
 215          }
 216  
 217          if (isset($options['batchSize']) && ! $options['useCursor']) {
 218              throw new InvalidArgumentException('"batchSize" option should not be used if "useCursor" is false');
 219          }
 220  
 221          if (isset($options['bypassDocumentValidation']) && ! $options['bypassDocumentValidation']) {
 222              unset($options['bypassDocumentValidation']);
 223          }
 224  
 225          if (isset($options['readConcern']) && $options['readConcern']->isDefault()) {
 226              unset($options['readConcern']);
 227          }
 228  
 229          if (isset($options['writeConcern']) && $options['writeConcern']->isDefault()) {
 230              unset($options['writeConcern']);
 231          }
 232  
 233          $this->isExplain = ! empty($options['explain']);
 234          $this->isWrite = is_last_pipeline_operator_write($pipeline) && ! $this->isExplain;
 235  
 236          // Explain does not use a cursor
 237          if ($this->isExplain) {
 238              $options['useCursor'] = false;
 239              unset($options['batchSize']);
 240          }
 241  
 242          /* Ignore batchSize for writes, since no documents are returned and a
 243           * batchSize of zero could prevent the pipeline from executing. */
 244          if ($this->isWrite) {
 245              unset($options['batchSize']);
 246          }
 247  
 248          $this->databaseName = $databaseName;
 249          $this->collectionName = $collectionName;
 250          $this->pipeline = $pipeline;
 251          $this->options = $options;
 252      }
 253  
 254      /**
 255       * Execute the operation.
 256       *
 257       * @see Executable::execute()
 258       * @return ArrayIterator|Cursor
 259       * @throws UnexpectedValueException if the command response was malformed
 260       * @throws UnsupportedException if read concern or write concern is used and unsupported
 261       * @throws DriverRuntimeException for other driver errors (e.g. connection errors)
 262       */
 263      public function execute(Server $server)
 264      {
 265          $inTransaction = isset($this->options['session']) && $this->options['session']->isInTransaction();
 266          if ($inTransaction) {
 267              if (isset($this->options['readConcern'])) {
 268                  throw UnsupportedException::readConcernNotSupportedInTransaction();
 269              }
 270  
 271              if (isset($this->options['writeConcern'])) {
 272                  throw UnsupportedException::writeConcernNotSupportedInTransaction();
 273              }
 274          }
 275  
 276          $command = new Command(
 277              $this->createCommandDocument(),
 278              $this->createCommandOptions()
 279          );
 280  
 281          $cursor = $this->executeCommand($server, $command);
 282  
 283          if ($this->options['useCursor'] || $this->isExplain) {
 284              if (isset($this->options['typeMap'])) {
 285                  $cursor->setTypeMap($this->options['typeMap']);
 286              }
 287  
 288              return $cursor;
 289          }
 290  
 291          if (isset($this->options['typeMap'])) {
 292              $cursor->setTypeMap(create_field_path_type_map($this->options['typeMap'], 'result.$'));
 293          }
 294  
 295          $result = current($cursor->toArray());
 296  
 297          if (! is_object($result) || ! isset($result->result) || ! is_array($result->result)) {
 298              throw new UnexpectedValueException('aggregate command did not return a "result" array');
 299          }
 300  
 301          return new ArrayIterator($result->result);
 302      }
 303  
 304      /**
 305       * Returns the command document for this operation.
 306       *
 307       * @see Explainable::getCommandDocument()
 308       * @return array
 309       */
 310      public function getCommandDocument(Server $server)
 311      {
 312          return $this->createCommandDocument();
 313      }
 314  
 315      /**
 316       * Create the aggregate command document.
 317       */
 318      private function createCommandDocument(): array
 319      {
 320          $cmd = [
 321              'aggregate' => $this->collectionName ?? 1,
 322              'pipeline' => $this->pipeline,
 323          ];
 324  
 325          foreach (['allowDiskUse', 'bypassDocumentValidation', 'comment', 'explain', 'maxTimeMS'] as $option) {
 326              if (isset($this->options[$option])) {
 327                  $cmd[$option] = $this->options[$option];
 328              }
 329          }
 330  
 331          foreach (['collation', 'let'] as $option) {
 332              if (isset($this->options[$option])) {
 333                  $cmd[$option] = (object) $this->options[$option];
 334              }
 335          }
 336  
 337          if (isset($this->options['hint'])) {
 338              $cmd['hint'] = is_array($this->options['hint']) ? (object) $this->options['hint'] : $this->options['hint'];
 339          }
 340  
 341          if ($this->options['useCursor']) {
 342              $cmd['cursor'] = isset($this->options["batchSize"])
 343                  ? ['batchSize' => $this->options["batchSize"]]
 344                  : new stdClass();
 345          }
 346  
 347          return $cmd;
 348      }
 349  
 350      private function createCommandOptions(): array
 351      {
 352          $cmdOptions = [];
 353  
 354          if (isset($this->options['maxAwaitTimeMS'])) {
 355              $cmdOptions['maxAwaitTimeMS'] = $this->options['maxAwaitTimeMS'];
 356          }
 357  
 358          return $cmdOptions;
 359      }
 360  
 361      /**
 362       * Execute the aggregate command using the appropriate Server method.
 363       *
 364       * @see https://php.net/manual/en/mongodb-driver-server.executecommand.php
 365       * @see https://php.net/manual/en/mongodb-driver-server.executereadcommand.php
 366       * @see https://php.net/manual/en/mongodb-driver-server.executereadwritecommand.php
 367       */
 368      private function executeCommand(Server $server, Command $command): Cursor
 369      {
 370          $options = [];
 371  
 372          foreach (['readConcern', 'readPreference', 'session'] as $option) {
 373              if (isset($this->options[$option])) {
 374                  $options[$option] = $this->options[$option];
 375              }
 376          }
 377  
 378          if ($this->isWrite && isset($this->options['writeConcern'])) {
 379              $options['writeConcern'] = $this->options['writeConcern'];
 380          }
 381  
 382          if (! $this->isWrite) {
 383              return $server->executeReadCommand($this->databaseName, $command, $options);
 384          }
 385  
 386          /* Server::executeReadWriteCommand() does not support a "readPreference"
 387           * option, so fall back to executeCommand(). This means that libmongoc
 388           * will not apply any client-level options (e.g. writeConcern), but that
 389           * should not be an issue as PHPLIB handles inheritance on its own. */
 390          if (isset($options['readPreference'])) {
 391              return $server->executeCommand($this->databaseName, $command, $options);
 392          }
 393  
 394          return $server->executeReadWriteCommand($this->databaseName, $command, $options);
 395      }
 396  }