Completed
Push — master ( 2b1385...7c6a84 )
by Thomas
07:21
created

Pool::addNextRequest()   D

Complexity

Conditions 9
Paths 0

Size

Total Lines 52
Code Lines 31

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 52
rs 4.6098
cc 9
eloc 31
nc 0
nop 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
namespace GuzzleHttp;
3
4
use GuzzleHttp\Event\BeforeEvent;
5
use GuzzleHttp\Event\RequestEvents;
6
use GuzzleHttp\Message\RequestInterface;
7
use GuzzleHttp\Message\ResponseInterface;
8
use GuzzleHttp\Ring\Core;
9
use GuzzleHttp\Ring\Future\FutureInterface;
10
use GuzzleHttp\Event\ListenerAttacherTrait;
11
use GuzzleHttp\Event\EndEvent;
12
use React\Promise\Deferred;
13
use React\Promise\FulfilledPromise;
14
use React\Promise\PromiseInterface;
15
use React\Promise\RejectedPromise;
16
17
/**
18
 * Sends and iterator of requests concurrently using a capped pool size.
19
 *
20
 * The Pool object implements FutureInterface, meaning it can be used later
21
 * when necessary, the requests provided to the pool can be cancelled, and
22
 * you can check the state of the pool to know if it has been dereferenced
23
 * (sent) or has been cancelled.
24
 *
25
 * When sending the pool, keep in mind that no results are returned: callers
26
 * are expected to handle results asynchronously using Guzzle's event system.
27
 * When requests complete, more are added to the pool to ensure that the
28
 * requested pool size is always filled as much as possible.
29
 *
30
 * IMPORTANT: Do not provide a pool size greater that what the utilized
31
 * underlying RingPHP handler can support. This will result is extremely poor
32
 * performance.
33
 */
