Completed
Push — master ( d00bfd...a71924 )
by Ryosuke
03:46
created

CURLPool::addOrEnqueue()   A

Complexity

Conditions 4
Paths 3

Size

Total Lines 10
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 4

Importance

Changes 9
Bugs 1 Features 2
Metric Value
c 9
b 1
f 2
dl 0
loc 10
ccs 9
cts 9
cp 1
rs 9.2
cc 4
eloc 7
nc 3
nop 2
crap 4
1
<?php
2
3
namespace mpyw\Co\Internal;
4
use mpyw\Co\Co;
5
use mpyw\Co\Internal\CoOption;
6
use mpyw\Co\CURLException;
7
use mpyw\RuntimePromise\Deferred;
8
9
class CURLPool
10
{
11
    /**
12
     * Options.
13
     * @var CoOption
14
     */
15
    private $options;
16
17
    /**
18
     * cURL multi handle.
19
     * @var resource
20
     */
21
    private $mh;
22
23
    /**
24
     * cURL handles those have not been dispatched.
25
     * @var array
26
     */
27
    private $queue = [];
28
29
    /**
30
     * cURL handles those have been already dispatched.
31
     * @var array
32
     */
33
    private $added = [];
34
35
    /**
36
     * The number of whole running TCP connections.
37
     * @var array
38
     */
39
    private $connectionCount = 0;
40
41
    /**
42
     * Counts per destination.
43
     *   key   => identifier
44
     *   value => count
45
     * @var array
46
     */
47
    private $destinations = [];
48
49
    /**
50
     * Delays to be ended at.
51
     * @var array
52
     */
53
    private $untils = [];
54
55
    /**
56
     * React Deferreds.
57
     * @var Deferred
58
     */
59
    private $deferreds = [];
60
61
    /**
62
     * Used for halting loop.
63
     * @var \RuntimeException
64
     */
65
    private $haltException;
66
67
    /**
68
     * Constructor.
69
     * Initialize cURL multi handle.
70
     * @param CoOption $options
71
     */
72 26
    public function __construct(CoOption $options)
73 26
    {
74 26
        $this->options = $options;
75 26
        $this->mh = curl_multi_init();
76 26
        $flags = (int)$options['pipeline'] + (int)$options['multiplex'] * 2;
77 26
        curl_multi_setopt($this->mh, CURLMOPT_PIPELINING, $flags);
78 26
    }
79
80
    /**
81
     * Call curl_multi_add_handle() or push into queue.
82
     * @param resource $ch
83
     * @param Deferred $deferred
84
     */
85 13
    public function addOrEnqueue($ch, Deferred $deferred = null)
86 13
    {
87 13
        if (isset($this->added[(string)$ch]) || isset($this->queue[(string)$ch])) {
88 3
            throw new \UnexpectedValueException("The cURL handle is already enqueued: $ch");
89
        }
90 13
        $id = $this->getIdentifier($ch);
91 13
        $this->isPoolFilled($id)
92 3
            ? $this->enqueueCurl($ch, $deferred)
93 13
            : $this->addCurl($ch, $deferred);
94 13
    }
95
96
    /**
97
     * Call curl_multi_add_handle().
98
     * @param resource $ch
99
     * @param Deferred $deferred
100
     */
101 13
    private function addCurl($ch, Deferred $deferred = null)
102 13
    {
103 13
        $errno = curl_multi_add_handle($this->mh, $ch);
104 13
        if ($errno !== CURLM_OK) {
105
            // @codeCoverageIgnoreStart
106
            $msg = curl_multi_strerror($errno) . ": $ch";
107
            $deferred && $deferred->reject(new \RuntimeException($msg));
108
            return;
109
            // @codeCoverageIgnoreEnd
110
        }
111 13
        $this->added[(string)$ch] = $ch;
112 13
        $this->addDestination($ch);
113 13
        $deferred && $this->deferreds[(string)$ch] = $deferred;
114 13
    }
115
116
    /**
117
     * Add destination info.
118
     * @param resource $ch
119
     */
120 13
    private function addDestination($ch)
121 13
    {
122 13
        $id = $this->getIdentifier($ch);
123 13
        if ($id === '') {
124 12
            ++$this->connectionCount;
125 12
            return;
126
        }
127 1
        if (empty($this->destinations[$id])) {
128 1
            $this->destinations[$id] = 1;
129 1
            ++$this->connectionCount;
130 1
            return;
131
        }
132 1
        ++$this->destinations[$id];
133 1
    }
134
135
    /**
136
     * Check if internal cURL pool is filled.
137
     * @param string $id
138
     * @return bool
139
     */
140 13
    private function isPoolFilled($id)
141 13
    {
142 13
        if ($id !== '' && !empty($this->destinations[$id])) {
143 1
            return false;
144
        }
145 13
        return $this->options['concurrency'] > 0
146 13
            && $this->connectionCount >= $this->options['concurrency'];
147
    }
148
149
    /**
150
     * Remove destination info.
151
     * @param resource $ch
152
     */
153 10
    private function removeDestination($ch)
154 10
    {
155 10
        $id = $this->getIdentifier($ch);
156 10
        if ($id === '') {
157 9
            --$this->connectionCount;
158 9
            return;
159
        }
160 1
        if (empty($this->destinations[$id]) || $this->destinations[$id] === 1) {
161 1
            unset($this->destinations[$id]);
162 1
            --$this->connectionCount;
163 1
            return;
164
        }
165 1
        --$this->destinations[$id];
166 1
    }
167
168
    /**
169
     * Push into queue.
170
     * @param resource $ch
171
     * @param Deferred $deferred
172
     */
173 3
    private function enqueueCurl($ch, Deferred $deferred = null)
174 3
    {
175 3
        $this->queue[(string)$ch] = $ch;
176 3
        $deferred && $this->deferreds[(string)$ch] = $deferred;
177 3
    }
178
179
    /**
180
     * Add delay.
181
     * @param int      $time
182
     * @param Deferred $deferred
183
     */
184 6
    public function addDelay($time, Deferred $deferred)
185 6
    {
186 6
        $time = filter_var($time, FILTER_VALIDATE_FLOAT);
187 6
        if ($time === false) {
188 1
            throw new \InvalidArgumentException('Delay must be number.');
189
        }
190 5
        if ($time < 0) {
191 1
            throw new \DomainException('Delay must be positive.');
192
        }
193
        do {
194 4
            $id = uniqid();
195 4
        } while (isset($this->untils[$id]));
196 4
        $this->untils[$id] = microtime(true) + $time;
197 4
        $this->deferreds[$id] = $deferred;
198 4
    }
199
200
    /**
201
     * Run curl_multi_exec() loop.
202
     */
203 18
    public function wait()
204 18
    {
205 18
        curl_multi_exec($this->mh, $active); // Start requests.
206
        do {
207
            // if cURL handle is running, use curl_multi_select()
208
            // otherwise, just sleep until nearest time
209 18
            $this->added || $this->queue
210 10
                ? curl_multi_select($this->mh, $this->options['interval']) < 0
211 10
                  && usleep($this->options['interval'] * 1000000)
212 9
                : $this->sleepUntilNearestTime();
213 18
            curl_multi_exec($this->mh, $active);
214 18
            $this->consumeCurlsAndUntils();
215 18
        } while (!$this->haltException && ($this->added || $this->queue || $this->untils));
216 18
        if ($this->haltException) {
217 4
            throw $this->haltException;
218
        }
219 15
    }
220
221
    /**
222
     * Used for halting loop.
223
     */
224 4
    public function reserveHaltException($e)
225 4
    {
226 4
        $this->haltException = $e;
227 4
    }
228
229
    /**
230
     * Sleep at least required.
231
     */
232 9
    private function sleepUntilNearestTime()
233 9
    {
234 9
        $now = microtime(true);
235 9
        $min = null;
236 9
        foreach ($this->untils as $id => $until) {
237 4
            $diff = $until - $now;
238 4
            if ($diff < 0) {
239
                // @codeCoverageIgnoreStart
240
                return;
241
                // @codeCoverageIgnoreEnd
242
            }
243 4
            if ($min !== null && $diff >= $min) {
244 2
                continue;
245
            }
246 4
            $min = $diff;
247
        }
248 9
        $min && usleep($min * 1000000);
249 9
    }
250
251
    /**
252
     * Consume completed cURL handles and delays.
253
     */
254 18
    private function consumeCurlsAndUntils()
255 18
    {
256 18
        $entries = $this->consumeCurls();
257 18
        $this->consumeUntils();
258 18
        $this->resolveCurls($entries);
259 18
    }
260
261
    /**
262
     * Poll completed cURL entries and consume cURL queue.
263
     * @return array
264
     */
265 18
    private function consumeCurls()
266 18
    {
267 18
        $entries = [];
268
        // DO NOT call curl_multi_add_handle() until polling done
269 18
        while ($entry = curl_multi_info_read($this->mh)) {
270 10
            $entries[] = $entry;
271
        }
272 18
        foreach ($entries as $entry) {
273 10
            curl_multi_remove_handle($this->mh, $entry['handle']);
274 10
            unset($this->added[(string)$entry['handle']]);
275 10
            $this->removeDestination($entry['handle']);
276 10
            $this->queue && $this->addOrEnqueue(array_shift($this->queue));
277
        }
278 18
        return $entries;
279
    }
280
281
    /**
282
     * Consume delay queue.
283
     */
284 18
    private function consumeUntils()
285 18
    {
286 18
        foreach ($this->untils as $id => $until) {
287 4
            $diff = $until - microtime(true);
288 4
            if ($diff > 0.0 || !isset($this->deferreds[$id])) {
289 2
                continue;
290
            }
291 4
            $deferred = $this->deferreds[$id];
292 4
            unset($this->deferreds[$id], $this->untils[$id]);
293 4
            $deferred->resolve(null);
294
        }
295 18
    }
296
297
    /**
298
     * Resolve polled cURLs.
299
     * @param  array $entries Polled cURL entries.
300
     */
301 18
    private function resolveCurls($entries)
302 18
    {
303 18
        foreach ($entries as $entry) {
304 10
            if (!isset($this->deferreds[(string)$entry['handle']])) {
305 1
                continue;
306
            }
307 9
            $deferred = $this->deferreds[(string)$entry['handle']];
308 9
            unset($this->deferreds[(string)$entry['handle']]);
309 9
            $entry['result'] === CURLE_OK
310 9
                ? $deferred->resolve(curl_multi_getcontent($entry['handle']))
311 9
                : $deferred->reject(new CURLException(curl_error($entry['handle']), $entry['result'], $entry['handle']));
312
        }
313 18
    }
314
315
    /**
316
     * Push into queue.
317
     * @param resource $ch
318
     * @return string
319
     */
320 13
    private function getIdentifier($ch)
321 13
    {
322 13
        return $this->options['group'] ? (string)curl_getinfo($ch, CURLINFO_PRIVATE) : '';
323
    }
324
}
325