Completed
Push — master ( ed5639...43ce2c )
by Ryosuke
03:05
created

CURLPool::consumeCurlsAndUntils()   D

Complexity

Conditions 11
Paths 126

Size

Total Lines 38
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 26
CRAP Score 11

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 38
ccs 26
cts 26
cp 1
rs 4.9629
cc 11
eloc 24
nc 126
nop 0
crap 11

How to fix   Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace mpyw\Co\Internal;
4
use mpyw\Co\Co;
5
use mpyw\Co\Internal\CoOption;
6
use mpyw\Co\CURLException;
7
use mpyw\RuntimePromise\Deferred;
8
9
class CURLPool
10
{
11
    /**
12
     * Options.
13
     * @var CoOption
14
     */
15
    private $options;
16
17
    /**
18
     * cURL multi handle.
19
     * @var resource
20
     */
21
    private $mh;
22
23
    /**
24
     * cURL handles those have not been dispatched.
25
     * @var array
26
     */
27
    private $queue = [];
28
29
    /**
30
     * cURL handles those have been already dispatched.
31
     * @var array
32
     */
33
    private $added = [];
34
35
    /**
36
     * Delays to be ended at.
37
     * @var array
38
     */
39
    private $untils = [];
40
41
    /**
42
     * React Deferreds.
43
     * @var Deferred
44
     */
45
    private $deferreds = [];
46
47
    /**
48
     * Used for halting loop.
49
     * @var \RuntimeException
50
     */
51
    private $haltException;
52
53
    /**
54
     * Constructor.
55
     * Initialize cURL multi handle.
56
     * @param CoOption $options
57
     */
58 19
    public function __construct(CoOption $options)
59 19
    {
60 19
        $this->options = $options;
61 19
        $this->mh = curl_multi_init();
62 19
        $flags = (int)$options['pipeline'] + (int)$options['multiplex'] * 2;
63 19
        curl_multi_setopt($this->mh, CURLMOPT_PIPELINING, $flags);
64 19
    }
65
66
    /**
67
     * Call curl_multi_add_handle() or push into queue.
68
     * @param resource $ch
69
     * @param Deferred $deferred
70
     */
71 9
    public function addOrEnqueue($ch, Deferred $deferred = null)
72 9
    {
73 9
        if (isset($this->added[(string)$ch]) || isset($this->queue[(string)$ch])) {
74 3
            throw new \InvalidArgumentException("The cURL handle is already enqueued: $ch");
75
        }
76 9
        if (count($this->added) >= $this->options['concurrency']) {
77 2
            $this->queue[(string)$ch] = $ch;
78 2
            $deferred && $this->deferreds[(string)$ch] = $deferred;
79 2
            return;
80
        }
81 9
        $errno = curl_multi_add_handle($this->mh, $ch);
82 9
        if ($errno !== CURLM_OK) {
83
            // @codeCoverageIgnoreStart
84
            $msg = curl_multi_strerror($errno) . ": $ch";
85
            $deferred && $deferred->reject(new \RuntimeException($msg));
86
            return;
87
            // @codeCoverageIgnoreEnd
88
        }
89 9
        $this->added[(string)$ch] = $ch;
90 9
        $deferred && $this->deferreds[(string)$ch] = $deferred;
91 9
    }
92
93
    /**
94
     * Add delay.
95
     * @param int      $time
96
     * @param Deferred $deferred
97
     */
98 4
    public function addDelay($time, Deferred $deferred)
99 4
    {
100 4
        $time = filter_var($time, FILTER_VALIDATE_FLOAT);
101 4
        if ($time === false || $time < 0) {
102 1
            throw new \InvalidArgumentException('Delay must be positive number.');
103
        }
104 3
        $now = microtime(true);
105 3
        $until = $now + $time;
106 3
        $diff = $until - $now;
107 3
        if ($diff <= 0.0) {
108
            // @codeCoverageIgnoreStart
109
            $deferred->resolve(null);
110
            return;
111
            // @codeCoverageIgnoreEnd
112
        }
113
        do {
114 3
            $id = uniqid();
115 3
        } while (isset($this->untils[$id]));
116 3
        $this->untils[$id] = $until;
117 3
        $this->deferreds[$id] = $deferred;
118 3
    }
119
120
    /**
121
     * Run curl_multi_exec() loop.
122
     */
123 13
    public function wait()
124 13
    {
125 13
        $this->halt = false;
0 ignored issues
show
Bug introduced by
The property halt does not exist. Did you maybe forget to declare it?

In PHP it is possible to write to properties without declaring them. For example, the following is perfectly valid PHP code:

class MyClass { }

$x = new MyClass();
$x->foo = true;

Generally, it is a good practice to explictly declare properties to avoid accidental typos and provide IDE auto-completion:

class MyClass {
    public $foo;
}

$x = new MyClass();
$x->foo = true;
Loading history...
126 13
        curl_multi_exec($this->mh, $active); // Start requests.
127
        do {
128 13
            if ($this->added || $this->queue) {
129
                // if cURL handle is running, use curl_multi_select()
130 6
                curl_multi_select($this->mh, $this->options['interval']) < 0
131 6
                && usleep($this->options['interval'] * 1000000);
132
            } else {
133
                // otherwise, just sleep until nearest time
134 8
                $this->sleepUntilNearestTime();
135
            }
136 13
            curl_multi_exec($this->mh, $active);
137 13
            $this->consumeCurlsAndUntils();
138 13
            if ($this->haltException) {
139 3
                throw $this->haltException;
140
            }
141 11
        } while ($this->added || $this->queue || $this->untils);
142 11
    }
143
144
    /**
145
     * Used for halting loop.
146
     */
147 3
    public function reserveHaltException($e)
148 3
    {
149 3
        $this->haltException = $e;
150 3
    }
151
152
    /**
153
     * Sleep at least required.
154
     */
155 8
    private function sleepUntilNearestTime()
156 8
    {
157 8
        $now = microtime(true);
158 8
        $min = null;
159 8
        foreach ($this->untils as $id => $until) {
160 3
            $diff = $now - $until;
161 3
            if ($diff < 0) {
162 3
                continue;
163
            }
164 1
            if ($min === null || $diff < $min) {
165 1
                $min = $diff;
166
            }
167
        }
168 8
        if ($min !== null) {
169 1
            usleep($min * 1000000);
170
        }
171 8
    }
172
173
    /**
174
     * Consume completed cURL handles and delays.
175
     */
176 13
    private function consumeCurlsAndUntils()
177 13
    {
178 13
        $entries = [];
179
        // First, we have to poll completed entries
180
        // DO NOT call curl_multi_add_handle() until polling done
181 13
        while ($entry = curl_multi_info_read($this->mh)) {
182 6
            $entries[] = $entry;
183
        }
184
        // Remove entry from multi handle to consume queue if available
185 13
        foreach ($entries as $entry) {
186 6
            curl_multi_remove_handle($this->mh, $entry['handle']);
187 6
            unset($this->added[(string)$entry['handle']]);
188 6
            if ($this->queue) {
189 1
                $ch = array_shift($this->queue);
190 6
                $this->addOrEnqueue($ch);
191
            }
192
        }
193
        // Now we check specified delay time elapsed
194 13
        foreach ($this->untils as $id => $until) {
195 3
            $diff = $until - microtime(true);
196 3
            if ($diff <= 0.0 && isset($this->deferreds[$id])) {
197 2
                $deferred = $this->deferreds[$id];
198 2
                unset($this->deferreds[$id], $this->untils[$id]);
199 3
                $deferred->resolve(null);
200
            }
201
        }
202
        // Finally, resolve cURL responses
203 13
        foreach ($entries as $entry) {
204 6
            $r = $entry['result'] === CURLE_OK
205 6
                ? curl_multi_getcontent($entry['handle'])
206 6
                : new CURLException(curl_error($entry['handle']), $entry['result'], $entry['handle']);
207 6
            if (isset($this->deferreds[(string)$entry['handle']])) {
208 6
                $deferred = $this->deferreds[(string)$entry['handle']];
209 6
                unset($this->deferreds[(string)$entry['handle']]);
210 6
                $r instanceof CURLException ? $deferred->reject($r) : $deferred->resolve($r);
211
            }
212
        }
213 13
    }
214
}
215