Search moodle.org's
Developer Documentation

See Release Notes

  • Bug fixes for general core bugs in 3.11.x will end 14 Nov 2022 (12 months plus 6 months extension).
  • Bug fixes for security issues in 3.11.x will end 13 Nov 2023 (18 months plus 12 months extension).
  • PHP version: minimum PHP 7.3.0. Note: the minimum PHP version has increased since Moodle 3.10. PHP 7.4.x is supported too.

Differences Between: [Versions 310 and 311] [Versions 311 and 401] [Versions 39 and 311]

   1  <?php
   2  /*
   3   * Copyright 2015-2017 MongoDB, Inc.
   4   *
   5   * Licensed under the Apache License, Version 2.0 (the "License");
   6   * you may not use this file except in compliance with the License.
   7   * You may obtain a copy of the License at
   8   *
   9   *   http://www.apache.org/licenses/LICENSE-2.0
  10   *
  11   * Unless required by applicable law or agreed to in writing, software
  12   * distributed under the License is distributed on an "AS IS" BASIS,
  13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14   * See the License for the specific language governing permissions and
  15   * limitations under the License.
  16   */
  17  
  18  namespace MongoDB\Operation;
  19  
  20  use ArrayIterator;
  21  use MongoDB\Driver\Command;
  22  use MongoDB\Driver\Exception\RuntimeException as DriverRuntimeException;
  23  use MongoDB\Driver\ReadConcern;
  24  use MongoDB\Driver\ReadPreference;
  25  use MongoDB\Driver\Server;
  26  use MongoDB\Driver\Session;
  27  use MongoDB\Driver\WriteConcern;
  28  use MongoDB\Exception\InvalidArgumentException;
  29  use MongoDB\Exception\UnexpectedValueException;
  30  use MongoDB\Exception\UnsupportedException;
  31  use stdClass;
  32  use Traversable;
  33  use function current;
  34  use function is_array;
  35  use function is_bool;
  36  use function is_integer;
  37  use function is_object;
  38  use function is_string;
  39  use function MongoDB\create_field_path_type_map;
  40  use function MongoDB\is_last_pipeline_operator_write;
  41  use function MongoDB\server_supports_feature;
  42  use function sprintf;
  43  
  /**
   * Builds and runs the "aggregate" database command.
   *
   * @api
   * @see \MongoDB\Collection::aggregate()
   * @see http://docs.mongodb.org/manual/reference/command/aggregate/
   */
  class Aggregate implements Executable, Explainable
  {
      /** @var integer Value passed to server_supports_feature() before using "collation" */
      private static $wireVersionForCollation = 5;

      /** @var integer Value passed to server_supports_feature() before sending "bypassDocumentValidation" */
      private static $wireVersionForDocumentLevelValidation = 4;

      /** @var integer Value passed to server_supports_feature() before using "readConcern" */
      private static $wireVersionForReadConcern = 4;

      /** @var integer Value passed to server_supports_feature() before using "writeConcern" */
      private static $wireVersionForWriteConcern = 5;

      /** @var string */
      private $databaseName;

      /** @var string|null Null means the command is sent with "aggregate: 1" */
      private $collectionName;

      /** @var array */
      private $pipeline;

      /** @var array Validated options, including the filled-in defaults */
      private $options;

      /**
       * Validates the arguments and stores them for a later execute() call.
       *
       * Recognised options:
       *
       *  * allowDiskUse (boolean): Lets aggregation stages write to temporary
       *    files (the _tmp sub-directory of the dbPath directory). Defaults to
       *    false.
       *
       *  * batchSize (integer): Number of documents returned per batch. It may
       *    not be combined with a "useCursor" of false.
       *
       *  * bypassDocumentValidation (boolean): When true, the write may skip
       *    document level validation. Only meaningful together with an $out or
       *    $merge stage.
       *
       *    Ignored for servers < 3.2, which have no document level validation.
       *
       *  * collation (document): Collation specification.
       *
       *    Servers < 3.4 do not support this; using it there raises an
       *    exception at execution time.
       *
       *  * comment (string): Free-form string that helps trace the operation
       *    through the database profiler, currentOp, and logs.
       *
       *  * explain (boolean): Whether to return information on how the
       *    pipeline is processed. A truthy value also forces "useCursor" to
       *    false.
       *
       *  * hint (string|document): Index to use, given either as the index
       *    name or as the index key pattern. When given, the query system only
       *    considers plans using the hinted index.
       *
       *  * maxAwaitTimeMS (integer): Not added to the command document; it is
       *    forwarded as an option of the MongoDB\Driver\Command instead.
       *
       *  * maxTimeMS (integer): Upper bound on how long the query may run.
       *
       *  * readConcern (MongoDB\Driver\ReadConcern): Read concern. A default
       *    read concern is dropped from the stored options.
       *
       *    Servers < 3.2 do not support this; using it there raises an
       *    exception at execution time.
       *
       *  * readPreference (MongoDB\Driver\ReadPreference): Read preference.
       *
       *    Ignored when the pipeline ends in an $out or $merge stage.
       *
       *  * session (MongoDB\Driver\Session): Client session.
       *
       *    Sessions are not supported for server versions < 3.6.
       *
       *  * typeMap (array): Type map for BSON deserialization. It is applied to
       *    the returned Cursor and never sent to the server.
       *
       *  * useCursor (boolean): Whether the server is asked to deliver results
       *    through a cursor. Defaults to true.
       *
       *    Turning cursors off can help during mongod/mongos upgrades.
       *
       *  * writeConcern (MongoDB\Driver\WriteConcern): Write concern. Only
       *    meaningful together with an $out or $merge stage. A default write
       *    concern is dropped from the stored options.
       *
       *    Servers < 3.4 do not support this; using it there raises an
       *    exception at execution time.
       *
       * Note: pass null as the collection name to run collection-agnostic
       * commands (e.g. $currentOp).
       *
       * @param string      $databaseName   Database name
       * @param string|null $collectionName Collection name
       * @param array       $pipeline       List of pipeline operations
       * @param array       $options        Command options
       * @throws InvalidArgumentException for parameter/option parsing errors
       */
      public function __construct($databaseName, $collectionName, array $pipeline, array $options = [])
      {
          /* The pipeline has to be a list: its keys must be exactly 0, 1, 2, ...
           * in iteration order, and each stage must be an array or object. */
          $position = 0;

          foreach ($pipeline as $key => $stage) {
              if ($key !== $position) {
                  throw new InvalidArgumentException(sprintf('$pipeline is not a list (unexpected index: "%s")', $key));
              }

              if (! (is_array($stage) || is_object($stage))) {
                  throw InvalidArgumentException::invalidType(sprintf('$pipeline[%d]', $key), $stage, 'array or object');
              }

              $position++;
          }

          // Values supplied by the caller win; the defaults only fill missing keys.
          $options = $options + ['allowDiskUse' => false, 'useCursor' => true];

          /* The checks below run in a fixed order so that, when several options
           * are invalid, the same one is always reported first. */
          $allowDiskUse = $options['allowDiskUse'];
          if (! is_bool($allowDiskUse)) {
              throw InvalidArgumentException::invalidType('"allowDiskUse" option', $allowDiskUse, 'boolean');
          }

          $batchSize = $options['batchSize'] ?? null;
          if ($batchSize !== null && ! is_integer($batchSize)) {
              throw InvalidArgumentException::invalidType('"batchSize" option', $batchSize, 'integer');
          }

          $bypassDocumentValidation = $options['bypassDocumentValidation'] ?? null;
          if ($bypassDocumentValidation !== null && ! is_bool($bypassDocumentValidation)) {
              throw InvalidArgumentException::invalidType('"bypassDocumentValidation" option', $bypassDocumentValidation, 'boolean');
          }

          $collation = $options['collation'] ?? null;
          if ($collation !== null && ! (is_array($collation) || is_object($collation))) {
              throw InvalidArgumentException::invalidType('"collation" option', $collation, 'array or object');
          }

          $comment = $options['comment'] ?? null;
          if ($comment !== null && ! is_string($comment)) {
              throw InvalidArgumentException::invalidType('"comment" option', $comment, 'string');
          }

          $explain = $options['explain'] ?? null;
          if ($explain !== null && ! is_bool($explain)) {
              throw InvalidArgumentException::invalidType('"explain" option', $explain, 'boolean');
          }

          $hint = $options['hint'] ?? null;
          if ($hint !== null && ! (is_string($hint) || is_array($hint) || is_object($hint))) {
              throw InvalidArgumentException::invalidType('"hint" option', $hint, 'string or array or object');
          }

          $maxAwaitTimeMS = $options['maxAwaitTimeMS'] ?? null;
          if ($maxAwaitTimeMS !== null && ! is_integer($maxAwaitTimeMS)) {
              throw InvalidArgumentException::invalidType('"maxAwaitTimeMS" option', $maxAwaitTimeMS, 'integer');
          }

          $maxTimeMS = $options['maxTimeMS'] ?? null;
          if ($maxTimeMS !== null && ! is_integer($maxTimeMS)) {
              throw InvalidArgumentException::invalidType('"maxTimeMS" option', $maxTimeMS, 'integer');
          }

          $readConcern = $options['readConcern'] ?? null;
          if ($readConcern !== null && ! ($readConcern instanceof ReadConcern)) {
              throw InvalidArgumentException::invalidType('"readConcern" option', $readConcern, ReadConcern::class);
          }

          $readPreference = $options['readPreference'] ?? null;
          if ($readPreference !== null && ! ($readPreference instanceof ReadPreference)) {
              throw InvalidArgumentException::invalidType('"readPreference" option', $readPreference, ReadPreference::class);
          }

          $session = $options['session'] ?? null;
          if ($session !== null && ! ($session instanceof Session)) {
              throw InvalidArgumentException::invalidType('"session" option', $session, Session::class);
          }

          $typeMap = $options['typeMap'] ?? null;
          if ($typeMap !== null && ! is_array($typeMap)) {
              throw InvalidArgumentException::invalidType('"typeMap" option', $typeMap, 'array');
          }

          $useCursor = $options['useCursor'];
          if (! is_bool($useCursor)) {
              throw InvalidArgumentException::invalidType('"useCursor" option', $useCursor, 'boolean');
          }

          $writeConcern = $options['writeConcern'] ?? null;
          if ($writeConcern !== null && ! ($writeConcern instanceof WriteConcern)) {
              throw InvalidArgumentException::invalidType('"writeConcern" option', $writeConcern, WriteConcern::class);
          }

          /* This is checked against the caller's "useCursor" value, i.e. before
           * "explain" gets a chance to switch cursors off below. */
          if ($batchSize !== null && ! $useCursor) {
              throw new InvalidArgumentException('"batchSize" option should not be used if "useCursor" is false');
          }

          // Default concerns are not stored, so they are never passed on later.
          if ($readConcern !== null && $readConcern->isDefault()) {
              unset($options['readConcern']);
          }

          if ($writeConcern !== null && $writeConcern->isDefault()) {
              unset($options['writeConcern']);
          }

          // Explaining a pipeline disables cursor usage for the command.
          if ($explain === true) {
              $options['useCursor'] = false;
          }

          $this->databaseName = (string) $databaseName;
          $this->collectionName = $collectionName === null ? null : (string) $collectionName;
          $this->pipeline = $pipeline;
          $this->options = $options;
      }

      /**
       * Runs the aggregation against the given server.
       *
       * The driver cursor is returned when "useCursor" is on or "explain" was
       * requested. Otherwise the "result" array of the first returned document
       * is wrapped in an ArrayIterator.
       *
       * @see Executable::execute()
       * @param Server $server
       * @return Traversable
       * @throws UnexpectedValueException if the command response was malformed
       * @throws UnsupportedException if collation, read concern, or write concern is used and unsupported
       * @throws DriverRuntimeException for other driver errors (e.g. connection errors)
       */
      public function execute(Server $server)
      {
          // Feature checks only consult the server when the option is present.
          if (isset($this->options['collation'])) {
              if (! server_supports_feature($server, self::$wireVersionForCollation)) {
                  throw UnsupportedException::collationNotSupported();
              }
          }

          if (isset($this->options['readConcern'])) {
              if (! server_supports_feature($server, self::$wireVersionForReadConcern)) {
                  throw UnsupportedException::readConcernNotSupported();
              }
          }

          if (isset($this->options['writeConcern'])) {
              if (! server_supports_feature($server, self::$wireVersionForWriteConcern)) {
                  throw UnsupportedException::writeConcernNotSupported();
              }
          }

          // Inside a transaction neither concern may be given for the operation.
          $session = $this->options['session'] ?? null;

          if ($session !== null && $session->isInTransaction()) {
              if (isset($this->options['readConcern'])) {
                  throw UnsupportedException::readConcernNotSupportedInTransaction();
              }

              if (isset($this->options['writeConcern'])) {
                  throw UnsupportedException::writeConcernNotSupportedInTransaction();
              }
          }

          $hasExplain = ! empty($this->options['explain']);
          $hasWriteStage = $this->hasWriteStage();

          $commandDocument = $this->createCommandDocument($server, $hasWriteStage);
          $commandOptions = $this->createCommandOptions();
          $command = new Command($commandDocument, $commandOptions);

          $executeOptions = $this->createOptions($hasWriteStage, $hasExplain);

          // A writing pipeline that is merely being explained is still a read.
          if ($hasWriteStage && ! $hasExplain) {
              $cursor = $server->executeReadWriteCommand($this->databaseName, $command, $executeOptions);
          } else {
              $cursor = $server->executeReadCommand($this->databaseName, $command, $executeOptions);
          }

          $returnsCursor = $this->options['useCursor'] || $hasExplain;
          $typeMap = $this->options['typeMap'] ?? null;

          if ($typeMap !== null) {
              /* Without a cursor the documents sit inside the "result" field of a
               * single reply document, so the type map is re-rooted there. */
              $cursor->setTypeMap($returnsCursor ? $typeMap : create_field_path_type_map($typeMap, 'result.$'));
          }

          if ($returnsCursor) {
              return $cursor;
          }

          $replies = $cursor->toArray();
          $reply = current($replies);

          if (! (isset($reply->result) && is_array($reply->result))) {
              throw new UnexpectedValueException('aggregate command did not return a "result" array');
          }

          return new ArrayIterator($reply->result);
      }

      /**
       * Returns the command document that execute() would send to the server.
       *
       * @param Server $server
       * @return array
       */
      public function getCommandDocument(Server $server)
      {
          $hasWriteStage = $this->hasWriteStage();

          return $this->createCommandDocument($server, $hasWriteStage);
      }

      /**
       * Assembles the aggregate command document from the stored options.
       *
       * Keys are added in a fixed order, starting with "aggregate".
       *
       * @param Server  $server        Consulted to decide whether "bypassDocumentValidation" is sent
       * @param boolean $hasWriteStage Whether the pipeline ends in a write stage
       * @return array
       */
      private function createCommandDocument(Server $server, bool $hasWriteStage) : array
      {
          $document = [
              'aggregate' => $this->collectionName === null ? 1 : $this->collectionName,
              'pipeline' => $this->pipeline,
              'allowDiskUse' => $this->options['allowDiskUse'],
          ];

          // Only sent when enabled and the server passes the feature check.
          $bypass = $this->options['bypassDocumentValidation'] ?? false;

          if ($bypass && server_supports_feature($server, self::$wireVersionForDocumentLevelValidation)) {
              $document['bypassDocumentValidation'] = $bypass;
          }

          // These options are copied verbatim whenever they are set.
          foreach (['comment', 'explain', 'maxTimeMS'] as $name) {
              $value = $this->options[$name] ?? null;

              if ($value === null) {
                  continue;
              }

              $document[$name] = $value;
          }

          if (isset($this->options['collation'])) {
              $document['collation'] = (object) $this->options['collation'];
          }

          if (isset($this->options['hint'])) {
              $hint = $this->options['hint'];

              // Array hints are cast to objects; strings and objects pass through.
              if (is_array($hint)) {
                  $document['hint'] = (object) $hint;
              } else {
                  $document['hint'] = $hint;
              }
          }

          if ($this->options['useCursor']) {
              /* batchSize is left out when the pipeline includes an $out or
               * $merge stage: no documents are returned then, and a batchSize of
               * zero could keep the pipeline from executing at all. */
              if (isset($this->options['batchSize']) && ! $hasWriteStage) {
                  $document['cursor'] = ['batchSize' => $this->options['batchSize']];
              } else {
                  $document['cursor'] = new stdClass();
              }
          }

          return $document;
      }

      /**
       * Builds the options array given to the MongoDB\Driver\Command constructor.
       *
       * @return array Contains "maxAwaitTimeMS" when that option is set, otherwise empty
       */
      private function createCommandOptions() : array
      {
          if (! isset($this->options['maxAwaitTimeMS'])) {
              return [];
          }

          return ['maxAwaitTimeMS' => $this->options['maxAwaitTimeMS']];
      }

      /**
       * Builds the options array passed to the server when running the command.
       *
       * @see http://php.net/manual/en/mongodb-driver-server.executereadcommand.php
       * @see http://php.net/manual/en/mongodb-driver-server.executereadwritecommand.php
       * @param boolean $hasWriteStage
       * @param boolean $hasExplain
       * @return array
       */
      private function createOptions($hasWriteStage, $hasExplain)
      {
          $executeOptions = [];

          if (isset($this->options['readConcern'])) {
              $executeOptions['readConcern'] = $this->options['readConcern'];
          }

          // A read preference is only forwarded for pipelines that do not write.
          if (isset($this->options['readPreference']) && ! $hasWriteStage) {
              $executeOptions['readPreference'] = $this->options['readPreference'];
          }

          if (isset($this->options['session'])) {
              $executeOptions['session'] = $this->options['session'];
          }

          // A write concern is only forwarded for writing pipelines that are not explained.
          $isActualWrite = $hasWriteStage && ! $hasExplain;

          if ($isActualWrite && isset($this->options['writeConcern'])) {
              $executeOptions['writeConcern'] = $this->options['writeConcern'];
          }

          return $executeOptions;
      }

      /**
       * Reports whether the last pipeline operator is a write, as decided by
       * is_last_pipeline_operator_write().
       *
       * @return boolean
       */
      private function hasWriteStage() : bool
      {
          $endsInWrite = is_last_pipeline_operator_write($this->pipeline);

          return $endsInWrite;
      }
  }