Differences Between: [Versions 310 and 400] [Versions 39 and 400] [Versions 400 and 401]
<?php
/*
 * Copyright 2015-2017 MongoDB, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace MongoDB\Operation;

use ArrayIterator;
use MongoDB\Driver\Command;
use MongoDB\Driver\Exception\RuntimeException as DriverRuntimeException;
use MongoDB\Driver\ReadConcern;
use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\Server;
use MongoDB\Driver\Session;
use MongoDB\Driver\WriteConcern;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\UnexpectedValueException;
use MongoDB\Exception\UnsupportedException;
use stdClass;
use Traversable;
use function current;
use function is_array;
use function is_bool;
use function is_integer;
use function is_object;
use function is_string;
use function MongoDB\create_field_path_type_map;
use function MongoDB\is_last_pipeline_operator_write;
use function MongoDB\server_supports_feature;
use function sprintf;

/**
 * Operation for the aggregate command.
 *
 * @api
 * @see \MongoDB\Collection::aggregate()
 * @see http://docs.mongodb.org/manual/reference/command/aggregate/
 */
class Aggregate implements Executable, Explainable
{
    /** @var integer Wire version checked before a "collation" option is sent */
    private static $wireVersionForCollation = 5;

    /** @var integer Wire version checked before "bypassDocumentValidation" is sent */
    private static $wireVersionForDocumentLevelValidation = 4;

    /** @var integer Wire version checked before a "readConcern" option is used */
    private static $wireVersionForReadConcern = 4;

    /** @var integer Wire version checked before a "writeConcern" option is used */
    private static $wireVersionForWriteConcern = 5;

    /** @var string */
    private $databaseName;

    /** @var string|null */
    private $collectionName;

    /** @var array */
    private $pipeline;

    /** @var array */
    private $options;

