Completed
Push — master ( 758ab1...6e9bde )
by Ryosuke
03:31
created

CURLPool::removeDestination()   A

Complexity

Conditions 4
Paths 3

Size

Total Lines 14
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 4

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 14
ccs 12
cts 12
cp 1
rs 9.2
cc 4
eloc 10
nc 3
nop 1
crap 4
1
<?php
2
3
namespace mpyw\Co\Internal;
4
use mpyw\Co\Co;
5
use mpyw\Co\Internal\CoOption;
6
use mpyw\Co\CURLException;
7
use mpyw\RuntimePromise\Deferred;
8
9
class CURLPool
10
{
11
    /**
12
     * Options.
13
     * @var CoOption
14
     */
15
    private $options;
16
17
    /**
18
     * cURL multi handle.
19
     * @var resource
20
     */
21
    private $mh;
22
23
    /**
24
     * cURL handles those have not been dispatched.
25
     * @var array
26
     */
27
    private $queue = [];
28
29
    /**
30
     * cURL handles those have been already dispatched.
31
     * @var array
32
     */
33
    private $added = [];
34
35
    /**
36
     * The number of whole running TCP connections.
37
     * @var array
38
     */
39
    private $connectionCount = 0;
40
41
    /**
42
     * Counts per destination.
43
     *   key   => identifier
44
     *   value => count
45
     * @var array
46
     */
47
    private $destinations = [];
48
49
    /**
50
     * Delays to be ended at.
51
     * @var array
52
     */
53
    private $untils = [];
54
55
    /**
56
     * React Deferreds.
57
     * @var Deferred
58
     */
59
    private $deferreds = [];
60
61
    /**
62
     * Used for halting loop.
63
     * @var \RuntimeException
64
     */
65
    private $haltException;
66
67
    /**
68
     * Constructor.
69
     * Initialize cURL multi handle.
70
     * @param CoOption $options
71
     */
72 22
    public function __construct(CoOption $options)
73 22
    {
74 22
        $this->options = $options;
75 22
        $this->mh = curl_multi_init();
76 22
        $flags = (int)$options['pipeline'] + (int)$options['multiplex'] * 2;
77 22
        curl_multi_setopt($this->mh, CURLMOPT_PIPELINING, $flags);
78 22
    }
79
80
    /**
81
     * Call curl_multi_add_handle() or push into queue.
82
     * @param resource $ch
83
     * @param Deferred $deferred
84
     */
85 12
    public function addOrEnqueue($ch, Deferred $deferred = null)
86 12
    {
87 12
        if (isset($this->added[(string)$ch]) || isset($this->queue[(string)$ch])) {
88 3
            throw new \InvalidArgumentException("The cURL handle is already enqueued: $ch");
89
        }
90 12
        $id = (string)curl_getinfo($ch, CURLINFO_PRIVATE);
91 12
        $this->options['concurrency'] > 0
92 12
        && $this->connectionCount >= $this->options['concurrency']
93 12
        && ($id === '' || empty($this->destinations[$id]))
94 2
            ? $this->enqueueCurl($ch, $deferred)
95 12
            : $this->addCurl($ch, $deferred);
96 12
    }
97
98
    /**
99
     * Call curl_multi_add_handle().
100
     * @param resource $ch
101
     * @param Deferred $deferred
102
     */
103 12
    private function addCurl($ch, Deferred $deferred = null)
104 12
    {
105 12
        $errno = curl_multi_add_handle($this->mh, $ch);
106 12
        if ($errno !== CURLM_OK) {
107
            // @codeCoverageIgnoreStart
108
            $msg = curl_multi_strerror($errno) . ": $ch";
109
            $deferred && $deferred->reject(new \RuntimeException($msg));
110
            return;
111
            // @codeCoverageIgnoreEnd
112
        }
113 12
        $this->added[(string)$ch] = $ch;
114 12
        $this->addDestination($ch);
115 12
        $deferred && $this->deferreds[(string)$ch] = $deferred;
116 12
    }
117
118
    /**
119
     * Add destination info.
120
     * @param resource $ch
121
     */
122 12
    private function addDestination($ch)
123 12
    {
124 12
        $id = (string)curl_getinfo($ch, CURLINFO_PRIVATE);
125 12
        if ($id === '') {
126 11
            ++$this->connectionCount;
127 11
            return;
128
        }
129 1
        if (empty($this->destinations[$id])) {
130 1
            $this->destinations[$id] = 1;
131 1
            ++$this->connectionCount;
132 1
            return;
133
        }
134 1
        ++$this->destinations[$id];
135 1
    }
136
137
    /**
138
     * Remove destination info.
139
     * @param resource $ch
140
     */
141 9
    private function removeDestination($ch)
142 9
    {
143 9
        $id = (string)curl_getinfo($ch, CURLINFO_PRIVATE);
144 9
        if ($id === '') {
145 8
            --$this->connectionCount;
146 8
            return;
147
        }
148 1
        if (empty($this->destinations[$id]) || $this->destinations[$id] === 1) {
149 1
            unset($this->destinations[$id]);
150 1
            --$this->connectionCount;
151 1
            return;
152
        }
153 1
        --$this->destinations[$id];
154 1
    }
155
156
    /**
157
     * Push into queue.
158
     * @param resource $ch
159
     * @param Deferred $deferred
160
     */
161 2
    private function enqueueCurl($ch, Deferred $deferred = null)
162 2
    {
163 2
        $this->queue[(string)$ch] = $ch;
164 2
        $deferred && $this->deferreds[(string)$ch] = $deferred;
165 2
    }
166
167
    /**
168
     * Add delay.
169
     * @param int      $time
170
     * @param Deferred $deferred
171
     */
172 4
    public function addDelay($time, Deferred $deferred)
173 4
    {
174 4
        $time = filter_var($time, FILTER_VALIDATE_FLOAT);
175 4
        if ($time === false || $time < 0) {
176 1
            throw new \InvalidArgumentException('Delay must be positive number.');
177
        }
178
        do {
179 3
            $id = uniqid();
180 3
        } while (isset($this->untils[$id]));
181 3
        $this->untils[$id] = microtime(true) + $time;
182 3
        $this->deferreds[$id] = $deferred;
183 3
    }
184
185
    /**
186
     * Run curl_multi_exec() loop.
187
     */
188 16
    public function wait()
189 16
    {
190 16
        curl_multi_exec($this->mh, $active); // Start requests.
191
        do {
192
            // if cURL handle is running, use curl_multi_select()
193
            // otherwise, just sleep until nearest time
194 16
            $this->added || $this->queue
195 9
                ? curl_multi_select($this->mh, $this->options['interval']) < 0
196 9
                  && usleep($this->options['interval'] * 1000000)
197 8
                : $this->sleepUntilNearestTime();
198 16
            curl_multi_exec($this->mh, $active);
199 16
            $this->consumeCurlsAndUntils();
200 16
        } while (!$this->haltException && ($this->added || $this->queue || $this->untils));
201 16
        if ($this->haltException) {
202 3
            throw $this->haltException;
203
        }
204 14
    }
205
206
    /**
207
     * Used for halting loop.
208
     */
209 3
    public function reserveHaltException($e)
210 3
    {
211 3
        $this->haltException = $e;
212 3
    }
213
214
    /**
215
     * Sleep at least required.
216
     */
217 8
    private function sleepUntilNearestTime()
218 8
    {
219 8
        $now = microtime(true);
220 8
        $min = null;
221 8
        foreach ($this->untils as $id => $until) {
222 3
            $diff = $until - $now;
223 3
            if ($diff < 0) {
224
                // @codeCoverageIgnoreStart
225
                return;
226
                // @codeCoverageIgnoreEnd
227
            }
228 3
            if ($min !== null && $diff >= $min) {
229 2
                continue;
230
            }
231 3
            $min = $diff;
232
        }
233 8
        $min && usleep($min * 1000000);
234 8
    }
235
236
    /**
237
     * Consume completed cURL handles and delays.
238
     */
239 16
    private function consumeCurlsAndUntils()
240 16
    {
241 16
        $entries = $this->consumeCurls();
242 16
        $this->consumeUntils();
243 16
        $this->resolveCurls($entries);
244 16
    }
245
246
    /**
247
     * Poll completed cURL entries and consume cURL queue.
248
     * @return array
249
     */
250 16
    private function consumeCurls()
251 16
    {
252 16
        $entries = [];
253
        // DO NOT call curl_multi_add_handle() until polling done
254 16
        while ($entry = curl_multi_info_read($this->mh)) {
255 9
            $entries[] = $entry;
256
        }
257 16
        foreach ($entries as $entry) {
258 9
            curl_multi_remove_handle($this->mh, $entry['handle']);
259 9
            unset($this->added[(string)$entry['handle']]);
260 9
            $this->removeDestination($entry['handle']);
261 9
            $this->queue && $this->addOrEnqueue(array_shift($this->queue));
262
        }
263 16
        return $entries;
264
    }
265
266
    /**
267
     * Consume delay queue.
268
     */
269 16
    private function consumeUntils()
270 16
    {
271 16
        foreach ($this->untils as $id => $until) {
272 3
            $diff = $until - microtime(true);
273 3
            if ($diff > 0.0 || !isset($this->deferreds[$id])) {
274 2
                continue;
275
            }
276 3
            $deferred = $this->deferreds[$id];
277 3
            unset($this->deferreds[$id], $this->untils[$id]);
278 3
            $deferred->resolve(null);
279
        }
280 16
    }
281
282
    /**
283
     * Resolve polled cURLs.
284
     * @param  array $entries Polled cURL entries.
285
     */
286 16
    private function resolveCurls($entries)
287 16
    {
288 16
        foreach ($entries as $entry) {
289 9
            if (!isset($this->deferreds[(string)$entry['handle']])) {
290 1
                continue;
291
            }
292 8
            $deferred = $this->deferreds[(string)$entry['handle']];
293 8
            unset($this->deferreds[(string)$entry['handle']]);
294 8
            $entry['result'] === CURLE_OK
295 8
                ? $deferred->resolve(curl_multi_getcontent($entry['handle']))
296 8
                : $deferred->reject(new CURLException(curl_error($entry['handle']), $entry['result'], $entry['handle']));
297
        }
298 16
    }
299
}
300