See Release Notes
Long Term Support Release
Differences Between: [Versions 310 and 401] [Versions 311 and 401] [Versions 39 and 401] [Versions 400 and 401]
<?php
/*
 * Copyright 2015-present MongoDB, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace MongoDB\Operation;

use MongoDB\BulkWriteResult;
use MongoDB\Driver\BulkWrite as Bulk;
use MongoDB\Driver\Exception\RuntimeException as DriverRuntimeException;
use MongoDB\Driver\Server;
use MongoDB\Driver\Session;
use MongoDB\Driver\WriteConcern;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\UnsupportedException;

use function array_key_exists;
use function count;
use function current;
use function is_array;
use function is_bool;
use function is_object;
use function key;
use function MongoDB\is_first_key_operator;
use function MongoDB\is_pipeline;
use function sprintf;

/**
 * Operation for executing multiple write operations.
 *
 * The constructor validates and normalizes the list of operations; execute()
 * replays them onto a driver-level bulk write and sends it to a server.
 *
 * @api
 * @see \MongoDB\Collection::bulkWrite()
 */
class BulkWrite implements Executable
{
    public const DELETE_MANY = 'deleteMany';
    public const DELETE_ONE = 'deleteOne';
    public const INSERT_ONE = 'insertOne';
    public const REPLACE_ONE = 'replaceOne';
    public const UPDATE_MANY = 'updateMany';
    public const UPDATE_ONE = 'updateOne';

    /** @var string */
    private $databaseName;

    /** @var string */
    private $collectionName;

    /** @var array[] */
    private $operations;

    /** @var array */
    private $options;

