<?php
/*
 * Copyright 2017-present MongoDB, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
namespace MongoDB\Operation;
use MongoDB\BSON\TimestampInterface;
use MongoDB\ChangeStream;
use MongoDB\Driver\Cursor;
use MongoDB\Driver\Exception\RuntimeException;
use MongoDB\Driver\Manager;
use MongoDB\Driver\Monitoring\CommandFailedEvent;
use MongoDB\Driver\Monitoring\CommandStartedEvent;
use MongoDB\Driver\Monitoring\CommandSubscriber;
use MongoDB\Driver\Monitoring\CommandSucceededEvent;
use MongoDB\Driver\ReadPreference;
use MongoDB\Driver\Server;
use MongoDB\Exception\InvalidArgumentException;
use MongoDB\Exception\UnexpectedValueException;
use MongoDB\Exception\UnsupportedException;
use MongoDB\Model\ChangeStreamIterator;

use function array_intersect_key;
use function array_key_exists;
use function array_unshift;
use function assert;
use function count;
use function is_array;
use function is_bool;
use function is_object;
use function is_string;
use function MongoDB\Driver\Monitoring\addSubscriber;
use function MongoDB\Driver\Monitoring\removeSubscriber;
use function MongoDB\select_server;
use function MongoDB\server_supports_feature;
/**
 * Operation for creating a change stream with the aggregate command.
 *
 * Note: the implementation of CommandSubscriber is an internal implementation
 * detail and should not be considered part of the public API.
 *
 * @api
 * @see \MongoDB\Collection::watch()
 * @see https://mongodb.com/docs/manual/changeStreams/
 */
class Watch implements Executable, /* @internal */ CommandSubscriber
{
    public const FULL_DOCUMENT_DEFAULT = 'default';
    public const FULL_DOCUMENT_UPDATE_LOOKUP = 'updateLookup';
    public const FULL_DOCUMENT_WHEN_AVAILABLE = 'whenAvailable';
    public const FULL_DOCUMENT_REQUIRED = 'required';

    public const FULL_DOCUMENT_BEFORE_CHANGE_OFF = 'off';
    public const FULL_DOCUMENT_BEFORE_CHANGE_WHEN_AVAILABLE = 'whenAvailable';
    public const FULL_DOCUMENT_BEFORE_CHANGE_REQUIRED = 'required';

    /** @var integer */
    private static $wireVersionForStartAtOperationTime = 7;

    /** @var Aggregate */
    private $aggregate;

    /** @var array */
    private $aggregateOptions;

    /** @var array */
    private $changeStreamOptions;

    /** @var string|null */
    private $collectionName;

    /** @var string */
    private $databaseName;

    /** @var integer */
    private $firstBatchSize;

    /** @var boolean */
    private $hasResumed = false;

    /** @var Manager */
    private $manager;

    /** @var TimestampInterface */
    private $operationTime;

    /** @var array */
    private $pipeline;

    /** @var object|null */
    private $postBatchResumeToken;

