1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace mpyw\Co\Internal; |
4
|
|
|
use mpyw\Co\Co; |
5
|
|
|
use mpyw\Co\Internal\CoOption; |
6
|
|
|
use mpyw\Co\CURLException; |
7
|
|
|
use mpyw\RuntimePromise\Deferred; |
8
|
|
|
|
9
|
|
|
class CURLPool |
10
|
|
|
{ |
11
|
|
|
/** |
12
|
|
|
* Options. |
13
|
|
|
* @var CoOption |
14
|
|
|
*/ |
15
|
|
|
private $options; |
16
|
|
|
|
17
|
|
|
/** |
18
|
|
|
* cURL multi handle. |
19
|
|
|
* @var resource |
20
|
|
|
*/ |
21
|
|
|
private $mh; |
22
|
|
|
|
23
|
|
|
/** |
24
|
|
|
* cURL handles those have not been dispatched. |
25
|
|
|
* @var array |
26
|
|
|
*/ |
27
|
|
|
private $queue = []; |
28
|
|
|
|
29
|
|
|
/** |
30
|
|
|
* cURL handles those have been already dispatched. |
31
|
|
|
* @var array |
32
|
|
|
*/ |
33
|
|
|
private $added = []; |
34
|
|
|
|
35
|
|
|
/** |
36
|
|
|
* Delays to be ended at. |
37
|
|
|
* @var array |
38
|
|
|
*/ |
39
|
|
|
private $untils = []; |
40
|
|
|
|
41
|
|
|
/** |
42
|
|
|
* React Deferreds. |
43
|
|
|
* @var Deferred |
44
|
|
|
*/ |
45
|
|
|
private $deferreds = []; |
46
|
|
|
|
47
|
|
|
/** |
48
|
|
|
* Used for halting loop. |
49
|
|
|
* @var \RuntimeException |
50
|
|
|
*/ |
51
|
|
|
private $haltException; |
52
|
|
|
|
53
|
|
|
/** |
54
|
|
|
* Constructor. |
55
|
|
|
* Initialize cURL multi handle. |
56
|
|
|
* @param CoOption $options |
57
|
|
|
*/ |
58
|
19 |
|
public function __construct(CoOption $options) |
59
|
19 |
|
{ |
60
|
19 |
|
$this->options = $options; |
61
|
19 |
|
$this->mh = curl_multi_init(); |
62
|
19 |
|
$flags = (int)$options['pipeline'] + (int)$options['multiplex'] * 2; |
63
|
19 |
|
curl_multi_setopt($this->mh, CURLMOPT_PIPELINING, $flags); |
64
|
19 |
|
} |
65
|
|
|
|
66
|
|
|
/** |
67
|
|
|
* Call curl_multi_add_handle() or push into queue. |
68
|
|
|
* @param resource $ch |
69
|
|
|
* @param Deferred $deferred |
70
|
|
|
*/ |
71
|
9 |
|
public function addOrEnqueue($ch, Deferred $deferred = null) |
72
|
9 |
|
{ |
73
|
9 |
|
if (isset($this->added[(string)$ch]) || isset($this->queue[(string)$ch])) { |
74
|
3 |
|
throw new \InvalidArgumentException("The cURL handle is already enqueued: $ch"); |
75
|
|
|
} |
76
|
9 |
|
if (count($this->added) >= $this->options['concurrency']) { |
77
|
2 |
|
$this->queue[(string)$ch] = $ch; |
78
|
2 |
|
$deferred && $this->deferreds[(string)$ch] = $deferred; |
79
|
2 |
|
return; |
80
|
|
|
} |
81
|
9 |
|
$errno = curl_multi_add_handle($this->mh, $ch); |
82
|
9 |
|
if ($errno !== CURLM_OK) { |
83
|
|
|
// @codeCoverageIgnoreStart |
84
|
|
|
$msg = curl_multi_strerror($errno) . ": $ch"; |
85
|
|
|
$deferred && $deferred->reject(new \RuntimeException($msg)); |
86
|
|
|
return; |
87
|
|
|
// @codeCoverageIgnoreEnd |
88
|
|
|
} |
89
|
9 |
|
$this->added[(string)$ch] = $ch; |
90
|
9 |
|
$deferred && $this->deferreds[(string)$ch] = $deferred; |
91
|
9 |
|
} |
92
|
|
|
|
93
|
|
|
/** |
94
|
|
|
* Add delay. |
95
|
|
|
* @param int $time |
96
|
|
|
* @param Deferred $deferred |
97
|
|
|
*/ |
98
|
4 |
|
public function addDelay($time, Deferred $deferred) |
99
|
4 |
|
{ |
100
|
4 |
|
$time = filter_var($time, FILTER_VALIDATE_FLOAT); |
101
|
4 |
|
if ($time === false || $time < 0) { |
102
|
1 |
|
throw new \InvalidArgumentException('Delay must be positive number.'); |
103
|
|
|
} |
104
|
|
|
do { |
105
|
3 |
|
$id = uniqid(); |
106
|
3 |
|
} while (isset($this->untils[$id])); |
107
|
3 |
|
$this->untils[$id] = microtime(true) + $time; |
108
|
3 |
|
$this->deferreds[$id] = $deferred; |
109
|
3 |
|
} |
110
|
|
|
|
111
|
|
|
/** |
112
|
|
|
* Run curl_multi_exec() loop. |
113
|
|
|
*/ |
114
|
13 |
|
public function wait() |
115
|
13 |
|
{ |
116
|
13 |
|
curl_multi_exec($this->mh, $active); // Start requests. |
117
|
|
|
do { |
118
|
|
|
// if cURL handle is running, use curl_multi_select() |
119
|
|
|
// otherwise, just sleep until nearest time |
120
|
13 |
|
$this->added || $this->queue |
121
|
6 |
|
? curl_multi_select($this->mh, $this->options['interval']) < 0 |
122
|
6 |
|
&& usleep($this->options['interval'] * 1000000) |
123
|
8 |
|
: $this->sleepUntilNearestTime(); |
124
|
13 |
|
curl_multi_exec($this->mh, $active); |
125
|
13 |
|
$this->consumeCurlsAndUntils(); |
126
|
13 |
|
} while (!$this->haltException && ($this->added || $this->queue || $this->untils)); |
127
|
13 |
|
if ($this->haltException) { |
128
|
3 |
|
throw $this->haltException; |
129
|
|
|
} |
130
|
11 |
|
} |
131
|
|
|
|
132
|
|
|
/** |
133
|
|
|
* Used for halting loop. |
134
|
|
|
*/ |
135
|
3 |
|
public function reserveHaltException($e) |
136
|
3 |
|
{ |
137
|
3 |
|
$this->haltException = $e; |
138
|
3 |
|
} |
139
|
|
|
|
140
|
|
|
/** |
141
|
|
|
* Sleep at least required. |
142
|
|
|
*/ |
143
|
8 |
|
private function sleepUntilNearestTime() |
144
|
8 |
|
{ |
145
|
8 |
|
$now = microtime(true); |
146
|
8 |
|
$min = null; |
147
|
8 |
|
foreach ($this->untils as $id => $until) { |
148
|
3 |
|
$diff = $now - $until; |
149
|
3 |
|
if ($diff < 0 || $min !== null && $diff >= $min) { |
150
|
3 |
|
continue; |
151
|
|
|
} |
152
|
2 |
|
$min = $diff; |
153
|
|
|
} |
154
|
8 |
|
if ($min !== null) { |
155
|
2 |
|
return; |
156
|
|
|
} |
157
|
8 |
|
usleep($min * 1000000); |
158
|
8 |
|
} |
159
|
|
|
|
160
|
|
|
/** |
161
|
|
|
* Consume completed cURL handles and delays. |
162
|
|
|
*/ |
163
|
13 |
|
private function consumeCurlsAndUntils() |
164
|
13 |
|
{ |
165
|
13 |
|
$entries = []; |
166
|
|
|
// First, we have to poll completed entries |
167
|
|
|
// DO NOT call curl_multi_add_handle() until polling done |
168
|
13 |
|
while ($entry = curl_multi_info_read($this->mh)) { |
169
|
6 |
|
$entries[] = $entry; |
170
|
|
|
} |
171
|
|
|
// Remove entry from multi handle to consume queue if available |
172
|
13 |
|
foreach ($entries as $entry) { |
173
|
6 |
|
curl_multi_remove_handle($this->mh, $entry['handle']); |
174
|
6 |
|
unset($this->added[(string)$entry['handle']]); |
175
|
6 |
|
if (!$this->queue) { |
|
|
|
|
176
|
6 |
|
continue; |
177
|
|
|
} |
178
|
1 |
|
$this->addOrEnqueue(array_shift($this->queue)); |
179
|
|
|
} |
180
|
|
|
// Now we check specified delay time elapsed |
181
|
13 |
|
foreach ($this->untils as $id => $until) { |
182
|
3 |
|
$diff = $until - microtime(true); |
183
|
3 |
|
if ($diff > 0.0 || !isset($this->deferreds[$id])) { |
184
|
3 |
|
continue; |
185
|
|
|
} |
186
|
2 |
|
$deferred = $this->deferreds[$id]; |
187
|
2 |
|
unset($this->deferreds[$id], $this->untils[$id]); |
188
|
2 |
|
$deferred->resolve(null); |
189
|
|
|
} |
190
|
|
|
// Finally, resolve cURL responses |
191
|
13 |
|
foreach ($entries as $entry) { |
192
|
6 |
|
if (!isset($this->deferreds[(string)$entry['handle']])) { |
193
|
|
|
continue; |
194
|
|
|
} |
195
|
6 |
|
$r = $entry['result'] === CURLE_OK |
196
|
6 |
|
? curl_multi_getcontent($entry['handle']) |
197
|
6 |
|
: new CURLException(curl_error($entry['handle']), $entry['result'], $entry['handle']); |
198
|
6 |
|
$deferred = $this->deferreds[(string)$entry['handle']]; |
199
|
6 |
|
unset($this->deferreds[(string)$entry['handle']]); |
200
|
6 |
|
$r instanceof CURLException ? $deferred->reject($r) : $deferred->resolve($r); |
201
|
|
|
} |
202
|
13 |
|
} |
203
|
|
|
} |
204
|
|
|
|
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using
empty(..)
or! empty(...)
instead.