    /**
     * Constructs a bulk write operation.
     *
     * Example array structure for all supported operation types:
     *
     *  [
     *    [ 'deleteMany' => [ $filter, $options ] ],
     *    [ 'deleteOne'  => [ $filter, $options ] ],
     *    [ 'insertOne'  => [ $document ] ],
     *    [ 'replaceOne' => [ $filter, $replacement, $options ] ],
     *    [ 'updateMany' => [ $filter, $update, $options ] ],
     *    [ 'updateOne'  => [ $filter, $update, $options ] ],
     *  ]
     *
     * Arguments correspond to the respective Operation classes; however, the
     * writeConcern option is specified for the top-level bulk write operation
     * instead of each individual operation.
     *
     * Supported options for deleteMany and deleteOne operations:
     *
     *  * collation (document): Collation specification.
     *
     * Supported options for replaceOne, updateMany, and updateOne operations:
     *
     *  * collation (document): Collation specification.
     *
     *  * upsert (boolean): When true, a new document is created if no document
     *    matches the query. The default is false.
     *
     * Supported options for updateMany and updateOne operations:
     *
     *  * arrayFilters (document array): A set of filters specifying to which
     *    array elements an update should apply.
     *
     * Supported options for the bulk write operation:
     *
     *  * bypassDocumentValidation (boolean): If true, allows the write to
     *    circumvent document level validation. The default is false.
     *
     *  * comment (mixed): BSON value to attach as a comment to the command(s)
     *    associated with this bulk write.
     *
     *    This is not supported for server versions < 4.4.
     *
     *  * ordered (boolean): If true, when a write fails, return without
     *    performing the remaining writes. If false, when a write fails,
     *    continue with the remaining writes, if any. The default is true.
     *
     *  * let (document): Map of parameter names and values. Values must be
     *    constant or closed expressions that do not reference document fields.
     *    Parameters can then be accessed as variables in an aggregate
     *    expression context (e.g. "$$var").
     *
     *  * session (MongoDB\Driver\Session): Client session.
     *
     *  * writeConcern (MongoDB\Driver\WriteConcern): Write concern.
     *
     * @param string  $databaseName   Database name
     * @param string  $collectionName Collection name
     * @param array[] $operations     List of write operations
     * @param array   $options        Command options
     * @throws InvalidArgumentException for parameter/option parsing errors,
     *         including an operation whose argument list is not an array
     */
    public function __construct(string $databaseName, string $collectionName, array $operations, array $options = [])
    {
        if (empty($operations)) {
            throw new InvalidArgumentException('$operations is empty');
        }

        // Tracks the key each element must have for $operations to be a list
        // (sequential integer keys starting at zero).
        $expectedIndex = 0;

        foreach ($operations as $i => $operation) {
            if ($i !== $expectedIndex) {
                throw new InvalidArgumentException(sprintf('$operations is not a list (unexpected index: "%s")', $i));
            }

            if (! is_array($operation)) {
                throw InvalidArgumentException::invalidType(sprintf('$operations[%d]', $i), $operation, 'array');
            }

            if (count($operation) !== 1) {
                throw new InvalidArgumentException(sprintf('Expected one element in $operation[%d], actually: %d', $i, count($operation)));
            }

            // The single key names the operation type; its value is the
            // positional argument list for that operation.
            $type = key($operation);
            $args = current($operation);

            // Reject a non-array argument list up front. Without this guard a
            // string would be indexed character-wise by isset() (producing a
            // misleading error about element 0), and null or other scalars
            // would make array_key_exists() raise a TypeError on PHP 8 rather
            // than the InvalidArgumentException this constructor documents.
            if (! is_array($args)) {
                throw InvalidArgumentException::invalidType(sprintf('$operations[%d]["%s"]', $i, $type), $args, 'array');
            }

            // array_key_exists() is used (rather than isset()) so that an
            // explicit null is reported as a wrong type, not as missing.
            if (! array_key_exists(0, $args)) {
                throw new InvalidArgumentException(sprintf('Missing first argument for $operations[%d]["%s"]', $i, $type));
            }

            if (! is_array($args[0]) && ! is_object($args[0])) {
                throw InvalidArgumentException::invalidType(sprintf('$operations[%d]["%s"][0]', $i, $type), $args[0], 'array or object');
            }

            switch ($type) {
                case self::INSERT_ONE:
                    // Only the document itself is required; nothing to normalize.
                    break;

                case self::DELETE_MANY:
                case self::DELETE_ONE:
                    if (! isset($args[1])) {
                        $args[1] = [];
                    }

                    if (! is_array($args[1])) {
                        throw InvalidArgumentException::invalidType(sprintf('$operations[%d]["%s"][1]', $i, $type), $args[1], 'array');
                    }

                    // Always overwrite "limit": it is derived from the
                    // operation type, not chosen by the caller.
                    $args[1]['limit'] = ($type === self::DELETE_ONE ? 1 : 0);

                    if (isset($args[1]['collation']) && ! is_array($args[1]['collation']) && ! is_object($args[1]['collation'])) {
                        throw InvalidArgumentException::invalidType(sprintf('$operations[%d]["%s"][1]["collation"]', $i, $type), $args[1]['collation'], 'array or object');
                    }

                    // Store the normalized options for use by execute().
                    $operations[$i][$type][1] = $args[1];

                    break;

                case self::REPLACE_ONE:
                    if (! array_key_exists(1, $args)) {
                        throw new InvalidArgumentException(sprintf('Missing second argument for $operations[%d]["%s"]', $i, $type));
                    }

                    if (! is_array($args[1]) && ! is_object($args[1])) {
                        throw InvalidArgumentException::invalidType(sprintf('$operations[%d]["%s"][1]', $i, $type), $args[1], 'array or object');
                    }

                    // A replacement must be a plain document, not an update
                    // operator document.
                    if (is_first_key_operator($args[1])) {
                        throw new InvalidArgumentException(sprintf('First key in $operations[%d]["%s"][1] is an update operator', $i, $type));
                    }

                    if (! isset($args[2])) {
                        $args[2] = [];
                    }

                    if (! is_array($args[2])) {
                        throw InvalidArgumentException::invalidType(sprintf('$operations[%d]["%s"][2]', $i, $type), $args[2], 'array');
                    }

                    // "multi" is forced off for a replacement; "upsert" only
                    // receives a default if the caller did not set it.
                    $args[2]['multi'] = false;
                    $args[2] += ['upsert' => false];

                    if (isset($args[2]['collation']) && ! is_array($args[2]['collation']) && ! is_object($args[2]['collation'])) {
                        throw InvalidArgumentException::invalidType(sprintf('$operations[%d]["%s"][2]["collation"]', $i, $type), $args[2]['collation'], 'array or object');
                    }

                    if (! is_bool($args[2]['upsert'])) {
                        throw InvalidArgumentException::invalidType(sprintf('$operations[%d]["%s"][2]["upsert"]', $i, $type), $args[2]['upsert'], 'boolean');
                    }

                    // Store the normalized options for use by execute().
                    $operations[$i][$type][2] = $args[2];

                    break;

                case self::UPDATE_MANY:
                case self::UPDATE_ONE:
                    if (! array_key_exists(1, $args)) {
                        throw new InvalidArgumentException(sprintf('Missing second argument for $operations[%d]["%s"]', $i, $type));
                    }

                    if (! is_array($args[1]) && ! is_object($args[1])) {
                        throw InvalidArgumentException::invalidType(sprintf('$operations[%d]["%s"][1]', $i, $type), $args[1], 'array or object');
                    }

                    // An update must be either an update operator document or
                    // an aggregation pipeline.
                    if (! is_first_key_operator($args[1]) && ! is_pipeline($args[1])) {
                        throw new InvalidArgumentException(sprintf('First key in $operations[%d]["%s"][1] is neither an update operator nor a pipeline', $i, $type));
                    }

                    if (! isset($args[2])) {
                        $args[2] = [];
                    }

                    if (! is_array($args[2])) {
                        throw InvalidArgumentException::invalidType(sprintf('$operations[%d]["%s"][2]', $i, $type), $args[2], 'array');
                    }

                    // "multi" is derived from the operation type; "upsert"
                    // only receives a default if the caller did not set it.
                    $args[2]['multi'] = ($type === self::UPDATE_MANY);
                    $args[2] += ['upsert' => false];

                    if (isset($args[2]['arrayFilters']) && ! is_array($args[2]['arrayFilters'])) {
                        throw InvalidArgumentException::invalidType(sprintf('$operations[%d]["%s"][2]["arrayFilters"]', $i, $type), $args[2]['arrayFilters'], 'array');
                    }

                    if (isset($args[2]['collation']) && ! is_array($args[2]['collation']) && ! is_object($args[2]['collation'])) {
                        throw InvalidArgumentException::invalidType(sprintf('$operations[%d]["%s"][2]["collation"]', $i, $type), $args[2]['collation'], 'array or object');
                    }

                    if (! is_bool($args[2]['upsert'])) {
                        throw InvalidArgumentException::invalidType(sprintf('$operations[%d]["%s"][2]["upsert"]', $i, $type), $args[2]['upsert'], 'boolean');
                    }

                    // Store the normalized options for use by execute().
                    $operations[$i][$type][2] = $args[2];

                    break;

                default:
                    throw new InvalidArgumentException(sprintf('Unknown operation type "%s" in $operations[%d]', $type, $i));
            }

            $expectedIndex += 1;
        }

        $options += ['ordered' => true];

        if (isset($options['bypassDocumentValidation']) && ! is_bool($options['bypassDocumentValidation'])) {
            throw InvalidArgumentException::invalidType('"bypassDocumentValidation" option', $options['bypassDocumentValidation'], 'boolean');
        }

        if (! is_bool($options['ordered'])) {
            throw InvalidArgumentException::invalidType('"ordered" option', $options['ordered'], 'boolean');
        }

        if (isset($options['session']) && ! $options['session'] instanceof Session) {
            throw InvalidArgumentException::invalidType('"session" option', $options['session'], Session::class);
        }

        if (isset($options['writeConcern']) && ! $options['writeConcern'] instanceof WriteConcern) {
            throw InvalidArgumentException::invalidType('"writeConcern" option', $options['writeConcern'], WriteConcern::class);
        }

        if (isset($options['let']) && ! is_array($options['let']) && ! is_object($options['let'])) {
            throw InvalidArgumentException::invalidType('"let" option', $options['let'], 'array or object');
        }

        // A false bypassDocumentValidation is dropped so the option is only
        // forwarded when it is actually enabled.
        if (isset($options['bypassDocumentValidation']) && ! $options['bypassDocumentValidation']) {
            unset($options['bypassDocumentValidation']);
        }

        // A default write concern is dropped so it is not forwarded to the
        // driver (and does not trip the transaction check in execute()).
        if (isset($options['writeConcern']) && $options['writeConcern']->isDefault()) {
            unset($options['writeConcern']);
        }

        $this->databaseName = $databaseName;
        $this->collectionName = $collectionName;
        $this->operations = $operations;
        $this->options = $options;
    }

