See Release Notes
Long Term Support Release
<?php /*< * Copyright 2015-2017 MongoDB, Inc.> * Copyright 2015-present MongoDB, Inc.* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at *< * http://www.apache.org/licenses/LICENSE-2.0> * https://www.apache.org/licenses/LICENSE-2.0* * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ namespace MongoDB\Operation; use ArrayIterator; use MongoDB\Driver\Command;> use MongoDB\Driver\Cursor;use MongoDB\Driver\Exception\RuntimeException as DriverRuntimeException; use MongoDB\Driver\ReadConcern; use MongoDB\Driver\ReadPreference; use MongoDB\Driver\Server; use MongoDB\Driver\Session; use MongoDB\Driver\WriteConcern; use MongoDB\Exception\InvalidArgumentException; use MongoDB\Exception\UnexpectedValueException; use MongoDB\Exception\UnsupportedException; use stdClass;< use Traversable;>use function current; use function is_array; use function is_bool; use function is_integer; use function is_object; use function is_string; use function MongoDB\create_field_path_type_map; use function MongoDB\is_last_pipeline_operator_write;< use function MongoDB\server_supports_feature;use function sprintf; /** * Operation for the aggregate command. * * @api * @see \MongoDB\Collection::aggregate()< * @see http://docs.mongodb.org/manual/reference/command/aggregate/> * @see https://mongodb.com/docs/manual/reference/command/aggregate/*/ class Aggregate implements Executable, Explainable {< /** @var integer */ < private static $wireVersionForCollation = 5; < < /** @var integer */ < private static $wireVersionForDocumentLevelValidation = 4; < < /** @var integer */ < private static $wireVersionForReadConcern = 4; < < /** @var integer */ < private static $wireVersionForWriteConcern = 5; </** @var string */ private $databaseName; /** @var string|null */ private $collectionName; /** @var array */ private $pipeline; /** @var array */ private $options;> /** @var bool */ /** > private $isExplain; * Constructs an aggregate command. > * > /** @var bool */ * Supported options: > private $isWrite; * >* * allowDiskUse (boolean): Enables writing to temporary files. When set * to true, aggregation stages can write data to the _tmp sub-directory< * in the dbPath directory. The default is false.> * in the dbPath directory.* * * batchSize (integer): The number of documents to return per batch. * * * bypassDocumentValidation (boolean): If true, allows the write to * circumvent document level validation. This only applies when an $out * or $merge stage is specified. *< * For servers < 3.2, this option is ignored as document level validation < * is not available. < ** * collation (document): Collation specification. *< * This is not supported for server versions < 3.4 and will result in an < * exception at execution time if used.> * * comment (mixed): BSON value to attach as a comment to this command.*< * * comment (string): An arbitrary string to help trace the operation < * through the database profiler, currentOp, and logs.> * Only string values are supported for server versions < 4.4.* * * explain (boolean): Specifies whether or not to return the information * on the processing of the pipeline. * * * hint (string|document): The index to use. Specify either the index * name as a string or the index key pattern as a document. If specified, * then the query system will only consider plans using the hinted index. *> * * let (document): Map of parameter names and values. Values must be * * maxTimeMS (integer): The maximum amount of time to allow the query to > * constant or closed expressions that do not reference document fields. * run. > * Parameters can then be accessed as variables in an aggregate * > * expression context (e.g. "$$var"). * * readConcern (MongoDB\Driver\ReadConcern): Read concern. > * * > * This is not supported for server versions < 5.0 and will result in an * This is not supported for server versions < 3.2 and will result in an > * exception at execution time if used. * exception at execution time if used. > *< * This is not supported for server versions < 3.2 and will result in an < * exception at execution time if used. < ** This option is ignored if an $out or $merge stage is specified. * * * session (MongoDB\Driver\Session): Client session. *< * Sessions are not supported for server versions < 3.6. < ** * typeMap (array): Type map for BSON deserialization. This will be * applied to the returned Cursor (it is not sent to the server). * * * useCursor (boolean): Indicates whether the command will request that * the server provide results using a cursor. The default is true. * * This option allows users to turn off cursors if necessary to aid in * mongod/mongos upgrades. * * * writeConcern (MongoDB\Driver\WriteConcern): Write concern. This only * applies when an $out or $merge stage is specified. *< * This is not supported for server versions < 3.4 and will result in an < * exception at execution time if used. < ** Note: Collection-agnostic commands (e.g. $currentOp) may be executed by * specifying null for the collection name. * * @param string $databaseName Database name * @param string|null $collectionName Collection name * @param array $pipeline List of pipeline operations * @param array $options Command options * @throws InvalidArgumentException for parameter/option parsing errors */< public function __construct($databaseName, $collectionName, array $pipeline, array $options = [])> public function __construct(string $databaseName, ?string $collectionName, array $pipeline, array $options = []){ $expectedIndex = 0; foreach ($pipeline as $i => $operation) { if ($i !== $expectedIndex) { throw new InvalidArgumentException(sprintf('$pipeline is not a list (unexpected index: "%s")', $i)); } if (! is_array($operation) && ! is_object($operation)) { throw InvalidArgumentException::invalidType(sprintf('$pipeline[%d]', $i), $operation, 'array or object'); } $expectedIndex += 1; }< $options += [ < 'allowDiskUse' => false, < 'useCursor' => true, < ];> $options += ['useCursor' => true];< if (! is_bool($options['allowDiskUse'])) {> if (isset($options['allowDiskUse']) && ! is_bool($options['allowDiskUse'])) {throw InvalidArgumentException::invalidType('"allowDiskUse" option', $options['allowDiskUse'], 'boolean'); } if (isset($options['batchSize']) && ! is_integer($options['batchSize'])) { throw InvalidArgumentException::invalidType('"batchSize" option', $options['batchSize'], 'integer'); } if (isset($options['bypassDocumentValidation']) && ! is_bool($options['bypassDocumentValidation'])) { throw InvalidArgumentException::invalidType('"bypassDocumentValidation" option', $options['bypassDocumentValidation'], 'boolean'); } if (isset($options['collation']) && ! is_array($options['collation']) && ! is_object($options['collation'])) { throw InvalidArgumentException::invalidType('"collation" option', $options['collation'], 'array or object'); }< if (isset($options['comment']) && ! is_string($options['comment'])) { < throw InvalidArgumentException::invalidType('"comment" option', $options['comment'], 'string'); < } <if (isset($options['explain']) && ! is_bool($options['explain'])) { throw InvalidArgumentException::invalidType('"explain" option', $options['explain'], 'boolean'); } if (isset($options['hint']) && ! is_string($options['hint']) && ! is_array($options['hint']) && ! is_object($options['hint'])) { throw InvalidArgumentException::invalidType('"hint" option', $options['hint'], 'string or array or object'); }> if (isset($options['let']) && ! is_array($options['let']) && ! is_object($options['let'])) { if (isset($options['maxAwaitTimeMS']) && ! is_integer($options['maxAwaitTimeMS'])) { > throw InvalidArgumentException::invalidType('"let" option', $options['let'], ['array', 'object']); throw InvalidArgumentException::invalidType('"maxAwaitTimeMS" option', $options['maxAwaitTimeMS'], 'integer'); > } } >if (isset($options['maxTimeMS']) && ! is_integer($options['maxTimeMS'])) { throw InvalidArgumentException::invalidType('"maxTimeMS" option', $options['maxTimeMS'], 'integer'); } if (isset($options['readConcern']) && ! $options['readConcern'] instanceof ReadConcern) { throw InvalidArgumentException::invalidType('"readConcern" option', $options['readConcern'], ReadConcern::class); } if (isset($options['readPreference']) && ! $options['readPreference'] instanceof ReadPreference) { throw InvalidArgumentException::invalidType('"readPreference" option', $options['readPreference'], ReadPreference::class); } if (isset($options['session']) && ! $options['session'] instanceof Session) { throw InvalidArgumentException::invalidType('"session" option', $options['session'], Session::class); } if (isset($options['typeMap']) && ! is_array($options['typeMap'])) { throw InvalidArgumentException::invalidType('"typeMap" option', $options['typeMap'], 'array'); } if (! is_bool($options['useCursor'])) { throw InvalidArgumentException::invalidType('"useCursor" option', $options['useCursor'], 'boolean'); } if (isset($options['writeConcern']) && ! $options['writeConcern'] instanceof WriteConcern) { throw InvalidArgumentException::invalidType('"writeConcern" option', $options['writeConcern'], WriteConcern::class); } if (isset($options['batchSize']) && ! $options['useCursor']) { throw new InvalidArgumentException('"batchSize" option should not be used if "useCursor" is false'); }> if (isset($options['bypassDocumentValidation']) && ! $options['bypassDocumentValidation']) { if (isset($options['readConcern']) && $options['readConcern']->isDefault()) { > unset($options['bypassDocumentValidation']); unset($options['readConcern']); > } } >if (isset($options['writeConcern']) && $options['writeConcern']->isDefault()) { unset($options['writeConcern']); }< if (! empty($options['explain'])) {> $this->isExplain = ! empty($options['explain']); > $this->isWrite = is_last_pipeline_operator_write($pipeline) && ! $this->isExplain; > > // Explain does not use a cursor > if ($this->isExplain) {$options['useCursor'] = false;> unset($options['batchSize']); } > } > $this->databaseName = (string) $databaseName; > /* Ignore batchSize for writes, since no documents are returned and a $this->collectionName = isset($collectionName) ? (string) $collectionName : null; > * batchSize of zero could prevent the pipeline from executing. */ $this->pipeline = $pipeline; > if ($this->isWrite) { $this->options = $options; > unset($options['batchSize']);< $this->databaseName = (string) $databaseName; < $this->collectionName = isset($collectionName) ? (string) $collectionName : null;> $this->databaseName = $databaseName; > $this->collectionName = $collectionName;/** * Execute the operation. * * @see Executable::execute()< * @param Server $server < * @return Traversable> * @return ArrayIterator|Cursor* @throws UnexpectedValueException if the command response was malformed< * @throws UnsupportedException if collation, read concern, or write concern is used and unsupported> * @throws UnsupportedException if read concern or write concern is used and unsupported* @throws DriverRuntimeException for other driver errors (e.g. connection errors) */ public function execute(Server $server) {< if (isset($this->options['collation']) && ! server_supports_feature($server, self::$wireVersionForCollation)) { < throw UnsupportedException::collationNotSupported(); < } < < if (isset($this->options['readConcern']) && ! server_supports_feature($server, self::$wireVersionForReadConcern)) { < throw UnsupportedException::readConcernNotSupported(); < } < < if (isset($this->options['writeConcern']) && ! server_supports_feature($server, self::$wireVersionForWriteConcern)) { < throw UnsupportedException::writeConcernNotSupported(); < } <$inTransaction = isset($this->options['session']) && $this->options['session']->isInTransaction(); if ($inTransaction) { if (isset($this->options['readConcern'])) { throw UnsupportedException::readConcernNotSupportedInTransaction(); }>if (isset($this->options['writeConcern'])) { throw UnsupportedException::writeConcernNotSupportedInTransaction(); } }< $hasExplain = ! empty($this->options['explain']); < $hasWriteStage = $this->hasWriteStage(); <$command = new Command(< $this->createCommandDocument($server, $hasWriteStage),> $this->createCommandDocument(),$this->createCommandOptions() );< $options = $this->createOptions($hasWriteStage, $hasExplain);< $cursor = $hasWriteStage && ! $hasExplain < ? $server->executeReadWriteCommand($this->databaseName, $command, $options) < : $server->executeReadCommand($this->databaseName, $command, $options);> $cursor = $this->executeCommand($server, $command);< if ($this->options['useCursor'] || $hasExplain) {> if ($this->options['useCursor'] || $this->isExplain) {if (isset($this->options['typeMap'])) { $cursor->setTypeMap($this->options['typeMap']); } return $cursor; } if (isset($this->options['typeMap'])) { $cursor->setTypeMap(create_field_path_type_map($this->options['typeMap'], 'result.$')); } $result = current($cursor->toArray());< if (! isset($result->result) || ! is_array($result->result)) {> if (! is_object($result) || ! isset($result->result) || ! is_array($result->result)) {throw new UnexpectedValueException('aggregate command did not return a "result" array'); } return new ArrayIterator($result->result); }> /** public function getCommandDocument(Server $server) > * Returns the command document for this operation. { > * return $this->createCommandDocument($server, $this->hasWriteStage()); > * @see Explainable::getCommandDocument() } > * @return array > */< return $this->createCommandDocument($server, $this->hasWriteStage());> return $this->createCommandDocument();< private function createCommandDocument(Server $server, bool $hasWriteStage) : array> /** > * Create the aggregate command document. > */ > private function createCommandDocument(): array$cmd = [ 'aggregate' => $this->collectionName ?? 1, 'pipeline' => $this->pipeline, ];< $cmd['allowDiskUse'] = $this->options['allowDiskUse']; < < if (! empty($this->options['bypassDocumentValidation']) && < server_supports_feature($server, self::$wireVersionForDocumentLevelValidation) < ) { < $cmd['bypassDocumentValidation'] = $this->options['bypassDocumentValidation']; < } < < foreach (['comment', 'explain', 'maxTimeMS'] as $option) {> foreach (['allowDiskUse', 'bypassDocumentValidation', 'comment', 'explain', 'maxTimeMS'] as $option) {if (isset($this->options[$option])) { $cmd[$option] = $this->options[$option]; } }< if (isset($this->options['collation'])) { < $cmd['collation'] = (object) $this->options['collation'];> foreach (['collation', 'let'] as $option) { > if (isset($this->options[$option])) { > $cmd[$option] = (object) $this->options[$option]; > }} if (isset($this->options['hint'])) { $cmd['hint'] = is_array($this->options['hint']) ? (object) $this->options['hint'] : $this->options['hint']; } if ($this->options['useCursor']) {< /* Ignore batchSize if pipeline includes an $out or $merge stage, as < * no documents will be returned and sending a batchSize of zero < * could prevent the pipeline from executing at all. */ < $cmd['cursor'] = isset($this->options["batchSize"]) && ! $hasWriteStage> $cmd['cursor'] = isset($this->options["batchSize"])? ['batchSize' => $this->options["batchSize"]] : new stdClass(); } return $cmd; } private function createCommandOptions() : array { $cmdOptions = []; if (isset($this->options['maxAwaitTimeMS'])) { $cmdOptions['maxAwaitTimeMS'] = $this->options['maxAwaitTimeMS']; } return $cmdOptions; } /**< * Create options for executing the command.> * Execute the aggregate command using the appropriate Server method.*< * @see http://php.net/manual/en/mongodb-driver-server.executereadcommand.php < * @see http://php.net/manual/en/mongodb-driver-server.executereadwritecommand.php < * @param boolean $hasWriteStage < * @param boolean $hasExplain < * @return array> * @see https://php.net/manual/en/mongodb-driver-server.executecommand.php > * @see https://php.net/manual/en/mongodb-driver-server.executereadcommand.php > * @see https://php.net/manual/en/mongodb-driver-server.executereadwritecommand.php*/< private function createOptions($hasWriteStage, $hasExplain)> private function executeCommand(Server $server, Command $command): Cursor{ $options = [];< if (isset($this->options['readConcern'])) { < $options['readConcern'] = $this->options['readConcern'];> foreach (['readConcern', 'readPreference', 'session'] as $option) { > if (isset($this->options[$option])) { > $options[$option] = $this->options[$option];}< < if (! $hasWriteStage && isset($this->options['readPreference'])) { < $options['readPreference'] = $this->options['readPreference'];}< if (isset($this->options['session'])) { < $options['session'] = $this->options['session'];> if ($this->isWrite && isset($this->options['writeConcern'])) { > $options['writeConcern'] = $this->options['writeConcern'];}< if ($hasWriteStage && ! $hasExplain && isset($this->options['writeConcern'])) { < $options['writeConcern'] = $this->options['writeConcern'];> if (! $this->isWrite) { > return $server->executeReadCommand($this->databaseName, $command, $options);}< return $options;> /* Server::executeReadWriteCommand() does not support a "readPreference" > * option, so fall back to executeCommand(). This means that libmongoc > * will not apply any client-level options (e.g. writeConcern), but that > * should not be an issue as PHPLIB handles inheritance on its own. */ > if (isset($options['readPreference'])) { > return $server->executeCommand($this->databaseName, $command, $options);}< private function hasWriteStage() : bool < { < return is_last_pipeline_operator_write($this->pipeline);> return $server->executeReadWriteCommand($this->databaseName, $command, $options);} }