Search moodle.org's
Developer Documentation

See Release Notes

  • Bug fixes for general core bugs in 4.2.x will end 22 April 2024 (12 months).
  • Bug fixes for security issues in 4.2.x will end 7 October 2024 (18 months).
  • PHP version: minimum PHP 8.0.0. Note: the minimum PHP version has increased since Moodle 4.1. PHP 8.1.x is also supported.
   1  <?php
   2  
   3  namespace GuzzleHttp;
   4  
   5  use GuzzleHttp\Promise as P;
   6  use GuzzleHttp\Promise\EachPromise;
   7  use GuzzleHttp\Promise\PromiseInterface;
   8  use GuzzleHttp\Promise\PromisorInterface;
   9  use Psr\Http\Message\RequestInterface;
  10  
  11  /**
  12   * Sends an iterator of requests concurrently using a capped pool size.
  13   *
  14   * The pool will read from an iterator until it is cancelled or until the
  15   * iterator is consumed. When a request is yielded, the request is sent after
  16   * applying the "request_options" request options (if provided in the ctor).
  17   *
  18   * When a function is yielded by the iterator, the function is provided the
  19   * "request_options" array that should be merged on top of any existing
  20   * options, and the function MUST then return a wait-able promise.
  21   *
  22   * @final
  23   */
  24  class Pool implements PromisorInterface
  25  {
  26      /**
  27       * @var EachPromise
  28       */
  29      private $each;
  30  
  31      /**
  32       * @param ClientInterface $client   Client used to send the requests.
  33       * @param array|\Iterator $requests Requests or functions that return
  34       *                                  requests to send concurrently.
  35       * @param array           $config   Associative array of options
  36       *                                  - concurrency: (int) Maximum number of requests to send concurrently
  37       *                                  - options: Array of request options to apply to each request.
  38       *                                  - fulfilled: (callable) Function to invoke when a request completes.
  39       *                                  - rejected: (callable) Function to invoke when a request is rejected.
  40       */
  41      public function __construct(ClientInterface $client, $requests, array $config = [])
  42      {
  43          if (!isset($config['concurrency'])) {
  44              $config['concurrency'] = 25;
  45          }
  46  
  47          if (isset($config['options'])) {
  48              $opts = $config['options'];
  49              unset($config['options']);
  50          } else {
  51              $opts = [];
  52          }
  53  
  54          $iterable = P\Create::iterFor($requests);
  55          $requests = static function () use ($iterable, $client, $opts) {
  56              foreach ($iterable as $key => $rfn) {
  57                  if ($rfn instanceof RequestInterface) {
  58                      yield $key => $client->sendAsync($rfn, $opts);
  59                  } elseif (\is_callable($rfn)) {
  60                      yield $key => $rfn($opts);
  61                  } else {
  62                      throw new \InvalidArgumentException('Each value yielded by the iterator must be a Psr7\Http\Message\RequestInterface or a callable that returns a promise that fulfills with a Psr7\Message\Http\ResponseInterface object.');
  63                  }
  64              }
  65          };
  66  
  67          $this->each = new EachPromise($requests(), $config);
  68      }
  69  
  70      /**
  71       * Get promise
  72       */
  73      public function promise(): PromiseInterface
  74      {
  75          return $this->each->promise();
  76      }
  77  
  78      /**
  79       * Sends multiple requests concurrently and returns an array of responses
  80       * and exceptions that uses the same ordering as the provided requests.
  81       *
  82       * IMPORTANT: This method keeps every request and response in memory, and
  83       * as such, is NOT recommended when sending a large number or an
  84       * indeterminate number of requests concurrently.
  85       *
  86       * @param ClientInterface $client   Client used to send the requests
  87       * @param array|\Iterator $requests Requests to send concurrently.
  88       * @param array           $options  Passes through the options available in
  89       *                                  {@see \GuzzleHttp\Pool::__construct}
  90       *
  91       * @return array Returns an array containing the response or an exception
  92       *               in the same order that the requests were sent.
  93       *
  94       * @throws \InvalidArgumentException if the event format is incorrect.
  95       */
  96      public static function batch(ClientInterface $client, $requests, array $options = []): array
  97      {
  98          $res = [];
  99          self::cmpCallback($options, 'fulfilled', $res);
 100          self::cmpCallback($options, 'rejected', $res);
 101          $pool = new static($client, $requests, $options);
 102          $pool->promise()->wait();
 103          \ksort($res);
 104  
 105          return $res;
 106      }
 107  
 108      /**
 109       * Execute callback(s)
 110       */
 111      private static function cmpCallback(array &$options, string $name, array &$results): void
 112      {
 113          if (!isset($options[$name])) {
 114              $options[$name] = static function ($v, $k) use (&$results) {
 115                  $results[$k] = $v;
 116              };
 117          } else {
 118              $currentFn = $options[$name];
 119              $options[$name] = static function ($v, $k) use (&$results, $currentFn) {
 120                  $currentFn($v, $k);
 121                  $results[$k] = $v;
 122              };
 123          }
 124      }
 125  }