Completed
Push — master ( 43ce2c...b090fb )
by Ryosuke
03:28
created

CURLPool::sleepUntilNearestTime()   B

Complexity

Conditions 6
Paths 6

Size

Total Lines 16
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 6

Importance

Changes 2
Bugs 0 Features 1
Metric Value
c 2
b 0
f 1
dl 0
loc 16
ccs 13
cts 13
cp 1
rs 8.8571
cc 6
eloc 11
nc 6
nop 0
crap 6
1
<?php
2
3
namespace mpyw\Co\Internal;
4
use mpyw\Co\Co;
5
use mpyw\Co\Internal\CoOption;
6
use mpyw\Co\CURLException;
7
use mpyw\RuntimePromise\Deferred;
8
9
class CURLPool
10
{
11
    /**
12
     * Options.
13
     * @var CoOption
14
     */
15
    private $options;
16
17
    /**
18
     * cURL multi handle.
19
     * @var resource
20
     */
21
    private $mh;
22
23
    /**
24
     * cURL handles that have not been dispatched yet.
25
     * @var array
26
     */
27
    private $queue = [];
28
29
    /**
30
     * cURL handles that have already been dispatched.
31
     * @var array
32
     */
33
    private $added = [];
34
35
    /**
36
     * Timestamps (as returned by microtime(true)) at which each delay ends, keyed by delay ID.
37
     * @var array
38
     */
39
    private $untils = [];
40
41
    /**
42
     * Deferreds, keyed by the string form of a cURL handle or by delay ID.
43
     * @var Deferred[]
44
     */
45
    private $deferreds = [];
46
47
    /**
48
     * Used for halting loop.
49
     * @var \RuntimeException
50
     */
51
    private $haltException;
52
53
    /**
     * Constructor.
     * Keeps the given options, creates the cURL multi handle and sets its
     * CURLMOPT_PIPELINING value from the "pipeline" and "multiplex" options.
     * @param CoOption $options
     */
    public function __construct(CoOption $options)
    {
        $this->options = $options;
        $this->mh = curl_multi_init();
        // Both options are cast to int; "multiplex" is weighted by 2.
        $pipelining = (int)$options['pipeline'];
        $pipelining += 2 * (int)$options['multiplex'];
        curl_multi_setopt($this->mh, CURLMOPT_PIPELINING, $pipelining);
    }
65
66
    /**
     * Call curl_multi_add_handle() or push into queue.
     *
     * The handle is queued when the number of dispatched handles has reached
     * the "concurrency" option; otherwise it is added to the multi handle
     * right away. If adding fails, the Deferred associated with the handle
     * (the one passed in, or the one stored when the handle was queued) is
     * rejected with a \RuntimeException.
     *
     * @param resource      $ch
     * @param Deferred|null $deferred
     * @throws \InvalidArgumentException If the handle is already dispatched or queued.
     */
    public function addOrEnqueue($ch, Deferred $deferred = null)
    {
        $id = (string)$ch;
        if (isset($this->added[$id]) || isset($this->queue[$id])) {
            throw new \InvalidArgumentException("The cURL handle is already enqueued: $ch");
        }
        if (count($this->added) >= $this->options['concurrency']) {
            // No free slot: keep the handle (and its Deferred) for later.
            $this->queue[$id] = $ch;
            $deferred && $this->deferreds[$id] = $deferred;
            return;
        }
        $errno = curl_multi_add_handle($this->mh, $ch);
        if ($errno !== CURLM_OK) {
            // @codeCoverageIgnoreStart
            $msg = curl_multi_strerror($errno) . ": $ch";
            // A handle taken from the queue arrives here without $deferred;
            // its Deferred was stored at enqueue time and has to be rejected
            // as well, otherwise it would never be settled.
            if (!$deferred && isset($this->deferreds[$id])) {
                $deferred = $this->deferreds[$id];
            }
            unset($this->deferreds[$id]);
            $deferred && $deferred->reject(new \RuntimeException($msg));
            return;
            // @codeCoverageIgnoreEnd
        }
        $this->added[$id] = $ch;
        $deferred && $this->deferreds[$id] = $deferred;
    }
92
93
    /**
     * Add delay.
     * Records the delay's end time and its Deferred under a generated ID.
     * @param int|float|string $time     Seconds to wait; must pass FILTER_VALIDATE_FLOAT and must not be negative.
     * @param Deferred         $deferred
     * @throws \InvalidArgumentException If $time is not a valid float or is negative.
     */
    public function addDelay($time, Deferred $deferred)
    {
        $seconds = filter_var($time, FILTER_VALIDATE_FLOAT);
        if ($seconds === false || $seconds < 0) {
            throw new \InvalidArgumentException('Delay must be positive number.');
        }
        // Generate IDs until one is found that no pending delay uses.
        $key = uniqid();
        while (isset($this->untils[$key])) {
            $key = uniqid();
        }
        $this->untils[$key] = microtime(true) + $seconds;
        $this->deferreds[$key] = $deferred;
    }
110
111
    /**
     * Run curl_multi_exec() loop.
     * Keeps iterating until no dispatched handles, queued handles or delays
     * remain. If a halt exception has been reserved, the loop stops after the
     * current iteration and that exception is thrown.
     */
    public function wait()
    {
        curl_multi_exec($this->mh, $active); // Start requests.
        while (true) {
            if ($this->added || $this->queue) {
                // cURL handles are pending: wait with curl_multi_select(),
                // and sleep for the interval instead if it returns a negative value.
                $selected = curl_multi_select($this->mh, $this->options['interval']);
                if ($selected < 0) {
                    usleep($this->options['interval'] * 1000000);
                }
            } else {
                // No cURL handles: just sleep until the nearest delay.
                $this->sleepUntilNearestTime();
            }
            curl_multi_exec($this->mh, $active);
            $this->consumeCurlsAndUntils();
            if ($this->haltException) {
                throw $this->haltException;
            }
            if (!$this->added && !$this->queue && !$this->untils) {
                return;
            }
        }
    }
131
132
    /**
     * Reserve an exception for halting the wait() loop.
     * Once a truthy value is stored, wait() stops after its current
     * iteration and throws it.
     * @param mixed $e Value stored as the halt exception.
     */
    public function reserveHaltException($e)
    {
        $this->haltException = $e;
    }
139
140
    /**
     * Sleep until the nearest pending delay ends.
     *
     * Returns immediately when there are no delays, or when at least one
     * delay has already ended, so that it can be consumed without further
     * waiting.
     */
    private function sleepUntilNearestTime()
    {
        $now = microtime(true);
        $min = null;
        foreach ($this->untils as $until) {
            // Remaining time (in seconds) until this delay ends.
            $diff = $until - $now;
            if ($diff <= 0) {
                // Already due: there is nothing to wait for.
                return;
            }
            if ($min === null || $diff < $min) {
                $min = $diff;
            }
        }
        if ($min === null) {
            // No delays registered.
            return;
        }
        // usleep() takes whole microseconds; round up to sleep at least as long as required.
        usleep((int)ceil($min * 1000000));
    }
159
160
    /**
     * Consume completed cURL handles and delays.
     *
     * Works in four phases, in this order: collect every finished transfer,
     * detach finished handles (dispatching one queued handle for each while
     * the queue is not empty), resolve the delays whose end time has been
     * reached, and finally settle the Deferreds of the finished transfers.
     */
    private function consumeCurlsAndUntils()
    {
        // Phase 1: drain curl_multi_info_read() completely.
        // DO NOT call curl_multi_add_handle() until polling is done.
        $finished = [];
        for (;;) {
            $info = curl_multi_info_read($this->mh);
            if (!$info) {
                break;
            }
            $finished[] = $info;
        }

        // Phase 2: detach finished handles and refill from the queue.
        foreach ($finished as $info) {
            $handle = $info['handle'];
            curl_multi_remove_handle($this->mh, $handle);
            unset($this->added[(string)$handle]);
            if (!empty($this->queue)) {
                $this->addOrEnqueue(array_shift($this->queue));
            }
        }

        // Phase 3: resolve delays whose end time has been reached.
        foreach ($this->untils as $id => $until) {
            $remaining = $until - microtime(true);
            if ($remaining <= 0.0 && isset($this->deferreds[$id])) {
                $pending = $this->deferreds[$id];
                unset($this->deferreds[$id], $this->untils[$id]);
                $pending->resolve(null);
            }
        }

        // Phase 4: settle the Deferreds of the finished transfers.
        foreach ($finished as $info) {
            $handle = $info['handle'];
            if (!isset($this->deferreds[(string)$handle])) {
                continue;
            }
            $succeeded = $info['result'] === CURLE_OK;
            if ($succeeded) {
                $outcome = curl_multi_getcontent($handle);
            } else {
                $outcome = new CURLException(curl_error($handle), $info['result'], $handle);
            }
            $pending = $this->deferreds[(string)$handle];
            unset($this->deferreds[(string)$handle]);
            if ($succeeded) {
                $pending->resolve($outcome);
            } else {
                $pending->reject($outcome);
            }
        }
    }
203
}
204