Completed
Push — master ( 78e369...c8939c )
by Ryosuke
04:38
created

CURLPool::sleepUntilNearestTime()   B

Complexity

Conditions 6
Paths 8

Size

Total Lines 17
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 6

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 17
ccs 13
cts 13
cp 1
rs 8.8571
cc 6
eloc 11
nc 8
nop 0
crap 6
1
<?php
2
3
namespace mpyw\Co\Internal;
4
use mpyw\Co\Co;
5
use mpyw\Co\Internal\CoOption;
6
use mpyw\Co\CURLException;
7
use mpyw\RuntimePromise\Deferred;
8
9
/**
 * Pool that drives a cURL multi handle together with timer-style delays.
 *
 * cURL handles are dispatched immediately while fewer than
 * $options['concurrency'] handles are running; the rest wait in a FIFO queue.
 * Every handle or delay may have a Deferred attached, which is settled from
 * within wait().
 */
class CURLPool
{
    /**
     * Options.
     * @var CoOption
     */
    private $options;

    /**
     * cURL multi handle.
     * @var resource
     */
    private $mh;

    /**
     * cURL handles those have not been dispatched, keyed by (string)$ch.
     * @var array
     */
    private $queue = [];

    /**
     * cURL handles those have been already dispatched, keyed by (string)$ch.
     * @var array
     */
    private $added = [];

    /**
     * Delays to be ended at (UNIX timestamps with microseconds), keyed by delay id.
     * @var array
     */
    private $untils = [];

    /**
     * Deferreds keyed either by (string)$ch or by delay id.
     * @var Deferred[]
     */
    private $deferreds = [];

    /**
     * Constructor.
     * Initialize cURL multi handle.
     * @param CoOption $options
     */
    public function __construct(CoOption $options)
    {
        $this->options = $options;
        $this->mh = curl_multi_init();
        // 'pipeline' contributes 1 and 'multiplex' contributes 2 to the bitmask.
        $flags = (int)$options['pipeline'] + (int)$options['multiplex'] * 2;
        curl_multi_setopt($this->mh, CURLMOPT_PIPELINING, $flags);
    }

    /**
     * Call curl_multi_add_handle() or push into queue.
     *
     * When $deferred is omitted (as done when a queued handle is re-dispatched),
     * the Deferred stored at enqueue time stays associated with the handle.
     *
     * @param resource $ch
     * @param Deferred $deferred
     * @throws \InvalidArgumentException If the handle is already running or queued.
     */
    public function addOrEnqueue($ch, Deferred $deferred = null)
    {
        $key = (string)$ch;
        if (isset($this->added[$key]) || isset($this->queue[$key])) {
            throw new \InvalidArgumentException("The cURL handle is already enqueued: $ch");
        }
        if (count($this->added) >= $this->options['concurrency']) {
            // Concurrency limit reached: park the handle until a slot frees up.
            $this->queue[$key] = $ch;
            $deferred && $this->deferreds[$key] = $deferred;
            return;
        }
        $errno = curl_multi_add_handle($this->mh, $ch);
        if ($errno !== CURLM_OK) {
            // @codeCoverageIgnoreStart
            $msg = curl_multi_strerror($errno) . ": $ch";
            // A handle coming back from the queue carries no $deferred argument;
            // its Deferred was stored at enqueue time and must still be rejected,
            // otherwise the caller would wait on it forever.
            if (!$deferred && isset($this->deferreds[$key])) {
                $deferred = $this->deferreds[$key];
            }
            unset($this->deferreds[$key]);
            $deferred && $deferred->reject(new \RuntimeException($msg));
            return;
            // @codeCoverageIgnoreEnd
        }
        $this->added[$key] = $ch;
        $deferred && $this->deferreds[$key] = $deferred;
    }