34
class Pool implements FutureInterface
35
{
36
    use ListenerAttacherTrait;
37
38
    /** @var \GuzzleHttp\ClientInterface */
39
    private $client;
40
41
    /** @var \Iterator Yields requests */
42
    private $iter;
43
44
    /** @var Deferred */
45
    private $deferred;
46
47
    /** @var PromiseInterface */
48
    private $promise;
49
50
    private $waitQueue = [];
51
    private $eventListeners = [];
52
    private $poolSize;
53
    private $isRealized = false;
54
55
    /**
56
     * The option values for 'before', 'complete', 'error' and 'end' can be a
57
     * callable, an associative array containing event data, or an array of
58
     * event data arrays. Event data arrays contain the following keys:
59
     *
60
     * - fn: callable to invoke that receives the event
61
     * - priority: Optional event priority (defaults to 0)
62
     * - once: Set to true so that the event is removed after it is triggered
63
     *
64
     * @param ClientInterface $client   Client used to send the requests.
65
     * @param array|\Iterator $requests Requests to send in parallel
66
     * @param array           $options  Associative array of options
67
     *     - pool_size: (callable|int)   Maximum number of requests to send
68
     *                                   concurrently, or a callback that receives
69
     *                                   the current queue size and returns the
70
     *                                   number of new requests to send
71
     *     - before:    (callable|array) Receives a BeforeEvent
72
     *     - complete:  (callable|array) Receives a CompleteEvent
73
     *     - error:     (callable|array) Receives a ErrorEvent
74
     *     - end:       (callable|array) Receives an EndEvent
75
     */
76
    public function __construct(
77
        ClientInterface $client,
78
        $requests,
79
        array $options = []
80
    ) {
81
        $this->client = $client;
82
        $this->iter = $this->coerceIterable($requests);
83
        $this->deferred = new Deferred();
84
        $this->promise = $this->deferred->promise();
85
        $this->poolSize = isset($options['pool_size'])
86
            ? $options['pool_size'] : 25;
87
        $this->eventListeners = $this->prepareListeners(
88
            $options,
89
            ['before', 'complete', 'error', 'end']
90
        );
91
    }
92
93
    /**
94
     * Sends multiple requests in parallel and returns an array of responses
95
     * and exceptions that uses the same ordering as the provided requests.
96
     *
97
     * IMPORTANT: This method keeps every request and response in memory, and
98
     * as such, is NOT recommended when sending a large number or an
99
     * indeterminate number of requests concurrently.
100
     *
101
     * @param ClientInterface $client   Client used to send the requests
102
     * @param array|\Iterator $requests Requests to send in parallel
103
     * @param array           $options  Passes through the options available in
104
     *                                  {@see GuzzleHttp\Pool::__construct}
105
     *
106
     * @return BatchResults Returns a container for the results.
107
     * @throws \InvalidArgumentException if the event format is incorrect.
108
     */
109
    public static function batch(
110
        ClientInterface $client,
111
        $requests,
112
        array $options = []
113
    ) {
114
        $hash = new \SplObjectStorage();
115
        foreach ($requests as $request) {
116
            $hash->attach($request);
117
        }
118
119
        // In addition to the normally run events when requests complete, add
120
        // and event to continuously track the results of transfers in the hash.
121
        (new self($client, $requests, RequestEvents::convertEventArray(
122
            $options,
123
            ['end'],
124
            [
125
                'priority' => RequestEvents::LATE,
126
                'fn'       => function (EndEvent $e) use ($hash) {
127
                    $hash[$e->getRequest()] = $e->getException()
128
                        ? $e->getException()
129
                        : $e->getResponse();
130
                }
131
            ]
132
        )))->wait();
133
134
        return new BatchResults($hash);
135
    }
136
137
    /**
138
     * Creates a Pool and immediately sends the requests.
139
     *
140
     * @param ClientInterface $client   Client used to send the requests
141
     * @param array|\Iterator $requests Requests to send in parallel
142
     * @param array           $options  Passes through the options available in
143
     *                                  {@see GuzzleHttp\Pool::__construct}
144
     */
145
    public static function send(
146
        ClientInterface $client,
147
        $requests,
148
        array $options = []
149
    ) {
150
        $pool = new self($client, $requests, $options);
151
        $pool->wait();
152
    }
153
154
    private function getPoolSize()
155
    {
156
        return is_callable($this->poolSize)
157
            ? call_user_func($this->poolSize, count($this->waitQueue))
158
            : $this->poolSize;
159
    }
160
161
    /**
162
     * Add as many requests as possible up to the current pool limit.
163
     */
164
    private function addNextRequests()
165
    {
166
        $limit = max($this->getPoolSize() - count($this->waitQueue), 0);
167
        while ($limit--) {
168
            if (!$this->addNextRequest()) {
169
                break;
170
            }
171
        }
172
    }
173
174
    public function wait()
175
    {
176
        if ($this->isRealized) {
177
            return false;
178
        }
179
180
        // Seed the pool with N number of requests.
181
        $this->addNextRequests();
182
183
        // Stop if the pool was cancelled while transferring requests.
184
        if ($this->isRealized) {
185
            return false;
186
        }
187
188
        // Wait on any outstanding FutureResponse objects.
189
        while ($response = array_pop($this->waitQueue)) {
190
            try {
191
                $response->wait();
192
            } catch (\Exception $e) {
193
                // Eat exceptions because they should be handled asynchronously
194
            }
195
            $this->addNextRequests();
196
        }
197
198
        // Clean up no longer needed state.
199
        $this->isRealized = true;
200
        $this->waitQueue = $this->eventListeners = [];
201
        $this->client = $this->iter = null;
202
        $this->deferred->resolve(true);
203
204
        return true;
205
    }
206
207
    /**
208
     * {@inheritdoc}
209
     *
210
     * Attempt to cancel all outstanding requests (requests that are queued for
211
     * dereferencing). Returns true if all outstanding requests can be
212
     * cancelled.
213
     *
214
     * @return bool
215
     */
216
    public function cancel()
217
    {
218
        if ($this->isRealized) {
219
            return false;
220
        }
221
222
        $success = $this->isRealized = true;
223
        foreach ($this->waitQueue as $response) {
224
            if (!$response->cancel()) {
225
                $success = false;
226
            }
227
        }
228
229
        return $success;
230
    }
231
232
    /**
233
     * Returns a promise that is invoked when the pool completed. There will be
234
     * no passed value.
235
     *
236
     * {@inheritdoc}
237
     */
238
    public function then(
239
        callable $onFulfilled = null,
240
        callable $onRejected = null,
241
        callable $onProgress = null
242
    ) {
243
        return $this->promise->then($onFulfilled, $onRejected, $onProgress);
244
    }
245
246
    public function promise()
247
    {
248
        return $this->promise;
249
    }
250
251
    private function coerceIterable($requests)
252
    {
253
        if ($requests instanceof \Iterator) {
254
            return $requests;
255
        } elseif (is_array($requests)) {
256
            return new \ArrayIterator($requests);
257
        }
258
259
        throw new \InvalidArgumentException('Expected Iterator or array. '
260
            . 'Found ' . Core::describeType($requests));
261
    }
262
263
    /**
264
     * Adds the next request to pool and tracks what requests need to be
265
     * dereferenced when completing the pool.
266
     */
267
    private function addNextRequest()
268
    {
269
        add_next:
270
271
        if ($this->isRealized || !$this->iter || !$this->iter->valid()) {
272
            return false;
273
        }
274
275
        $request = $this->iter->current();
276
        $this->iter->next();
277
278
        if (!($request instanceof RequestInterface)) {
279
            throw new \InvalidArgumentException(sprintf(
280
                'All requests in the provided iterator must implement '
281
                . 'RequestInterface. Found %s',
282
                Core::describeType($request)
283
            ));
284
        }
285
286
        // Be sure to use "lazy" futures, meaning they do not send right away.
287
        $request->getConfig()->set('future', 'lazy');
288
        $hash = spl_object_hash($request);
289
        $this->attachListeners($request, $this->eventListeners);
290
        $request->getEmitter()->on('before', [$this, '_trackRetries'], RequestEvents::EARLY);
291
        $response = $this->client->send($request);
292
        $this->waitQueue[$hash] = $response;
293
        $promise = $response->promise();
294
295
        // Don't recursively call itself for completed or rejected responses.
296
        if ($promise instanceof FulfilledPromise
297
            || $promise instanceof RejectedPromise
298
        ) {
299
            try {
300
                $this->finishResponse($request, $response->wait(), $hash);
301
            } catch (\Exception $e) {
302
                $this->finishResponse($request, $e, $hash);
303
            }
304
            goto add_next;
305
        }
306
307
        // Use this function for both resolution and rejection.
308
        $thenFn = function ($value) use ($request, $hash) {
309
            $this->finishResponse($request, $value, $hash);
310
            if (!$request->getConfig()->get('_pool_retries')) {
311
                $this->addNextRequests();
312
            }
313
        };
314
315
        $promise->then($thenFn, $thenFn);
316
317
        return true;
318
    }
319
320
    public function _trackRetries(BeforeEvent $e)
321
    {
322
        $e->getRequest()->getConfig()->set('_pool_retries', $e->getRetryCount());
323
    }
324
325
    private function finishResponse($request, $value, $hash)
326
    {
327
        unset($this->waitQueue[$hash]);
328
        $result = $value instanceof ResponseInterface
329
            ? ['request' => $request, 'response' => $value, 'error' => null]
330
            : ['request' => $request, 'response' => null, 'error' => $value];
331
        $this->deferred->notify($result);
332
    }
333
}
334