    /**
     * Constructs an aggregate command for creating a change stream.
     *
     * Supported options:
     *
     *  * batchSize (integer): The number of documents to return per batch.
     *
     *  * collation (document): Specifies a collation.
     *
     *  * comment (mixed): BSON value to attach as a comment to this command.
     *
     *    Only string values are supported for server versions < 4.4.
     *
     *  * fullDocument (string): Determines how the "fullDocument" response
     *    field will be populated for update operations.
     *
     *    By default, change streams only return the delta of fields (via an
     *    "updateDescription" field) for update operations and "fullDocument" is
     *    omitted. Insert and replace operations always include the
     *    "fullDocument" field. Delete operations omit the field as the document
     *    no longer exists.
     *
     *    Specify "updateLookup" to return the current majority-committed
     *    version of the updated document.
     *
     *    MongoDB 6.0+ allows returning the post-image of the modified document
     *    if the collection has changeStreamPreAndPostImages enabled. Specify
     *    "whenAvailable" to return the post-image if available or a null value
     *    if not. Specify "required" to return the post-image if available or
     *    raise an error if not.
     *
     *  * fullDocumentBeforeChange (string): Determines how the
     *    "fullDocumentBeforeChange" response field will be populated. By
     *    default, the field is omitted.
     *
     *    MongoDB 6.0+ allows returning the pre-image of the modified document
     *    if the collection has changeStreamPreAndPostImages enabled. Specify
     *    "whenAvailable" to return the pre-image if available or a null value
     *    if not. Specify "required" to return the pre-image if available or
     *    raise an error if not.
     *
     *  * maxAwaitTimeMS (integer): The maximum amount of time for the server to
     *    wait on new documents to satisfy a change stream query.
     *
     *  * readConcern (MongoDB\Driver\ReadConcern): Read concern.
     *
     *  * readPreference (MongoDB\Driver\ReadPreference): Read preference. This
     *    will be used to select a new server when resuming. Defaults to a
     *    "primary" read preference.
     *
     *  * resumeAfter (document): Specifies the logical starting point for the
     *    new change stream.
     *
     *    Using this option in conjunction with "startAfter" and/or
     *    "startAtOperationTime" will result in a server error. The options are
     *    mutually exclusive.
     *
     *  * session (MongoDB\Driver\Session): Client session.
     *
     *  * showExpandedEvents (boolean): Enables the server to send the expanded
     *    list of change stream events.
     *
     *    This option is not supported for server versions < 6.0.
     *
     *  * startAfter (document): Specifies the logical starting point for the
     *    new change stream. Unlike "resumeAfter", this option can be used with
     *    a resume token from an "invalidate" event.
     *
     *    Using this option in conjunction with "resumeAfter" and/or
     *    "startAtOperationTime" will result in a server error. The options are
     *    mutually exclusive.
     *
     *  * startAtOperationTime (MongoDB\BSON\TimestampInterface): If specified,
     *    the change stream will only provide changes that occurred at or after
     *    the specified timestamp. Any command run against the server will
     *    return an operation time that can be used here. Alternatively, an
     *    operation time may be obtained from MongoDB\Driver\Server::getInfo().
     *
     *    Using this option in conjunction with "resumeAfter" and/or
     *    "startAfter" will result in a server error. The options are mutually
     *    exclusive.
     *
     *    This option is not supported for server versions < 4.0.
     *
     *  * typeMap (array): Type map for BSON deserialization. This will be
     *    applied to the returned Cursor (it is not sent to the server).
     *
     * Note: A database-level change stream may be created by specifying null
     * for the collection name. A cluster-level change stream may be created by
     * specifying null for both the database and collection name.
     *
     * @param Manager     $manager        Manager instance from the driver
     * @param string|null $databaseName   Database name
     * @param string|null $collectionName Collection name
     * @param array       $pipeline       List of pipeline operations
     * @param array       $options        Command options
     * @throws InvalidArgumentException for parameter/option parsing errors
     */
    public function __construct(Manager $manager, ?string $databaseName, ?string $collectionName, array $pipeline, array $options = [])
    {
        if (isset($collectionName) && ! isset($databaseName)) {
            throw new InvalidArgumentException('$collectionName should also be null if $databaseName is null');
        }

        $options += [
            'readPreference' => new ReadPreference(ReadPreference::RP_PRIMARY),
        ];

        // array_key_exists() rather than isset(), so an explicit null is rejected too.
        if (array_key_exists('fullDocument', $options) && ! is_string($options['fullDocument'])) {
            throw InvalidArgumentException::invalidType('"fullDocument" option', $options['fullDocument'], 'string');
        }

        if (isset($options['fullDocumentBeforeChange']) && ! is_string($options['fullDocumentBeforeChange'])) {
            throw InvalidArgumentException::invalidType('"fullDocumentBeforeChange" option', $options['fullDocumentBeforeChange'], 'string');
        }

        if (! $options['readPreference'] instanceof ReadPreference) {
            throw InvalidArgumentException::invalidType('"readPreference" option', $options['readPreference'], ReadPreference::class);
        }

        if (isset($options['resumeAfter']) && ! is_array($options['resumeAfter']) && ! is_object($options['resumeAfter'])) {
            throw InvalidArgumentException::invalidType('"resumeAfter" option', $options['resumeAfter'], 'array or object');
        }

        if (isset($options['startAfter']) && ! is_array($options['startAfter']) && ! is_object($options['startAfter'])) {
            throw InvalidArgumentException::invalidType('"startAfter" option', $options['startAfter'], 'array or object');
        }

        if (isset($options['startAtOperationTime']) && ! $options['startAtOperationTime'] instanceof TimestampInterface) {
            throw InvalidArgumentException::invalidType('"startAtOperationTime" option', $options['startAtOperationTime'], TimestampInterface::class);
        }

        if (isset($options['showExpandedEvents']) && ! is_bool($options['showExpandedEvents'])) {
            throw InvalidArgumentException::invalidType('"showExpandedEvents" option', $options['showExpandedEvents'], 'bool');
        }

        /* In the absence of an explicit session, create one to ensure that the
         * initial aggregation and any resume attempts can use the same session
         * ("implicit from the user's perspective" per PHPLIB-342). Since this
         * is filling in for an implicit session, we default "causalConsistency"
         * to false. */
        if (! isset($options['session'])) {
            try {
                $options['session'] = $manager->startSession(['causalConsistency' => false]);
            } catch (RuntimeException $e) {
                /* We can ignore the exception, as libmongoc likely cannot
                 * create its own session and there is no risk of a mismatch. */
            }
        }

        // Split the options into those for the aggregate command itself and
        // those that belong inside the $changeStream pipeline stage.
        $this->aggregateOptions = array_intersect_key($options, ['batchSize' => 1, 'collation' => 1, 'comment' => 1, 'maxAwaitTimeMS' => 1, 'readConcern' => 1, 'readPreference' => 1, 'session' => 1, 'typeMap' => 1]);
        $this->changeStreamOptions = array_intersect_key($options, ['fullDocument' => 1, 'fullDocumentBeforeChange' => 1, 'resumeAfter' => 1, 'showExpandedEvents' => 1, 'startAfter' => 1, 'startAtOperationTime' => 1]);

        // Null database name implies a cluster-wide change stream
        if ($databaseName === null) {
            $databaseName = 'admin';
            $this->changeStreamOptions['allChangesForCluster'] = true;
        }

        $this->manager = $manager;
        $this->databaseName = $databaseName;
        $this->collectionName = $collectionName;
        $this->pipeline = $pipeline;

        $this->aggregate = $this->createAggregate();
    }