    /**
     * Constructs an aggregate command.
     *
     * Supported options:
     *
     *  * allowDiskUse (boolean): Enables writing to temporary files. When set
     *    to true, aggregation stages can write data to the _tmp sub-directory
     *    in the dbPath directory. The default is false.
     *
     *  * batchSize (integer): The number of documents to return per batch.
     *
     *  * bypassDocumentValidation (boolean): If true, allows the write to
     *    circumvent document level validation. This only applies when an $out
     *    or $merge stage is specified.
     *
     *    For servers < 3.2, this option is ignored as document level validation
     *    is not available.
     *
     *  * collation (document): Collation specification.
     *
     *    This is not supported for server versions < 3.4 and will result in an
     *    exception at execution time if used.
     *
     *  * comment (string): An arbitrary string to help trace the operation
     *    through the database profiler, currentOp, and logs.
     *
     *  * explain (boolean): Specifies whether or not to return the information
     *    on the processing of the pipeline.
     *
     *  * hint (string|document): The index to use. Specify either the index
     *    name as a string or the index key pattern as a document. If specified,
     *    then the query system will only consider plans using the hinted index.
     *
     *  * maxAwaitTimeMS (integer): Passed as an option to the
     *    MongoDB\Driver\Command constructor; it is never added to the command
     *    document itself.
     *
     *  * maxTimeMS (integer): The maximum amount of time to allow the query to
     *    run.
     *
     *  * readConcern (MongoDB\Driver\ReadConcern): Read concern.
     *
     *    This is not supported for server versions < 3.2 and will result in an
     *    exception at execution time if used.
     *
     *  * readPreference (MongoDB\Driver\ReadPreference): Read preference.
     *
     *    This option is ignored if an $out or $merge stage is specified.
     *
     *  * session (MongoDB\Driver\Session): Client session.
     *
     *    Sessions are not supported for server versions < 3.6.
     *
     *  * typeMap (array): Type map for BSON deserialization. This will be
     *    applied to the returned Cursor (it is not sent to the server).
     *
     *  * useCursor (boolean): Indicates whether the command will request that
     *    the server provide results using a cursor. The default is true.
     *
     *    This option allows users to turn off cursors if necessary to aid in
     *    mongod/mongos upgrades.
     *
     *  * writeConcern (MongoDB\Driver\WriteConcern): Write concern. This only
     *    applies when an $out or $merge stage is specified.
     *
     *    This is not supported for server versions < 3.4 and will result in an
     *    exception at execution time if used.
     *
     * Note: Collection-agnostic commands (e.g. $currentOp) may be executed by
     * specifying null for the collection name.
     *
     * @param string      $databaseName   Database name
     * @param string|null $collectionName Collection name
     * @param array       $pipeline       List of pipeline operations
     * @param array       $options        Command options
     * @throws InvalidArgumentException for parameter/option parsing errors
     */
    public function __construct($databaseName, $collectionName, array $pipeline, array $options = [])
    {
        /* The pipeline must be a list: keys have to be exactly 0, 1, 2, ... in
         * iteration order, and every stage must be an array or object. */
        $expectedIndex = 0;

        foreach ($pipeline as $i => $operation) {
            if ($i !== $expectedIndex) {
                throw new InvalidArgumentException(sprintf('$pipeline is not a list (unexpected index: "%s")', $i));
            }

            if (! is_array($operation) && ! is_object($operation)) {
                throw InvalidArgumentException::invalidType(sprintf('$pipeline[%d]', $i), $operation, 'array or object');
            }

            $expectedIndex += 1;
        }

        // Array union only fills in defaults for keys the caller did not supply
        $options += [
            'allowDiskUse' => false,
            'useCursor' => true,
        ];

        if (! is_bool($options['allowDiskUse'])) {
            throw InvalidArgumentException::invalidType('"allowDiskUse" option', $options['allowDiskUse'], 'boolean');
        }

        if (isset($options['batchSize']) && ! is_integer($options['batchSize'])) {
            throw InvalidArgumentException::invalidType('"batchSize" option', $options['batchSize'], 'integer');
        }

        if (isset($options['bypassDocumentValidation']) && ! is_bool($options['bypassDocumentValidation'])) {
            throw InvalidArgumentException::invalidType('"bypassDocumentValidation" option', $options['bypassDocumentValidation'], 'boolean');
        }

        if (isset($options['collation']) && ! is_array($options['collation']) && ! is_object($options['collation'])) {
            throw InvalidArgumentException::invalidType('"collation" option', $options['collation'], 'array or object');
        }

        if (isset($options['comment']) && ! is_string($options['comment'])) {
            throw InvalidArgumentException::invalidType('"comment" option', $options['comment'], 'string');
        }

        if (isset($options['explain']) && ! is_bool($options['explain'])) {
            throw InvalidArgumentException::invalidType('"explain" option', $options['explain'], 'boolean');
        }

        if (isset($options['hint']) && ! is_string($options['hint']) && ! is_array($options['hint']) && ! is_object($options['hint'])) {
            throw InvalidArgumentException::invalidType('"hint" option', $options['hint'], 'string or array or object');
        }

        if (isset($options['maxAwaitTimeMS']) && ! is_integer($options['maxAwaitTimeMS'])) {
            throw InvalidArgumentException::invalidType('"maxAwaitTimeMS" option', $options['maxAwaitTimeMS'], 'integer');
        }

        if (isset($options['maxTimeMS']) && ! is_integer($options['maxTimeMS'])) {
            throw InvalidArgumentException::invalidType('"maxTimeMS" option', $options['maxTimeMS'], 'integer');
        }

        if (isset($options['readConcern']) && ! $options['readConcern'] instanceof ReadConcern) {
            throw InvalidArgumentException::invalidType('"readConcern" option', $options['readConcern'], ReadConcern::class);
        }

        if (isset($options['readPreference']) && ! $options['readPreference'] instanceof ReadPreference) {
            throw InvalidArgumentException::invalidType('"readPreference" option', $options['readPreference'], ReadPreference::class);
        }

        if (isset($options['session']) && ! $options['session'] instanceof Session) {
            throw InvalidArgumentException::invalidType('"session" option', $options['session'], Session::class);
        }

        if (isset($options['typeMap']) && ! is_array($options['typeMap'])) {
            throw InvalidArgumentException::invalidType('"typeMap" option', $options['typeMap'], 'array');
        }

        if (! is_bool($options['useCursor'])) {
            throw InvalidArgumentException::invalidType('"useCursor" option', $options['useCursor'], 'boolean');
        }

        if (isset($options['writeConcern']) && ! $options['writeConcern'] instanceof WriteConcern) {
            throw InvalidArgumentException::invalidType('"writeConcern" option', $options['writeConcern'], WriteConcern::class);
        }

        /* This check deliberately runs before "explain" forces useCursor off
         * below, so only an explicit useCursor=false conflicts with batchSize. */
        if (isset($options['batchSize']) && ! $options['useCursor']) {
            throw new InvalidArgumentException('"batchSize" option should not be used if "useCursor" is false');
        }

        /* Default read/write concerns are dropped so that later isset() checks
         * (server support, transactions, execution options) skip them. */
        if (isset($options['readConcern']) && $options['readConcern']->isDefault()) {
            unset($options['readConcern']);
        }

        if (isset($options['writeConcern']) && $options['writeConcern']->isDefault()) {
            unset($options['writeConcern']);
        }

        // An explain request never asks the server for a cursor
        if (! empty($options['explain'])) {
            $options['useCursor'] = false;
        }

        $this->databaseName = (string) $databaseName;
        $this->collectionName = isset($collectionName) ? (string) $collectionName : null;
        $this->pipeline = $pipeline;
        $this->options = $options;
    }