    /**
     * Execute the operation.
     *
     * Builds a driver bulk write from the validated operations, executes it on
     * the given server against "<database>.<collection>", and wraps the
     * driver's write result together with the IDs returned for inserts.
     *
     * @see Executable::execute()
     * @return BulkWriteResult
     * @throws UnsupportedException if write concern is used and unsupported
     * @throws DriverRuntimeException for other driver errors (e.g. connection errors)
     */
    public function execute(Server $server)
    {
        $inTransaction = isset($this->options['session']) && $this->options['session']->isInTransaction();
        if ($inTransaction && isset($this->options['writeConcern'])) {
            throw UnsupportedException::writeConcernNotSupportedInTransaction();
        }

        $bulk = new Bulk($this->createBulkWriteOptions());
        $insertedIds = [];

        foreach ($this->operations as $i => $operation) {
            $type = key($operation);
            $args = current($operation);

            switch ($type) {
                case self::DELETE_MANY:
                case self::DELETE_ONE:
                    $bulk->delete($args[0], $args[1]);
                    break;

                case self::INSERT_ONE:
                    // Keyed by the operation's index in the original list.
                    $insertedIds[$i] = $bulk->insert($args[0]);
                    break;

                case self::REPLACE_ONE:
                case self::UPDATE_MANY:
                case self::UPDATE_ONE:
                    $bulk->update($args[0], $args[1], $args[2]);
                    break;
            }
        }

        $writeResult = $server->executeBulkWrite($this->databaseName . '.' . $this->collectionName, $bulk, $this->createExecuteOptions());

        return new BulkWriteResult($writeResult, $insertedIds);
    }

    /**
     * Create options for constructing the bulk write.
     *
     * Always includes "ordered"; "bypassDocumentValidation" and "comment" are
     * copied when set, and "let" is cast to an object when set.
     *
     * @see https://php.net/manual/en/mongodb-driver-bulkwrite.construct.php
     */
    private function createBulkWriteOptions(): array
    {
        $options = ['ordered' => $this->options['ordered']];

        foreach (['bypassDocumentValidation', 'comment'] as $option) {
            if (isset($this->options[$option])) {
                $options[$option] = $this->options[$option];
            }
        }

        if (isset($this->options['let'])) {
            $options['let'] = (object) $this->options['let'];
        }

        return $options;
    }

    /**
     * Create options for executing the bulk write.
     *
     * Forwards only the "session" and "writeConcern" options, when set.
     *
     * @see https://php.net/manual/en/mongodb-driver-server.executebulkwrite.php
     */
    private function createExecuteOptions(): array
    {
        $options = [];

        if (isset($this->options['session'])) {
            $options['session'] = $this->options['session'];
        }

        if (isset($this->options['writeConcern'])) {
            $options['writeConcern'] = $this->options['writeConcern'];
        }

        return $options;
    }
}
title
Description
Body
title
Description
Body
title
Description
Body
title
Body