Completed
Push — master ( a71924...e2ef70 )
by Ryosuke
02:54
created

CURLPool   B

Complexity

Total Complexity 44

Size/Duplication

Total Lines 246
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 3

Test Coverage

Coverage 100%

Importance

Changes 21
Bugs 5 Features 2
Metric Value
wmc 44
c 21
b 5
f 2
lcom 1
cbo 3
dl 0
loc 246
ccs 108
cts 108
cp 1
rs 8.3396

12 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 8 1
A addOrEnqueue() 0 9 4
A addCurl() 0 14 4
A enqueueCurl() 0 5 2
A addDelay() 0 15 4
B wait() 0 17 9
A reserveHaltException() 0 4 1
B sleepUntilNearestTime() 0 18 6
A consumeCurlsAndUntils() 0 6 1
A consumeCurls() 0 15 4
A consumeUntils() 0 12 4
A resolveCurls() 0 13 4

How to fix   Complexity   

Complex Class

Complex classes like CURLPool often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes. You can also have a look at the cohesion graph to spot any un-connected, or weakly-connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use CURLPool, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace mpyw\Co\Internal;
4
use mpyw\Co\Co;
5
use mpyw\Co\Internal\CoOption;
6
use mpyw\Co\CURLException;
7
use mpyw\RuntimePromise\Deferred;
8
9
class CURLPool
10
{
11
    /**
12
     * Options.
13
     * @var CoOption
14
     */
15
    private $options;
16
17
    /**
18
     * cURL multi handle.
19
     * @var resource
20
     */
21
    private $mh;
22
23
    /**
24
     * cURL handles those have not been dispatched.
25
     * @var array
26
     */
27
    private $queue = [];
28
29
    /**
30
     * cURL handles those have been already dispatched.
31
     * @var array
32
     */
33
    private $added = [];
34
35
    /**
36
     * Delays to be ended at.
37
     * @var array
38
     */
39
    private $untils = [];
40
41
    /**
42
     * React Deferreds.
43
     * @var Deferred
44
     */
45
    private $deferreds = [];
46
47
    /**
48
     * Used for halting loop.
49
     * @var \RuntimeException
50
     */
51
    private $haltException;
52
53
    /**
54
     * TCP connection counter.
55
     * @var ConnectionCounter
56
     */
57
    private $counter;
58
59
    /**
60
     * Constructor.
61
     * Initialize cURL multi handle.
62
     * @param CoOption $options
63
     */
64 26
    public function __construct(CoOption $options)
65 26
    {
66 26
        $this->options = $options;
67 26
        $this->counter = new ConnectionCounter($options);
68 26
        $this->mh = curl_multi_init();
69 26
        $flags = (int)$options['pipeline'] + (int)$options['multiplex'] * 2;
70 26
        curl_multi_setopt($this->mh, CURLMOPT_PIPELINING, $flags);
71 26
    }
72
73
    /**
74
     * Call curl_multi_add_handle() or push into queue.
75
     * @param resource $ch
76
     * @param Deferred $deferred
77
     */
78 13
    public function addOrEnqueue($ch, Deferred $deferred = null)
79 13
    {
80 13
        if (isset($this->added[(string)$ch]) || isset($this->queue[(string)$ch])) {
81 3
            throw new \UnexpectedValueException("The cURL handle is already enqueued: $ch");
82
        }
83 13
        $this->counter->isPoolFilled($ch)
84 3
            ? $this->enqueueCurl($ch, $deferred)
85 13
            : $this->addCurl($ch, $deferred);
86 13
    }
87
88
    /**
89
     * Call curl_multi_add_handle().
90
     * @param resource $ch
91
     * @param Deferred $deferred
92
     */
93 13
    private function addCurl($ch, Deferred $deferred = null)
94 13
    {
95 13
        $errno = curl_multi_add_handle($this->mh, $ch);
96 13
        if ($errno !== CURLM_OK) {
97
            // @codeCoverageIgnoreStart
98
            $msg = curl_multi_strerror($errno) . ": $ch";
99
            $deferred && $deferred->reject(new \RuntimeException($msg));
100
            return;
101
            // @codeCoverageIgnoreEnd
102
        }
103 13
        $this->added[(string)$ch] = $ch;
104 13
        $this->counter->addDestination($ch);
105 13
        $deferred && $this->deferreds[(string)$ch] = $deferred;
106 13
    }
107
108
    /**
109
     * Push into queue.
110
     * @param resource $ch
111
     * @param Deferred $deferred
112
     */
113 3
    private function enqueueCurl($ch, Deferred $deferred = null)
114 3
    {
115 3
        $this->queue[(string)$ch] = $ch;
116 3
        $deferred && $this->deferreds[(string)$ch] = $deferred;
117 3
    }
118
119
    /**
120
     * Add delay.
121
     * @param int      $time
122
     * @param Deferred $deferred
123
     */
124 6
    public function addDelay($time, Deferred $deferred)
125 6
    {
126 6
        $time = filter_var($time, FILTER_VALIDATE_FLOAT);
127 6
        if ($time === false) {
128 1
            throw new \InvalidArgumentException('Delay must be number.');
129
        }
130 5
        if ($time < 0) {
131 1
            throw new \DomainException('Delay must be positive.');
132
        }
133
        do {
134 4
            $id = uniqid();
135 4
        } while (isset($this->untils[$id]));
136 4
        $this->untils[$id] = microtime(true) + $time;
137 4
        $this->deferreds[$id] = $deferred;
138 4
    }
139
140
    /**
141
     * Run curl_multi_exec() loop.
142
     */
143 18
    public function wait()
144 18
    {
145 18
        curl_multi_exec($this->mh, $active); // Start requests.
146
        do {
147
            // if cURL handle is running, use curl_multi_select()
148
            // otherwise, just sleep until nearest time
149 18
            $this->added || $this->queue
150 10
                ? curl_multi_select($this->mh, $this->options['interval']) < 0
151 10
                  && usleep($this->options['interval'] * 1000000)
152 9
                : $this->sleepUntilNearestTime();
153 18
            curl_multi_exec($this->mh, $active);
154 18
            $this->consumeCurlsAndUntils();
155 18
        } while (!$this->haltException && ($this->added || $this->queue || $this->untils));
156 18
        if ($this->haltException) {
157 4
            throw $this->haltException;
158
        }
159 15
    }
160
161
    /**
162
     * Used for halting loop.
163
     */
164 4
    public function reserveHaltException($e)
165 4
    {
166 4
        $this->haltException = $e;
167 4
    }
168
169
    /**
170
     * Sleep at least required.
171
     */
172 9
    private function sleepUntilNearestTime()
173 9
    {
174 9
        $now = microtime(true);
175 9
        $min = null;
176 9
        foreach ($this->untils as $id => $until) {
177 4
            $diff = $until - $now;
178 4
            if ($diff < 0) {
179
                // @codeCoverageIgnoreStart
180
                return;
181
                // @codeCoverageIgnoreEnd
182
            }
183 4
            if ($min !== null && $diff >= $min) {
184 2
                continue;
185
            }
186 4
            $min = $diff;
187
        }
188 9
        $min && usleep($min * 1000000);
189 9
    }
190
191
    /**
192
     * Consume completed cURL handles and delays.
193
     */
194 18
    private function consumeCurlsAndUntils()
195 18
    {
196 18
        $entries = $this->consumeCurls();
197 18
        $this->consumeUntils();
198 18
        $this->resolveCurls($entries);
199 18
    }
200
201
    /**
202
     * Poll completed cURL entries and consume cURL queue.
203
     * @return array
204
     */
205 18
    private function consumeCurls()
206 18
    {
207 18
        $entries = [];
208
        // DO NOT call curl_multi_add_handle() until polling done
209 18
        while ($entry = curl_multi_info_read($this->mh)) {
210 10
            $entries[] = $entry;
211
        }
212 18
        foreach ($entries as $entry) {
213 10
            curl_multi_remove_handle($this->mh, $entry['handle']);
214 10
            unset($this->added[(string)$entry['handle']]);
215 10
            $this->counter->removeDestination($entry['handle']);
216 10
            $this->queue && $this->addOrEnqueue(array_shift($this->queue));
217
        }
218 18
        return $entries;
219
    }
220
221
    /**
222
     * Consume delay queue.
223
     */
224 18
    private function consumeUntils()
225 18
    {
226 18
        foreach ($this->untils as $id => $until) {
227 4
            $diff = $until - microtime(true);
228 4
            if ($diff > 0.0 || !isset($this->deferreds[$id])) {
229 2
                continue;
230
            }
231 4
            $deferred = $this->deferreds[$id];
232 4
            unset($this->deferreds[$id], $this->untils[$id]);
233 4
            $deferred->resolve(null);
234
        }
235 18
    }
236
237
    /**
238
     * Resolve polled cURLs.
239
     * @param  array $entries Polled cURL entries.
240
     */
241 18
    private function resolveCurls($entries)
242 18
    {
243 18
        foreach ($entries as $entry) {
244 10
            if (!isset($this->deferreds[(string)$entry['handle']])) {
245 1
                continue;
246
            }
247 9
            $deferred = $this->deferreds[(string)$entry['handle']];
248 9
            unset($this->deferreds[(string)$entry['handle']]);
249 9
            $entry['result'] === CURLE_OK
250 9
                ? $deferred->resolve(curl_multi_getcontent($entry['handle']))
251 9
                : $deferred->reject(new CURLException(curl_error($entry['handle']), $entry['result'], $entry['handle']));
252
        }
253 18
    }
254
}
255