    /**
     * Execute the operation.
     *
     * When a cursor is used (or "explain" is set) the driver cursor is
     * returned directly; otherwise the "result" array of the single response
     * document is wrapped in an ArrayIterator.
     *
     * @see Executable::execute()
     * @param Server $server
     * @return Traversable
     * @throws UnexpectedValueException if the command response was malformed
     * @throws UnsupportedException if collation, read concern, or write concern is used and unsupported
     * @throws DriverRuntimeException for other driver errors (e.g. connection errors)
     */
    public function execute(Server $server)
    {
        if (isset($this->options['collation']) && ! server_supports_feature($server, self::$wireVersionForCollation)) {
            throw UnsupportedException::collationNotSupported();
        }

        if (isset($this->options['readConcern']) && ! server_supports_feature($server, self::$wireVersionForReadConcern)) {
            throw UnsupportedException::readConcernNotSupported();
        }

        if (isset($this->options['writeConcern']) && ! server_supports_feature($server, self::$wireVersionForWriteConcern)) {
            throw UnsupportedException::writeConcernNotSupported();
        }

        // Explicit (non-default) read/write concerns are rejected inside a transaction
        $inTransaction = isset($this->options['session']) && $this->options['session']->isInTransaction();
        if ($inTransaction) {
            if (isset($this->options['readConcern'])) {
                throw UnsupportedException::readConcernNotSupportedInTransaction();
            }
            if (isset($this->options['writeConcern'])) {
                throw UnsupportedException::writeConcernNotSupportedInTransaction();
            }
        }

        $hasExplain = ! empty($this->options['explain']);
        $hasWriteStage = $this->hasWriteStage();

        $command = new Command(
            $this->createCommandDocument($server, $hasWriteStage),
            $this->createCommandOptions()
        );
        $options = $this->createOptions($hasWriteStage, $hasExplain);

        // Only a write-stage pipeline that is actually executed (not explained) goes through the read-write path
        $cursor = $hasWriteStage && ! $hasExplain
            ? $server->executeReadWriteCommand($this->databaseName, $command, $options)
            : $server->executeReadCommand($this->databaseName, $command, $options);

        if ($this->options['useCursor'] || $hasExplain) {
            if (isset($this->options['typeMap'])) {
                $cursor->setTypeMap($this->options['typeMap']);
            }

            return $cursor;
        }

        /* Without a cursor the documents are read from the "result" field of
         * the response, so the caller's type map is re-rooted at "result.$". */
        if (isset($this->options['typeMap'])) {
            $cursor->setTypeMap(create_field_path_type_map($this->options['typeMap'], 'result.$'));
        }

        $result = current($cursor->toArray());

        if (! isset($result->result) || ! is_array($result->result)) {
            throw new UnexpectedValueException('aggregate command did not return a "result" array');
        }

        return new ArrayIterator($result->result);
    }

