Completed
Push — master ( 791859...f397fd )
by Ryosuke
03:41
created

CURLPool::removeDestination()   B

Complexity

Conditions 5
Paths 6

Size

Total Lines 14
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 5

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 14
ccs 12
cts 12
cp 1
rs 8.8571
cc 5
eloc 10
nc 6
nop 1
crap 5
1
<?php
2
3
namespace mpyw\Co\Internal;
4
use mpyw\Co\Co;
5
use mpyw\Co\Internal\CoOption;
6
use mpyw\Co\CURLException;
7
use mpyw\RuntimePromise\Deferred;
8
9
class CURLPool
10
{
11
    /**
12
     * Options.
13
     * @var CoOption
14
     */
15
    private $options;
16
17
    /**
18
     * cURL multi handle.
19
     * @var resource
20
     */
21
    private $mh;
22
23
    /**
24
     * cURL handles that have not been dispatched yet.
25
     * @var array
26
     */
27
    private $queue = [];
28
29
    /**
30
     * cURL handles that have already been dispatched.
31
     * @var array
32
     */
33
    private $added = [];
34
35
    /**
36
     * The number of whole running TCP connections.
37
     * @var int
38
     */
39
    private $connectionCount = 0;
40
41
    /**
42
     * Counts per destination.
43
     *   key   => identifier
44
     *   value => count
45
     * @var array
46
     */
47
    private $destinations = [];
48
49
    /**
50
     * Delays to be ended at.
51
     * @var array
52
     */
53
    private $untils = [];
54
55
    /**
56
     * Deferreds, keyed by cURL handle string or by delay identifier.
57
     * @var Deferred[]
58
     */
59
    private $deferreds = [];
60
61
    /**
62
     * Used for halting loop.
63
     * @var \RuntimeException
64
     */
65
    private $haltException;
66
67
    /**
     * Constructor.
     *
     * Stores the options and creates the cURL multi handle. The "pipeline"
     * and "multiplex" options are combined into one bitmask (pipeline
     * contributes 1, multiplex contributes 2) passed to CURLMOPT_PIPELINING.
     *
     * @param CoOption $options Options read by this pool.
     */
    public function __construct(CoOption $options)
    {
        $this->options = $options;
        $this->mh = curl_multi_init();

        $pipelining = (int)$options['pipeline'];
        $pipelining += (int)$options['multiplex'] * 2;
        curl_multi_setopt($this->mh, CURLMOPT_PIPELINING, $pipelining);
    }
79
80
    /**
     * Dispatch a cURL handle immediately, or queue it for later.
     *
     * The handle is queued only when a positive "concurrency" limit has been
     * reached AND the handle would need a connection slot of its own, i.e. it
     * has no group identifier or no handle with the same identifier is
     * currently counted in $destinations. Otherwise it is added right away.
     *
     * @param resource      $ch       cURL handle.
     * @param Deferred|null $deferred Deferred to associate with the handle.
     * @throws \UnexpectedValueException If the handle is already dispatched or queued.
     */
    public function addOrEnqueue($ch, Deferred $deferred = null)
    {
        $key = (string)$ch;
        if (isset($this->added[$key]) || isset($this->queue[$key])) {
            throw new \UnexpectedValueException("The cURL handle is already enqueued: $ch");
        }

        // The group identifier is taken from CURLINFO_PRIVATE, but only when
        // the "group" option is enabled.
        $id = $this->options['group'] ? (string)curl_getinfo($ch, CURLINFO_PRIVATE) : '';

        $limited = $this->options['concurrency'] > 0;
        if ($limited
            && $this->connectionCount >= $this->options['concurrency']
            && ($id === '' || empty($this->destinations[$id]))
        ) {
            $this->enqueueCurl($ch, $deferred);
            return;
        }

        $this->addCurl($ch, $deferred);
    }
97
98
    /**
     * Attach a cURL handle to the multi handle.
     *
     * On success the handle is recorded in $added, its destination is
     * counted, and the Deferred (if any) is stored under the handle's string
     * key. On failure the Deferred (if any) is rejected with a
     * \RuntimeException and nothing is recorded.
     *
     * @param resource      $ch       cURL handle.
     * @param Deferred|null $deferred Deferred to associate with the handle.
     */
    private function addCurl($ch, Deferred $deferred = null)
    {
        $errno = curl_multi_add_handle($this->mh, $ch);

        if ($errno === CURLM_OK) {
            $key = (string)$ch;
            $this->added[$key] = $ch;
            $this->addDestination($ch);
            if ($deferred) {
                $this->deferreds[$key] = $deferred;
            }
            return;
        }

        // @codeCoverageIgnoreStart
        if ($deferred) {
            $deferred->reject(new \RuntimeException(curl_multi_strerror($errno) . ": $ch"));
        }
        // @codeCoverageIgnoreEnd
    }
117
118
    /**
     * Account for a newly dispatched handle.
     *
     * A handle whose group identifier is already present in $destinations
     * only bumps that identifier's counter. Any other handle increments
     * $connectionCount, and, if it has an identifier, starts that
     * identifier's counter at 1.
     *
     * @param resource $ch cURL handle.
     */
    private function addDestination($ch)
    {
        $id = $this->options['group'] ? (string)curl_getinfo($ch, CURLINFO_PRIVATE) : '';

        if ($id !== '' && !empty($this->destinations[$id])) {
            ++$this->destinations[$id];
            return;
        }

        if ($id !== '') {
            $this->destinations[$id] = 1;
        }
        ++$this->connectionCount;
    }
136
137
    /**
     * Account for a handle that has finished.
     *
     * If other handles with the same group identifier are still counted, only
     * that identifier's counter is decremented. Otherwise the identifier (if
     * any) is dropped from $destinations and $connectionCount is decremented.
     *
     * @param resource $ch cURL handle.
     */
    private function removeDestination($ch)
    {
        $id = $this->options['group'] ? (string)curl_getinfo($ch, CURLINFO_PRIVATE) : '';

        $othersRemain = $id !== ''
            && !empty($this->destinations[$id])
            && $this->destinations[$id] !== 1;
        if ($othersRemain) {
            --$this->destinations[$id];
            return;
        }

        if ($id !== '') {
            unset($this->destinations[$id]);
        }
        --$this->connectionCount;
    }
155
156
    /**
     * Put a cURL handle into the waiting queue.
     *
     * The handle, and the Deferred if one is given, are stored under the
     * handle's string representation.
     *
     * @param resource      $ch       cURL handle.
     * @param Deferred|null $deferred Deferred to associate with the handle.
     */
    private function enqueueCurl($ch, Deferred $deferred = null)
    {
        $key = (string)$ch;
        $this->queue[$key] = $ch;
        if ($deferred) {
            $this->deferreds[$key] = $deferred;
        }
    }
166
167
    /**
     * Register a delay.
     *
     * The delay is validated with FILTER_VALIDATE_FLOAT, so any value that
     * filter accepts as a float is allowed. The end time is computed as
     * microtime(true) plus the delay, and the Deferred is stored under a
     * freshly generated unique identifier.
     *
     * @param int|float|string $time     Delay, added to microtime(true) (seconds).
     * @param Deferred         $deferred Deferred associated with the delay.
     * @throws \InvalidArgumentException If the delay is not a valid float.
     * @throws \DomainException          If the delay is negative.
     */
    public function addDelay($time, Deferred $deferred)
    {
        $seconds = filter_var($time, FILTER_VALIDATE_FLOAT);
        if ($seconds === false) {
            throw new \InvalidArgumentException('Delay must be number.');
        }
        if ($seconds < 0) {
            throw new \DomainException('Delay must be positive.');
        }

        // Regenerate until the identifier is not already used by another delay.
        $id = uniqid();
        while (isset($this->untils[$id])) {
            $id = uniqid();
        }

        $this->untils[$id] = microtime(true) + $seconds;
        $this->deferreds[$id] = $deferred;
    }
187
188
    /**
     * Run the curl_multi_exec() loop.
     *
     * Loops until no dispatched handles, queued handles or delays remain, or
     * until a halt exception has been reserved via reserveHaltException().
     *
     * @throws \Exception The reserved halt exception, if one was set.
     */
    public function wait()
    {
        curl_multi_exec($this->mh, $active); // Start requests.
        do {
            if ($this->added || $this->queue) {
                // cURL handles are pending: wait on them with curl_multi_select().
                $interval = $this->options['interval'];
                if (curl_multi_select($this->mh, $interval) < 0) {
                    // select reported failure; sleep for the interval instead.
                    // usleep() takes an integer number of microseconds, so
                    // convert explicitly rather than relying on implicit
                    // float-to-int coercion.
                    usleep((int)($interval * 1000000));
                }
            } else {
                // Only delays remain: just sleep until the nearest one.
                $this->sleepUntilNearestTime();
            }
            curl_multi_exec($this->mh, $active);
            $this->consumeCurlsAndUntils();
        } while (!$this->haltException && ($this->added || $this->queue || $this->untils));

        if ($this->haltException) {
            throw $this->haltException;
        }
    }
208
209
    /**
     * Reserve an exception that halts the wait() loop.
     *
     * wait() stops looping once this is set and then throws the stored value.
     *
     * @param \Exception $e Exception to be thrown by wait().
     */
    public function reserveHaltException($e)
    {
        $this->haltException = $e;
    }
216
217
    /**
     * Sleep until the earliest registered delay is due.
     *
     * Returns immediately when no delay is registered or when the earliest
     * one is already due.
     */
    private function sleepUntilNearestTime()
    {
        if (!$this->untils) {
            return;
        }

        // The earliest end time decides the wait; if it is not in the
        // future, at least one delay is already due and no sleep is needed.
        $remaining = min($this->untils) - microtime(true);
        if ($remaining > 0) {
            // usleep() takes an integer number of microseconds.
            usleep((int)($remaining * 1000000));
        }
    }
238
239
    /**
     * Consume completed cURL handles and due delays.
     *
     * Completed cURL entries are polled first, then due delays are resolved,
     * and only after that are the Deferreds of the polled entries settled.
     */
    private function consumeCurlsAndUntils()
    {
        $completed = $this->consumeCurls();
        $this->consumeUntils();
        $this->resolveCurls($completed);
    }
248
249
    /**
     * Poll completed cURL entries and consume the cURL queue.
     *
     * Every completed handle is detached from the multi handle and removed
     * from the bookkeeping. For each completed handle, one queued handle (if
     * any) is passed back to addOrEnqueue() together with the Deferred that
     * was stored for it, so that a failure to add the handle rejects that
     * Deferred instead of leaving it unsettled.
     *
     * @return array Entries as returned by curl_multi_info_read().
     */
    private function consumeCurls()
    {
        $entries = [];
        // DO NOT call curl_multi_add_handle() until polling done
        while ($entry = curl_multi_info_read($this->mh)) {
            $entries[] = $entry;
        }

        foreach ($entries as $entry) {
            $ch = $entry['handle'];
            curl_multi_remove_handle($this->mh, $ch);
            unset($this->added[(string)$ch]);
            $this->removeDestination($ch);

            if (!$this->queue) {
                continue;
            }

            $next = array_shift($this->queue);
            $key = (string)$next;
            // Detach the Deferred stored at enqueue time and hand it over
            // explicitly: addOrEnqueue() stores it again on success or
            // re-enqueue, and rejects it if the handle cannot be added.
            $deferred = isset($this->deferreds[$key]) ? $this->deferreds[$key] : null;
            unset($this->deferreds[$key]);
            $this->addOrEnqueue($next, $deferred);
        }

        return $entries;
    }
268
269
    /**
     * Resolve the Deferreds of delays that are due.
     *
     * A delay is consumed when its end time is not in the future and a
     * Deferred is stored for it; its entries are removed before the Deferred
     * is resolved with null.
     */
    private function consumeUntils()
    {
        foreach ($this->untils as $id => $until) {
            $remaining = $until - microtime(true);
            $due = !($remaining > 0.0);
            if ($due && isset($this->deferreds[$id])) {
                $deferred = $this->deferreds[$id];
                unset($this->deferreds[$id], $this->untils[$id]);
                $deferred->resolve(null);
            }
        }
    }
284
285
    /**
     * Settle the Deferreds of polled cURL entries.
     *
     * Entries without a stored Deferred are skipped. Otherwise the Deferred
     * is removed from the pool and then resolved with the handle's content
     * when the result is CURLE_OK, or rejected with a CURLException.
     *
     * @param array $entries Polled cURL entries.
     */
    private function resolveCurls($entries)
    {
        foreach ($entries as $entry) {
            $ch = $entry['handle'];
            $key = (string)$ch;
            if (!isset($this->deferreds[$key])) {
                continue;
            }

            $deferred = $this->deferreds[$key];
            unset($this->deferreds[$key]);

            if ($entry['result'] === CURLE_OK) {
                $deferred->resolve(curl_multi_getcontent($ch));
            } else {
                $deferred->reject(new CURLException(curl_error($ch), $entry['result'], $ch));
            }
        }
    }
302
}
303