1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Guzzle\Http\Curl; |
4
|
|
|
|
5
|
|
|
use Guzzle\Common\AbstractHasDispatcher; |
6
|
|
|
use Guzzle\Common\Event; |
7
|
|
|
use Guzzle\Http\Exception\MultiTransferException; |
8
|
|
|
use Guzzle\Http\Exception\CurlException; |
9
|
|
|
use Guzzle\Http\Message\RequestInterface; |
10
|
|
|
use Guzzle\Http\Message\EntityEnclosingRequestInterface; |
11
|
|
|
use Guzzle\Http\Exception\RequestException; |
12
|
|
|
use Symfony\Component\EventDispatcher\EventDispatcherInterface; |
13
|
|
|
|
14
|
|
|
/** |
15
|
|
|
* Send {@see RequestInterface} objects in parallel using curl_multi |
16
|
|
|
*/ |
17
|
|
|
class CurlMulti extends AbstractHasDispatcher implements CurlMultiInterface |
18
|
|
|
{ |
19
|
|
|
/** @var resource cURL multi handle. */ |
20
|
|
|
protected $multiHandle; |
21
|
|
|
|
22
|
|
|
/** @var array Attached {@see RequestInterface} objects. */ |
23
|
|
|
protected $requests; |
24
|
|
|
|
25
|
|
|
/** @var \SplObjectStorage RequestInterface to CurlHandle hash */ |
26
|
|
|
protected $handles; |
27
|
|
|
|
28
|
|
|
/** @var array Hash mapping curl handle resource IDs to request objects */ |
29
|
|
|
protected $resourceHash; |
30
|
|
|
|
31
|
|
|
/** @var array Queued exceptions */ |
32
|
|
|
protected $exceptions = array(); |
33
|
|
|
|
34
|
|
|
/** @var array Requests that succeeded */ |
35
|
|
|
protected $successful = array(); |
36
|
|
|
|
37
|
|
|
/** @var array cURL multi error values and codes */ |
38
|
|
|
protected $multiErrors = array( |
39
|
|
|
CURLM_BAD_HANDLE => array('CURLM_BAD_HANDLE', 'The passed-in handle is not a valid CURLM handle.'), |
40
|
|
|
CURLM_BAD_EASY_HANDLE => array('CURLM_BAD_EASY_HANDLE', "An easy handle was not good/valid. It could mean that it isn't an easy handle at all, or possibly that the handle already is in used by this or another multi handle."), |
41
|
|
|
CURLM_OUT_OF_MEMORY => array('CURLM_OUT_OF_MEMORY', 'You are doomed.'), |
42
|
|
|
CURLM_INTERNAL_ERROR => array('CURLM_INTERNAL_ERROR', 'This can only be returned if libcurl bugs. Please report it to us!') |
43
|
|
|
); |
44
|
|
|
|
45
|
|
|
/** @var float */ |
46
|
|
|
protected $selectTimeout; |
47
|
|
|
|
48
|
|
|
/**
 * Create the cURL multi handle and initialise the transfer bookkeeping.
 *
 * @param float $selectTimeout Timeout handed to curl_multi_select() once the
 *                             first (short) select of a run has been issued
 *
 * @throws CurlException if curl_multi_init() returns false
 */
public function __construct($selectTimeout = 1.0)
{
    $this->multiHandle = curl_multi_init();
    $this->selectTimeout = $selectTimeout;

    // @codeCoverageIgnoreStart
    if (false === $this->multiHandle) {
        throw new CurlException('Unable to create multi handle');
    }
    // @codeCoverageIgnoreEnd

    // Start with empty request, handle, exception and success collections
    $this->reset();
}
59
|
|
|
|
60
|
|
|
/**
 * Close the cURL multi handle if it is still an open resource.
 */
public function __destruct()
{
    if (!is_resource($this->multiHandle)) {
        return;
    }

    curl_multi_close($this->multiHandle);
}
66
|
|
|
|
67
|
|
|
/**
 * Attach a request, prepare it for transfer and announce it.
 *
 * @param RequestInterface $request Request to add
 *
 * @return self
 */
public function add(RequestInterface $request)
{
    $this->requests[] = $request;

    // Preparation happens right here rather than in send(): when requests
    // are already transferring and this one is added asynchronously, send()
    // is not called again for it.
    $this->beforeSend($request);

    $payload = array('request' => $request);
    $this->dispatch(self::ADD_REQUEST, $payload);

    return $this;
}
77
|
|
|
|
78
|
|
|
/**
 * Get the requests currently attached to this multi handle.
 *
 * @return array
 */
public function all()
{
    $attached = $this->requests;

    return $attached;
}
82
|
|
|
|
83
|
|
|
/**
 * Detach a request: release its curl handle, drop it from the request list
 * and dispatch REMOVE_REQUEST.
 *
 * @param RequestInterface $request Request to remove
 *
 * @return bool True if the request was attached and has been removed,
 *              false if it was not in the request list
 */
public function remove(RequestInterface $request)
{
    // The handle is released even when the request is not in the list
    $this->removeHandle($request);

    $position = array_search($request, $this->requests, true);
    if ($position === false) {
        return false;
    }

    unset($this->requests[$position]);
    // Re-index so the list stays a gap-free, zero-based array
    $this->requests = array_values($this->requests);
    $this->dispatch(self::REMOVE_REQUEST, array('request' => $request));

    return true;
}
96
|
|
|
|
97
|
|
|
/**
 * Detach every attached request and clear all per-transfer state
 * (handles, resource hash, queued exceptions, successful requests).
 *
 * @param bool $hard Not read by this implementation
 */
public function reset($hard = false)
{
    // $this->requests has no default value, so it is still null on the first
    // call made from the constructor. Test explicitly instead of relying on
    // an implicit array-to-boolean conversion.
    if (!empty($this->requests)) {
        // foreach iterates by value, so remove() may safely re-index
        // $this->requests while the loop runs
        foreach ($this->requests as $request) {
            $this->remove($request);
        }
    }

    $this->handles = new \SplObjectStorage();
    $this->requests = $this->resourceHash = $this->exceptions = $this->successful = array();
}
109
|
|
|
|
110
|
|
|
/**
 * Transfer all attached requests, reset the internal state and report failures.
 *
 * @throws MultiTransferException if any exception was queued during the transfer
 */
public function send()
{
    $this->perform();

    // Snapshot the outcome first: reset() empties both collections
    $exceptions = $this->exceptions;
    $successful = $this->successful;
    $this->reset();

    // Explicit emptiness check rather than an implicit array-to-boolean cast
    if (!empty($exceptions)) {
        $this->throwMultiException($exceptions, $successful);
    }
}
121
|
|
|
|
122
|
|
|
/**
 * Get the number of requests currently attached.
 *
 * @return int
 */
public function count()
{
    $attached = count($this->requests);

    return $attached;
}
126
|
|
|
|
127
|
|
|
/**
 * Assemble a MultiTransferException from the collected results and throw it.
 *
 * @param array $exceptions Entries holding a 'request' and its 'exception'
 * @param array $successful Requests that completed successfully
 *
 * @throws MultiTransferException always
 */
protected function throwMultiException(array $exceptions, array $successful)
{
    $aggregate = new MultiTransferException('Errors during multi transfer');

    foreach ($exceptions as $failure) {
        $aggregate->addFailedRequestWithException($failure['request'], $failure['exception']);
    }

    // A request already recorded as failed is not also reported as successful
    foreach ($successful as $completed) {
        if ($aggregate->containsRequest($completed)) {
            continue;
        }
        $aggregate->addSuccessfulRequest($completed);
    }

    throw $aggregate;
}
151
|
|
|
|
152
|
|
|
/**
 * Move a request into the transfer state and attach its curl handle.
 *
 * Any exception raised while preparing is not rethrown here; it is queued
 * through removeErroredRequest().
 *
 * @param RequestInterface $request Request to prepare
 */
protected function beforeSend(RequestInterface $request)
{
    try {
        $state = $request->setState(RequestInterface::STATE_TRANSFER);

        if ($state == RequestInterface::STATE_TRANSFER) {
            $this->addHandle($request);

            return;
        }

        // The request ended up in another state, so it will not be sent:
        // requests might decide just before transfer that they don't need
        // to be sent (e.g. CachePlugin)
        $this->remove($request);

        if ($state == RequestInterface::STATE_COMPLETE) {
            $this->successful[] = $request;
        }
    } catch (\Exception $failure) {
        // Queue the exception to be thrown when sent
        $this->removeErroredRequest($request, $failure);
    }
}
177
|
|
|
|
178
|
|
|
/**
 * Create a curl handle for the request and register it with the multi handle.
 *
 * @param RequestInterface $request Request whose handle is added
 *
 * @throws CurlException if curl_multi_add_handle() reports an error code
 */
private function addHandle(RequestInterface $request)
{
    $wrapper = $this->createCurlHandle($request);
    $resource = $wrapper->getHandle();

    $result = curl_multi_add_handle($this->multiHandle, $resource);
    $this->checkCurlResult($result);
}
185
|
|
|
|
186
|
|
|
/**
 * Build the CurlHandle wrapper for a request and index it both ways:
 * request => wrapper, and curl resource ID => request.
 *
 * @param RequestInterface $request Request
 *
 * @return CurlHandle
 */
protected function createCurlHandle(RequestInterface $request)
{
    $curlHandle = CurlHandle::factory($request);
    $this->handles[$request] = $curlHandle;

    // The integer cast of the curl resource is the lookup key used when
    // completed transfers are reported back
    $resourceId = (int) $curlHandle->getHandle();
    $this->resourceHash[$resourceId] = $request;

    return $curlHandle;
}
201
|
|
|
|
202
|
|
|
/**
 * Drive the transfer until no requests remain attached.
 *
 * On every pass each attached request is notified with a POLLING_REQUEST
 * event on its own dispatcher. If every request carries the BLOCKING
 * parameter, the pass only sleeps briefly; otherwise the curl handles are
 * executed.
 */
protected function perform()
{
    // One event object is reused for every polling notification
    $event = new Event(array('curl_multi' => $this));

    // Explicit emptiness check rather than an implicit array-to-boolean cast
    while (!empty($this->requests)) {
        // Notify each request as polling
        $blocking = $total = 0;
        foreach ($this->requests as $request) {
            ++$total;
            $event['request'] = $request;

            /** @var EventDispatcherInterface $dispatcher */
            $dispatcher = $request->getEventDispatcher();
            $eventName = self::POLLING_REQUEST;

            $event->setEventDispatcher($dispatcher);
            $event->setName($eventName);

            $dispatcher->dispatch($eventName, $event);

            // The mere presence of the blocking parameter counts as blocking
            if ($request->getParams()->hasKey(self::BLOCKING)) {
                ++$blocking;
            }
        }

        if ($blocking == $total) {
            // Sleep to prevent eating CPU because no requests are actually pending a select call
            usleep(500);
        } else {
            $this->executeHandles();
        }
    }
}
237
|
|
|
|
238
|
|
|
/**
 * Run curl_multi_exec() until no transfer is active any more, processing
 * completion messages and waiting on curl_multi_select() between rounds.
 *
 * @throws CurlException if curl_multi_exec() reports an error code
 */
private function executeHandles()
{
    // The first curl_multi_select often times out no matter what, but is
    // usually required for fast transfers, so it gets a very short timeout
    $timeout = 0.001;
    $stillRunning = false;

    do {
        do {
            $status = curl_multi_exec($this->multiHandle, $stillRunning);
        } while ($status == CURLM_CALL_MULTI_PERFORM);

        $this->checkCurlResult($status);
        $this->processMessages();

        if ($stillRunning) {
            $selected = curl_multi_select($this->multiHandle, $timeout);
            if ($selected === -1) {
                // Perform a usleep if a select returns -1: https://bugs.php.net/bug.php?id=61141
                usleep(150);
            }
        }

        // Every later select uses the configured timeout
        $timeout = $this->selectTimeout;
    } while ($stillRunning);
}
257
|
|
|
|
258
|
|
|
/**
 * Drain the multi handle's message queue, handling each reported transfer.
 *
 * A request whose processing throws is queued as errored; every other
 * processed request is recorded as successful.
 */
private function processMessages()
{
    while ($message = curl_multi_info_read($this->multiHandle)) {
        // Map the reported curl resource back to the request that owns it
        $finished = $this->resourceHash[(int) $message['handle']];

        try {
            $this->processResponse($finished, $this->handles[$finished], $message);
            $this->successful[] = $finished;
        } catch (\Exception $failure) {
            $this->removeErroredRequest($finished, $failure);
        }
    }
}
273
|
|
|
|
274
|
|
|
/**
 * Queue an exception for a request, detach the request and dispatch
 * MULTI_EXCEPTION with the new exception and everything queued so far.
 *
 * @param RequestInterface $request Request to remove
 * @param \Exception       $e       Exception encountered
 */
protected function removeErroredRequest(RequestInterface $request, \Exception $e = null)
{
    $entry = array('request' => $request, 'exception' => $e);
    $this->exceptions[] = $entry;

    $this->remove($request);

    $this->dispatch(
        self::MULTI_EXCEPTION,
        array('exception' => $e, 'all_exceptions' => $this->exceptions)
    );
}
286
|
|
|
|
287
|
|
|
/**
 * Finish a transfer reported by curl: update the request from the handle,
 * release the handle, and move the request to its complete or error state.
 *
 * @param RequestInterface $request Request to process
 * @param CurlHandle       $handle  Curl handle object
 * @param array            $curl    Array returned from curl_multi_info_read
 *
 * @throws CurlException when a curl error leaves the request in the error state
 */
protected function processResponse(RequestInterface $request, CurlHandle $handle, array $curl)
{
    // Set the transfer stats on the response
    $handle->updateRequestFromTransfer($request);

    // Either false or a ready-to-throw CurlException
    $curlException = $this->isCurlException($request, $handle, $curl);

    // Always remove completed curl handles. They can be added back again
    // via events if needed (e.g. ExponentialBackoffPlugin)
    $this->removeHandle($request);

    if ($curlException) {
        // Set the state of the request to an error
        $state = $request->setState(RequestInterface::STATE_ERROR, array('exception' => $curlException));

        // Allow things to ignore the error if possible: a request that went
        // back to the transfer state stays attached
        if ($state != RequestInterface::STATE_TRANSFER) {
            $this->remove($request);
        }

        // The error was not handled, so fail
        if ($state == RequestInterface::STATE_ERROR) {
            /** @var CurlException $curlException */
            throw $curlException;
        }

        return;
    }

    if (!$this->validateResponseWasSet($request)) {
        return;
    }

    $state = $request->setState(
        RequestInterface::STATE_COMPLETE,
        array('handle' => $handle)
    );

    // Only remove the request if it wasn't resent as a result of the state change
    if ($state != RequestInterface::STATE_TRANSFER) {
        $this->remove($request);
    }
}
335
|
|
|
|
336
|
|
|
/**
 * Detach and close the curl handle owned by a request, if it has one.
 *
 * @param RequestInterface $request Request that owns the handle
 */
protected function removeHandle(RequestInterface $request)
{
    if (!isset($this->handles[$request])) {
        return;
    }

    $wrapper = $this->handles[$request];
    curl_multi_remove_handle($this->multiHandle, $wrapper->getHandle());

    // Drop both index entries before the wrapper is closed
    unset($this->handles[$request]);
    unset($this->resourceHash[(int) $wrapper->getHandle()]);

    $wrapper->close();
}
351
|
|
|
|
352
|
|
|
/**
 * Turn the result code of a finished transfer into a CurlException, or
 * return false when the code does not indicate an error.
 *
 * @param RequestInterface $request Request to check
 * @param CurlHandle       $handle  Curl handle object
 * @param array            $curl    Array returned from curl_multi_info_read
 *
 * @return CurlException|bool
 */
private function isCurlException(RequestInterface $request, CurlHandle $handle, array $curl)
{
    $result = $curl['result'];
    if ($result == CURLM_OK || $result == CURLM_CALL_MULTI_PERFORM) {
        return false;
    }

    // Record the error number on the handle; the message is built from it
    $handle->setErrorNo($curl['result']);

    $message = sprintf(
        '[curl] %s: %s [url] %s',
        $handle->getErrorNo(),
        $handle->getError(),
        $handle->getUrl()
    );

    $exception = new CurlException($message);
    $exception->setCurlHandle($handle)
        ->setRequest($request)
        ->setCurlInfo($handle->getInfo())
        ->setError($handle->getError(), $handle->getErrorNo());

    return $exception;
}
377
|
|
|
|
378
|
|
|
/**
 * Raise an exception when a cURL multi call returned an error code.
 *
 * CURLM_OK and CURLM_CALL_MULTI_PERFORM are accepted as non-errors.
 *
 * @param int $code Curl response code
 *
 * @throws CurlException
 */
private function checkCurlResult($code)
{
    if ($code == CURLM_OK || $code == CURLM_CALL_MULTI_PERFORM) {
        return;
    }

    if (isset($this->multiErrors[$code])) {
        // Known multi error: report its constant name and description
        $message = "cURL error: {$code} ({$this->multiErrors[$code][0]}): cURL message: {$this->multiErrors[$code][1]}";
    } else {
        $message = 'Unexpected cURL error: ' . $code;
    }

    throw new CurlException($message);
}
393
|
|
|
|
394
|
|
|
/**
 * Check that a finished transfer actually produced a response.
 *
 * When no response is set, the request is either queued as errored (it has
 * no body, or its body cannot be rewound) or re-attached so that it is
 * retried.
 *
 * @link https://github.com/guzzle/guzzle/issues/710
 *
 * @param RequestInterface $request Request whose response is checked
 *
 * @return bool True if the request has a response, false otherwise
 */
private function validateResponseWasSet(RequestInterface $request)
{
    if ($request->getResponse()) {
        return true;
    }

    $body = null;
    if ($request instanceof EntityEnclosingRequestInterface) {
        $body = $request->getBody();
    }

    if (!$body) {
        $message = 'No response was received for a request with no body. This'
            . ' could mean that you are saturating your network.';
    } elseif (!$body->isSeekable() || !$body->seek(0)) {
        // Nothing we can do with this. Sorry!
        $message = 'The connection was unexpectedly closed. The request would'
            . ' have been retried, but attempting to rewind the'
            . ' request body failed.';
    } else {
        // The body was rewound: detach the request, then add it back to the
        // batch to retry automatically.
        $this->remove($request);
        $this->requests[] = $request;
        $this->addHandle($request);

        return false;
    }

    $rex = new RequestException($message);
    $rex->setRequest($request);
    $this->removeErroredRequest($request, $rex);

    return false;
}
432
|
|
|
} |
433
|
|
|
|
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using `empty(...)` or `!empty(...)` instead.