    /** @internal */
    final public function commandFailed(CommandFailedEvent $event): void
    {
        // Intentionally a no-op.
    }

    /** @internal */
    final public function commandStarted(CommandStartedEvent $event): void
    {
        if ($event->getCommandName() !== 'aggregate') {
            return;
        }

        // Reset the values captured from any previous aggregate reply.
        $this->firstBatchSize = 0;
        $this->postBatchResumeToken = null;
    }

    /** @internal */
    final public function commandSucceeded(CommandSucceededEvent $event): void
    {
        if ($event->getCommandName() !== 'aggregate') {
            return;
        }

        $reply = $event->getReply();

        if (! isset($reply->cursor->firstBatch) || ! is_array($reply->cursor->firstBatch)) {
            throw new UnexpectedValueException('aggregate command did not return a "cursor.firstBatch" array');
        }

        $this->firstBatchSize = count($reply->cursor->firstBatch);

        if (isset($reply->cursor->postBatchResumeToken) && is_object($reply->cursor->postBatchResumeToken)) {
            $this->postBatchResumeToken = $reply->cursor->postBatchResumeToken;
        }

        if (
            $this->shouldCaptureOperationTime($event->getServer()) &&
            isset($reply->operationTime) && $reply->operationTime instanceof TimestampInterface
        ) {
            $this->operationTime = $reply->operationTime;
        }
    }

    /**
     * Execute the operation.
     *
     * @see Executable::execute()
     * @return ChangeStream
     * @throws UnsupportedException if collation or read concern is used and unsupported
     * @throws RuntimeException for other driver errors (e.g. connection errors)
     */
    public function execute(Server $server)
    {
        return new ChangeStream(
            $this->createChangeStreamIterator($server),
            function ($resumeToken, $hasAdvanced): ChangeStreamIterator {
                return $this->resume($resumeToken, $hasAdvanced);
            }
        );
    }

