CommandPool.php 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. <?php
  2. namespace Aws;
  3. use GuzzleHttp\Promise\PromisorInterface;
  4. use GuzzleHttp\Promise\EachPromise;
  5. /**
  6. * Sends and iterator of commands concurrently using a capped pool size.
  7. *
  8. * The pool will read command objects from an iterator until it is cancelled or
  9. * until the iterator is consumed.
  10. */
  11. class CommandPool implements PromisorInterface
  12. {
  13. /** @var EachPromise */
  14. private $each;
  15. /**
  16. * The CommandPool constructor accepts a hash of configuration options:
  17. *
  18. * - concurrency: (callable|int) Maximum number of commands to execute
  19. * concurrently. Provide a function to resize the pool dynamically. The
  20. * function will be provided the current number of pending requests and
  21. * is expected to return an integer representing the new pool size limit.
  22. * - before: (callable) function to invoke before sending each command. The
  23. * before function accepts the command and the key of the iterator of the
  24. * command. You can mutate the command as needed in the before function
  25. * before sending the command.
  26. * - fulfilled: (callable) Function to invoke when a promise is fulfilled.
  27. * The function is provided the result object, id of the iterator that the
  28. * result came from, and the aggregate promise that can be resolved/rejected
  29. * if you need to short-circuit the pool.
  30. * - rejected: (callable) Function to invoke when a promise is rejected.
  31. * The function is provided an AwsException object, id of the iterator that
  32. * the exception came from, and the aggregate promise that can be
  33. * resolved/rejected if you need to short-circuit the pool.
  34. *
  35. * @param AwsClientInterface $client Client used to execute commands.
  36. * @param array|\Iterator $commands Iterable that yields commands.
  37. * @param array $config Associative array of options.
  38. */
  39. public function __construct(
  40. AwsClientInterface $client,
  41. $commands,
  42. array $config = []
  43. ) {
  44. if (!isset($config['concurrency'])) {
  45. $config['concurrency'] = 25;
  46. }
  47. $before = $this->getBefore($config);
  48. $mapFn = function ($commands) use ($client, $before) {
  49. foreach ($commands as $key => $command) {
  50. if (!($command instanceof CommandInterface)) {
  51. throw new \InvalidArgumentException('Each value yielded by '
  52. . 'the iterator must be an Aws\CommandInterface.');
  53. }
  54. if ($before) {
  55. $before($command, $key);
  56. }
  57. yield $client->executeAsync($command);
  58. }
  59. };
  60. $this->each = new EachPromise($mapFn($commands), $config);
  61. }
  62. /**
  63. * @return \GuzzleHttp\Promise\PromiseInterface
  64. */
  65. public function promise()
  66. {
  67. return $this->each->promise();
  68. }
  69. /**
  70. * Executes a pool synchronously and aggregates the results of the pool
  71. * into an indexed array in the same order as the passed in array.
  72. *
  73. * @param AwsClientInterface $client Client used to execute commands.
  74. * @param mixed $commands Iterable that yields commands.
  75. * @param array $config Configuration options.
  76. *
  77. * @return array
  78. * @see \Aws\CommandPool::__construct for available configuration options.
  79. */
  80. public static function batch(
  81. AwsClientInterface $client,
  82. $commands,
  83. array $config = []
  84. ) {
  85. $results = [];
  86. self::cmpCallback($config, 'fulfilled', $results);
  87. self::cmpCallback($config, 'rejected', $results);
  88. return (new self($client, $commands, $config))
  89. ->promise()
  90. ->then(static function () use (&$results) {
  91. ksort($results);
  92. return $results;
  93. })
  94. ->wait();
  95. }
  96. /**
  97. * @return callable
  98. */
  99. private function getBefore(array $config)
  100. {
  101. if (!isset($config['before'])) {
  102. return null;
  103. }
  104. if (is_callable($config['before'])) {
  105. return $config['before'];
  106. }
  107. throw new \InvalidArgumentException('before must be callable');
  108. }
  109. /**
  110. * Adds an onFulfilled or onRejected callback that aggregates results into
  111. * an array. If a callback is already present, it is replaced with the
  112. * composed function.
  113. *
  114. * @param array $config
  115. * @param $name
  116. * @param array $results
  117. */
  118. private static function cmpCallback(array &$config, $name, array &$results)
  119. {
  120. if (!isset($config[$name])) {
  121. $config[$name] = function ($v, $k) use (&$results) {
  122. $results[$k] = $v;
  123. };
  124. } else {
  125. $currentFn = $config[$name];
  126. $config[$name] = function ($v, $k) use (&$results, $currentFn) {
  127. $currentFn($v, $k);
  128. $results[$k] = $v;
  129. };
  130. }
  131. }
  132. }