Completed
Push — master ( b090fb...0d13cf )
by Ryosuke
03:33
created

CURLPool::consumeCurlsAndUntils()   C

Complexity

Conditions 11
Paths 108

Size

Total Lines 40
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 28
CRAP Score 11

Importance

Changes 2
Bugs 0 Features 1
Metric Value
c 2
b 0
f 1
dl 0
loc 40
ccs 28
cts 28
cp 1
rs 5.1372
cc 11
eloc 26
nc 108
nop 0
crap 11

How to fix   Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a cohesive part of the method body into a new, well-named method.

1
<?php
2
3
namespace mpyw\Co\Internal;
4
use mpyw\Co\Co;
5
use mpyw\Co\Internal\CoOption;
6
use mpyw\Co\CURLException;
7
use mpyw\RuntimePromise\Deferred;
8
9
/**
 * Pool that drives a single cURL multi handle.
 *
 * It keeps track of three kinds of pending work:
 *  - cURL handles already attached to the multi handle ($added),
 *  - cURL handles waiting for a free concurrency slot ($queue),
 *  - timed delays ($untils).
 * Each of them may have a Deferred registered in $deferreds, which is
 * resolved or rejected from wait() once the work is complete.
 */
class CURLPool
{
    /**
     * Options.
     * Read in this class with the keys 'pipeline', 'multiplex',
     * 'concurrency' and 'interval'.
     * @var CoOption
     */
    private $options;

    /**
     * cURL multi handle.
     * @var resource
     */
    private $mh;

    /**
     * cURL handles that have not been dispatched yet,
     * keyed by the string form of the handle.
     * @var array
     */
    private $queue = [];

    /**
     * cURL handles that have already been dispatched,
     * keyed by the string form of the handle.
     * @var array
     */
    private $added = [];

    /**
     * Timestamps (as returned by microtime(true)) at which delays end,
     * keyed by a generated unique id.
     * @var array
     */
    private $untils = [];

    /**
     * Deferreds, keyed either by the string form of a cURL handle
     * or by a delay id.
     * @var Deferred[]
     */
    private $deferreds = [];

    /**
     * Used for halting loop.
     * @var \RuntimeException|null
     */
    private $haltException;

    /**
     * Constructor.
     * Initialize cURL multi handle.
     * @param CoOption $options
     */
    public function __construct(CoOption $options)
    {
        $this->options = $options;
        $this->mh = curl_multi_init();
        // 'pipeline' contributes 1 and 'multiplex' contributes 2 to the value.
        $flags = (int)$options['pipeline'] + (int)$options['multiplex'] * 2;
        curl_multi_setopt($this->mh, CURLMOPT_PIPELINING, $flags);
    }

    /**
     * Call curl_multi_add_handle() or push into queue.
     *
     * The handle is queued when the number of dispatched handles has reached
     * the 'concurrency' option; otherwise it is attached to the multi handle.
     * If attaching fails, the Deferred (when given) is rejected with
     * a \RuntimeException and the handle is not tracked.
     *
     * @param resource      $ch
     * @param Deferred|null $deferred Settled later when the request completes.
     * @throws \InvalidArgumentException If the handle is already dispatched or queued.
     */
    public function addOrEnqueue($ch, Deferred $deferred = null)
    {
        $key = (string)$ch;
        if (isset($this->added[$key]) || isset($this->queue[$key])) {
            throw new \InvalidArgumentException("The cURL handle is already enqueued: $ch");
        }
        if (count($this->added) >= $this->options['concurrency']) {
            // No free slot: park the handle until another one completes.
            $this->queue[$key] = $ch;
            $deferred && $this->deferreds[$key] = $deferred;
            return;
        }
        $errno = curl_multi_add_handle($this->mh, $ch);
        if ($errno !== CURLM_OK) {
            // @codeCoverageIgnoreStart
            $msg = curl_multi_strerror($errno) . ": $ch";
            $deferred && $deferred->reject(new \RuntimeException($msg));
            return;
            // @codeCoverageIgnoreEnd
        }
        $this->added[$key] = $ch;
        $deferred && $this->deferreds[$key] = $deferred;
    }

    /**
     * Add delay.
     *
     * @param int|float|string $time     Seconds to wait; must be a non-negative number.
     * @param Deferred         $deferred Resolved with null once the delay has elapsed.
     * @throws \InvalidArgumentException If $time is not numeric or is negative.
     */
    public function addDelay($time, Deferred $deferred)
    {
        $time = filter_var($time, FILTER_VALIDATE_FLOAT);
        if ($time === false || $time < 0) {
            throw new \InvalidArgumentException('Delay must be positive number.');
        }
        // Regenerate until the id does not collide with a pending delay.
        do {
            $id = uniqid();
        } while (isset($this->untils[$id]));
        $this->untils[$id] = microtime(true) + $time;
        $this->deferreds[$id] = $deferred;
    }