    /**
     * Add delay.
     *
     * @param int|float $time     Seconds to wait; must parse as a non-negative float.
     * @param Deferred  $deferred Resolved with null once the delay has elapsed.
     * @throws \InvalidArgumentException If $time is not numeric or is negative.
     */
    public function addDelay($time, Deferred $deferred)
    {
        $time = filter_var($time, FILTER_VALIDATE_FLOAT);
        if ($time === false || $time < 0) {
            throw new \InvalidArgumentException('Delay must be positive number.');
        }
        $now = microtime(true);
        $until = $now + $time;
        $diff = $until - $now;
        if ($diff <= 0.0) {
            // @codeCoverageIgnoreStart
            // Nothing to wait for: settle right away instead of registering a timer.
            $deferred->resolve(null);
            return;
            // @codeCoverageIgnoreEnd
        }
        // Pick an id that is not already used by a pending delay.
        do {
            $id = uniqid();
        } while (isset($this->untils[$id]));
        $this->untils[$id] = $until;
        $this->deferreds[$id] = $deferred;
    }

    /**
     * Run curl_multi_exec() loop.
     *
     * Blocks until no dispatched handle, queued handle or delay remains.
     *
     * @throws \LogicException If cURL still reports active transfers afterwards.
     */
    public function wait()
    {
        curl_multi_exec($this->mh, $active); // Start requests.
        do {
            if ($this->added || $this->queue) {
                // if cURL handle is running, use curl_multi_select();
                // fall back to a plain sleep of the same interval when it
                // returns a negative value.
                curl_multi_select($this->mh, $this->options['interval']) < 0
                && usleep($this->options['interval'] * 1000000);
            } else {
                // otherwise, just sleep until nearest time
                $this->sleepUntilNearestTime();
            }
            curl_multi_exec($this->mh, $active);
            $this->consumeCurlsAndUntils();
        } while ($this->added || $this->queue || $this->untils);
        // All request must be done when reached here.
        if ($active) {
            // @codeCoverageIgnoreStart
            throw new \LogicException('Unreachable statement.');
            // @codeCoverageIgnoreEnd
        }
    }

    /**
     * Sleep at least required.
     *
     * Sleeps until the earliest pending delay is due. Returns immediately when
     * there is no pending delay or when any delay is already due, so that the
     * caller can resolve it without further waiting.
     */
    private function sleepUntilNearestTime()
    {
        $now = microtime(true);
        $min = null;
        foreach ($this->untils as $until) {
            // Remaining time until this delay ends (positive = still in the future).
            $diff = $until - $now;
            if ($diff <= 0) {
                // Already due: sleeping would only postpone its resolution.
                return;
            }
            if ($min === null || $diff < $min) {
                $min = $diff;
            }
        }
        if ($min !== null) {
            // Round up so that we never wake before the deadline.
            usleep((int)ceil($min * 1000000));
        }
    }

    /**
     * Consume completed cURL handles and delays.
     */
    private function consumeCurlsAndUntils()
    {
        $entries = [];
        // First, we have to poll completed entries
        // DO NOT call curl_multi_add_handle() until polling done
        while ($entry = curl_multi_info_read($this->mh)) {
            $entries[] = $entry;
        }
        // Remove entry from multi handle to consume queue if available
        foreach ($entries as $entry) {
            curl_multi_remove_handle($this->mh, $entry['handle']);
            unset($this->added[(string)$entry['handle']]);
            if ($this->queue) {
                // The Deferred of a queued handle was stored when it was enqueued,
                // so it is not passed again here.
                $ch = array_shift($this->queue);
                $this->addOrEnqueue($ch);
            }
        }
        // Now we check specified delay time elapsed
        foreach ($this->untils as $id => $until) {
            $diff = $until - microtime(true);
            if ($diff <= 0.0 && isset($this->deferreds[$id])) {
                $deferred = $this->deferreds[$id];
                // Forget the delay before resolving it.
                unset($this->deferreds[$id], $this->untils[$id]);
                $deferred->resolve(null);
            }
        }
        // Finally, resolve cURL responses
        foreach ($entries as $entry) {
            $r = $entry['result'] === CURLE_OK
                ? curl_multi_getcontent($entry['handle'])
                : new CURLException(curl_error($entry['handle']), $entry['result'], $entry['handle']);
            if (isset($this->deferreds[(string)$entry['handle']])) {
                $deferred = $this->deferreds[(string)$entry['handle']];
                unset($this->deferreds[(string)$entry['handle']]);
                $r instanceof CURLException ? $deferred->reject($r) : $deferred->resolve($r);
            }
        }
    }
}
203