    /**
     * Builds the aggregate command document for the given server without
     * executing it.
     *
     * NOTE(review): presumably required by the Explainable interface, which is
     * not visible in this file; confirm.
     *
     * @param Server $server
     * @return array
     */
    public function getCommandDocument(Server $server)
    {
        return $this->createCommandDocument($server, $this->hasWriteStage());
    }

    /**
     * Create the aggregate command document.
     *
     * @param Server  $server        Used to check support for bypassDocumentValidation
     * @param boolean $hasWriteStage Whether the pipeline ends in a write stage
     * @return array
     */
    private function createCommandDocument(Server $server, bool $hasWriteStage) : array
    {
        $cmd = [
            // A null collection name yields 1 (collection-agnostic aggregate)
            'aggregate' => $this->collectionName ?? 1,
            'pipeline' => $this->pipeline,
        ];

        $cmd['allowDiskUse'] = $this->options['allowDiskUse'];

        // Only sent when truthy and the server passes the wire version check
        if (! empty($this->options['bypassDocumentValidation']) &&
            server_supports_feature($server, self::$wireVersionForDocumentLevelValidation)
        ) {
            $cmd['bypassDocumentValidation'] = $this->options['bypassDocumentValidation'];
        }

        foreach (['comment', 'explain', 'maxTimeMS'] as $option) {
            if (isset($this->options[$option])) {
                $cmd[$option] = $this->options[$option];
            }
        }

        if (isset($this->options['collation'])) {
            $cmd['collation'] = (object) $this->options['collation'];
        }

        if (isset($this->options['hint'])) {
            // Array hints are cast to object; strings and objects pass through unchanged
            $cmd['hint'] = is_array($this->options['hint']) ? (object) $this->options['hint'] : $this->options['hint'];
        }

        if ($this->options['useCursor']) {
            /* Ignore batchSize if pipeline includes an $out or $merge stage, as
             * no documents will be returned and sending a batchSize of zero
             * could prevent the pipeline from executing at all. */
            $cmd['cursor'] = isset($this->options['batchSize']) && ! $hasWriteStage
                ? ['batchSize' => $this->options['batchSize']]
                : new stdClass();
        }

        return $cmd;
    }

    /**
     * Create the options passed to the Command constructor (as opposed to
     * fields of the command document).
     *
     * @return array
     */
    private function createCommandOptions() : array
    {
        $cmdOptions = [];

        if (isset($this->options['maxAwaitTimeMS'])) {
            $cmdOptions['maxAwaitTimeMS'] = $this->options['maxAwaitTimeMS'];
        }

        return $cmdOptions;
    }

    /**
     * Create options for executing the command.
     *
     * @see http://php.net/manual/en/mongodb-driver-server.executereadcommand.php
     * @see http://php.net/manual/en/mongodb-driver-server.executereadwritecommand.php
     * @param boolean $hasWriteStage
     * @param boolean $hasExplain
     * @return array
     */
    private function createOptions(bool $hasWriteStage, bool $hasExplain) : array
    {
        $options = [];

        if (isset($this->options['readConcern'])) {
            $options['readConcern'] = $this->options['readConcern'];
        }

        // Read preference is dropped whenever the pipeline ends in a write stage
        if (! $hasWriteStage && isset($this->options['readPreference'])) {
            $options['readPreference'] = $this->options['readPreference'];
        }

        if (isset($this->options['session'])) {
            $options['session'] = $this->options['session'];
        }

        // Write concern only applies when a write stage is really executed (not explained)
        if ($hasWriteStage && ! $hasExplain && isset($this->options['writeConcern'])) {
            $options['writeConcern'] = $this->options['writeConcern'];
        }

        return $options;
    }

    /**
     * Reports whether is_last_pipeline_operator_write() considers the pipeline
     * to end in a write stage.
     *
     * @return boolean
     */
    private function hasWriteStage() : bool
    {
        return is_last_pipeline_operator_write($this->pipeline);
    }
}
title
Description
Body
title
Description
Body
title
Description
Body
title
Body