    /**
     * Run curl_multi_exec() loop.
     *
     * Loops until no dispatched handle, queued handle or delay remains,
     * or until a halt exception has been reserved.
     *
     * @throws \RuntimeException The exception given to reserveHaltException(), if any.
     */
    public function wait()
    {
        curl_multi_exec($this->mh, $active); // Start requests.
        do {
            if ($this->added || $this->queue) {
                // cURL handles are pending: block in curl_multi_select().
                // When it reports a negative result, sleep for the same
                // interval instead so the loop does not spin.
                if (curl_multi_select($this->mh, $this->options['interval']) < 0) {
                    usleep((int)($this->options['interval'] * 1000000));
                }
            } else {
                // Only delays are pending: just sleep until the nearest one.
                $this->sleepUntilNearestTime();
            }
            curl_multi_exec($this->mh, $active);
            $this->consumeCurlsAndUntils();
        } while (!$this->haltException && ($this->added || $this->queue || $this->untils));
        if ($this->haltException) {
            throw $this->haltException;
        }
    }

    /**
     * Used for halting loop.
     * The given exception is thrown from wait() after the current iteration.
     * @param \RuntimeException $e
     */
    public function reserveHaltException($e)
    {
        $this->haltException = $e;
    }

    /**
     * Sleep until the nearest pending delay is due.
     *
     * Returns immediately when there is no pending delay or when at least
     * one delay is already due, so that the caller can resolve it without
     * further waiting.
     */
    private function sleepUntilNearestTime()
    {
        $now = microtime(true);
        $min = null;
        foreach ($this->untils as $until) {
            // Remaining seconds; zero or negative means the delay is already due.
            $remaining = $until - $now;
            if ($remaining <= 0) {
                return;
            }
            if ($min === null || $remaining < $min) {
                $min = $remaining;
            }
        }
        if ($min === null) {
            return;
        }
        // Round up to whole microseconds so that we never wake up early.
        usleep((int)ceil($min * 1000000));
    }

    /**
     * Consume completed cURL handles and delays.
     *
     * The phases run in a fixed order: collect completed transfers, detach
     * them (refilling from the queue), resolve elapsed delays, and finally
     * settle the Deferreds of the completed transfers.
     */
    private function consumeCurlsAndUntils()
    {
        // First, we have to poll completed entries
        // DO NOT call curl_multi_add_handle() until polling done
        $entries = $this->readCompletedEntries();
        // Remove entry from multi handle to consume queue if available
        $this->detachCompletedHandles($entries);
        // Now we check specified delay time elapsed
        $this->resolveElapsedDelays();
        // Finally, resolve cURL responses
        $this->settleCompletedEntries($entries);
    }

    /**
     * Collect every message currently returned by curl_multi_info_read().
     * @return array[] Entries as returned by curl_multi_info_read().
     */
    private function readCompletedEntries()
    {
        $entries = [];
        while ($entry = curl_multi_info_read($this->mh)) {
            $entries[] = $entry;
        }
        return $entries;
    }

    /**
     * Detach completed handles from the multi handle and, for each of them,
     * dispatch one queued handle if the queue is not empty.
     * @param array[] $entries Entries as returned by curl_multi_info_read().
     */
    private function detachCompletedHandles(array $entries)
    {
        foreach ($entries as $entry) {
            curl_multi_remove_handle($this->mh, $entry['handle']);
            unset($this->added[(string)$entry['handle']]);
            if ($this->queue) {
                // The Deferred of a queued handle is already stored in
                // $this->deferreds, so it is not passed again here.
                $this->addOrEnqueue(array_shift($this->queue));
            }
        }
    }

    /**
     * Resolve (with null) the Deferred of every delay whose end time has passed.
     */
    private function resolveElapsedDelays()
    {
        foreach ($this->untils as $id => $until) {
            if ($until - microtime(true) > 0.0 || !isset($this->deferreds[$id])) {
                continue;
            }
            $deferred = $this->deferreds[$id];
            // Forget the delay before resolving it.
            unset($this->deferreds[$id], $this->untils[$id]);
            $deferred->resolve(null);
        }
    }

    /**
     * Settle the Deferreds registered for completed handles: resolve with the
     * response content on CURLE_OK, otherwise reject with a CURLException.
     * Handles without a registered Deferred are skipped.
     * @param array[] $entries Entries as returned by curl_multi_info_read().
     */
    private function settleCompletedEntries(array $entries)
    {
        foreach ($entries as $entry) {
            $key = (string)$entry['handle'];
            if (!isset($this->deferreds[$key])) {
                continue;
            }
            $deferred = $this->deferreds[$key];
            // Forget the Deferred before settling it.
            unset($this->deferreds[$key]);
            if ($entry['result'] === CURLE_OK) {
                $deferred->resolve(curl_multi_getcontent($entry['handle']));
            } else {
                $deferred->reject(
                    new CURLException(curl_error($entry['handle']), $entry['result'], $entry['handle'])
                );
            }
        }
    }
}
204