    /**
     * Create the aggregate command for a change stream.
     *
     * This method is also used to recreate the aggregate command when resuming.
     */
    private function createAggregate(): Aggregate
    {
        // Prepend the $changeStream stage to a copy of the user's pipeline.
        $pipeline = $this->pipeline;
        array_unshift($pipeline, ['$changeStream' => (object) $this->changeStreamOptions]);

        return new Aggregate($this->databaseName, $this->collectionName, $pipeline, $this->aggregateOptions);
    }

    /**
     * Create a ChangeStreamIterator by executing the aggregate command.
     */
    private function createChangeStreamIterator(Server $server): ChangeStreamIterator
    {
        return new ChangeStreamIterator(
            $this->executeAggregate($server),
            $this->firstBatchSize,
            $this->getInitialResumeToken(),
            $this->postBatchResumeToken
        );
    }

    /**
     * Execute the aggregate command.
     *
     * The command will be executed using APM so that we can capture data from
     * its response (e.g. firstBatch size, postBatchResumeToken).
     */
    private function executeAggregate(Server $server): Cursor
    {
        addSubscriber($this);

        try {
            $cursor = $this->aggregate->execute($server);
            assert($cursor instanceof Cursor);

            return $cursor;
        } finally {
            // Always unregister, even if the aggregate command throws.
            removeSubscriber($this);
        }
    }

    /**
     * Return the initial resume token for creating the ChangeStreamIterator.
     *
     * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#updating-the-cached-resume-token
     * @return array|object|null
     */
    private function getInitialResumeToken()
    {
        if ($this->firstBatchSize === 0 && isset($this->postBatchResumeToken)) {
            return $this->postBatchResumeToken;
        }

        if (isset($this->changeStreamOptions['startAfter'])) {
            return $this->changeStreamOptions['startAfter'];
        }

        if (isset($this->changeStreamOptions['resumeAfter'])) {
            return $this->changeStreamOptions['resumeAfter'];
        }

        return null;
    }

    /**
     * Resumes a change stream.
     *
     * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#resume-process
     * @param array|object|null $resumeToken
     * @throws InvalidArgumentException
     */
    private function resume($resumeToken = null, bool $hasAdvanced = false): ChangeStreamIterator
    {
        if (isset($resumeToken) && ! is_array($resumeToken) && ! is_object($resumeToken)) {
            throw InvalidArgumentException::invalidType('$resumeToken', $resumeToken, 'array or object');
        }

        $this->hasResumed = true;

        /* Select a new server using the original read preference. While watch
         * is not usable within transactions, we still check if there is a
         * pinned session. This is to avoid an ambiguous error message about
         * running a command on the wrong server. */
        $server = select_server($this->manager, $this->aggregateOptions);

        // Decide which option carries the token before the originals are cleared.
        $resumeOption = isset($this->changeStreamOptions['startAfter']) && ! $hasAdvanced ? 'startAfter' : 'resumeAfter';

        unset($this->changeStreamOptions['resumeAfter']);
        unset($this->changeStreamOptions['startAfter']);
        unset($this->changeStreamOptions['startAtOperationTime']);

        if ($resumeToken !== null) {
            $this->changeStreamOptions[$resumeOption] = $resumeToken;
        }

        if ($resumeToken === null && $this->operationTime !== null) {
            $this->changeStreamOptions['startAtOperationTime'] = $this->operationTime;
        }

        // Recreate the aggregate command and return a new ChangeStreamIterator
        $this->aggregate = $this->createAggregate();

        return $this->createChangeStreamIterator($server);
    }

    /**
     * Determine whether to capture operation time from an aggregate response.
     *
     * @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#startatoperationtime
     */
    private function shouldCaptureOperationTime(Server $server): bool
    {
        if ($this->hasResumed) {
            return false;
        }

        if (
            isset($this->changeStreamOptions['resumeAfter']) ||
            isset($this->changeStreamOptions['startAfter']) ||
            isset($this->changeStreamOptions['startAtOperationTime'])
        ) {
            return false;
        }

        if ($this->firstBatchSize > 0) {
            return false;
        }

        if ($this->postBatchResumeToken !== null) {
            return false;
        }

        if (! server_supports_feature($server, self::$wireVersionForStartAtOperationTime)) {
            return false;
        }

        return true;
    }
}