Passed
Push — master ( 4e99ce...4c2c3a )
by Igor
05:31
created

CurlerRolling   B

Complexity

Total Complexity 44

Size/Duplication

Total Lines 381
Duplicated Lines 0 %

Test Coverage

Coverage 79.53%

Importance

Changes 0
Metric Value
dl 0
loc 381
ccs 101
cts 127
cp 0.7953
rs 8.3396
c 0
b 0
f 0
wmc 44

18 Methods

Rating   Name   Duplication   Size   Complexity  
A execOne() 0 12 2
A getRunningRequests() 0 3 1
A close() 0 4 2
A execLoopWait() 0 20 3
B exec() 0 47 5
A handlerMulti() 0 11 3
A _prepareLoopQue() 0 12 1
A makeResponse() 0 16 1
A countActive() 0 3 1
A __destructor() 0 3 1
A countPending() 0 3 1
A setSimultaneousLimit() 0 8 3
A countCompleted() 0 3 1
A getSimultaneousLimit() 0 3 1
A getInfo() 0 3 1
B addQueLoop() 0 18 5
A parse_headers_from_curl_response() 0 18 4
C makePendingRequestsQue() 0 35 8

How to fix   Complexity   

Complex Class

Complex classes like CurlerRolling often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use CurlerRolling, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace ClickHouseDB\Transport;
4
5
use ClickHouseDB\Exception\TransportException;
6
7
class CurlerRolling
8
{
9
    /**
10
     * @var int
11
     *
12
     * Max number of simultaneous requests.
13
     */
14
    private $simultaneousLimit = 10;
15
16
    /**
17
     * @var array
18
     *
19
     * Requests currently being processed by curl
20
     */
21
    private $activeRequests = [];
22
23
    /**
24
     * @var int
25
     */
26
    private $runningRequests = 0;
27
28
    /**
29
     * @var CurlerRequest[]
30
     *
31
     * Requests queued to be processed
32
     */
33
    private $pendingRequests = [];
34
35
    /**
36
     * @return int
37
     */
38
    private $completedRequestCount = 0;
39
40
    /**
41
     * @var null
42
     */
43
    private $_pool_master = null;
44
45
    /**
46
     * @var int
47
     */
48
    private $waitRequests = 0;
49
50
    /**
51
     * @var array
52
     */
53
    private $handleMapTasks = [];
54
55
    /**
56
     *
57
     */
58
    public function __destructor()
59
    {
60
        $this->close();
61
    }
62
63
64
    /**
65
     * @return resource
66
     */
67 8
    private function handlerMulti()
68
    {
69 8
        if (!$this->_pool_master) {
70 8
            $this->_pool_master = curl_multi_init();
0 ignored issues
show
Documentation Bug introduced by
It seems like curl_multi_init() of type resource is incompatible with the declared type null of property $_pool_master.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
71
72 8
            if (function_exists('curl_multi_setopt')) {
73 8
                curl_multi_setopt($this->_pool_master, CURLMOPT_MAXCONNECTS, $this->simultaneousLimit);
74
            }
75
        }
76
77 8
        return $this->_pool_master;
78
    }
79
80
    /**
81
     *
82
     */
83
    public function close()
84
    {
85
        if ($this->_pool_master) {
86
            curl_multi_close($this->handlerMulti());
87
        }
88
    }
89
90
91
    /**
92
     * @param CurlerRequest $req
93
     * @param bool $checkMultiAdd
94
     * @param bool $force
95
     * @return bool
96
     * @throws TransportException
97
     */
98 11
    public function addQueLoop(CurlerRequest $req, $checkMultiAdd = true, $force = false)
99
    {
100 11
        $id = $req->getId();
101
102 11
        if (!$id) {
103 11
            $id = $req->getUniqHash($this->completedRequestCount);
104
        }
105
106 11
        if (!$force && isset($this->pendingRequests[$id])) {
107
            if (!$checkMultiAdd) {
108
                return false;
109
            }
110
111
            throw new TransportException("Cant add exists que - cant overwrite : $id!\n");
112
        }
113
114 11
        $this->pendingRequests[$id] = $req;
115 11
        return true;
116
    }
117
118
    /**
119
     * @param resource $oneHandle
120
     * @return CurlerResponse
121
     */
122 34
    private function makeResponse($oneHandle)
123
    {
124 34
        $response = curl_multi_getcontent($oneHandle);
125 34
        $header_size = curl_getinfo($oneHandle, CURLINFO_HEADER_SIZE);
126 34
        $header = substr($response, 0, $header_size);
127 34
        $body = substr($response, $header_size);
128
129 34
        $n = new CurlerResponse();
130 34
        $n->_headers = $this->parse_headers_from_curl_response($header);
0 ignored issues
show
Bug introduced by
$header of type string is incompatible with the type resource expected by parameter $response of ClickHouseDB\Transport\C...rs_from_curl_response(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

130
        $n->_headers = $this->parse_headers_from_curl_response(/** @scrutinizer ignore-type */ $header);
Loading history...
131 34
        $n->_body = $body;
132 34
        $n->_info = curl_getinfo($oneHandle);
133 34
        $n->_error = curl_error($oneHandle);
134 34
        $n->_errorNo = curl_errno($oneHandle);
135 34
        $n->_useTime = 0;
136
137 34
        return $n;
138
    }
139
140
    /**
141
     * @param int $usleep
142
     * @return bool
143
     * @throws TransportException
144
     */
145 8
    public function execLoopWait($usleep = 10000)
146
    {
147
        // @todo rewrite wait
148 8
        $c = 0;
149
150
        // add all tasks
151
        do {
152 8
            $this->exec();
153
154 8
            $loop = $this->countActive();
155 8
            $c++;
156
157 8
            if ($c > 100000) {
158
                break;
159
            }
160
161 8
            usleep($usleep);
162 8
        } while ($loop);
163
164 8
        return true;
165
    }
166
167
    /**
168
     * @param resource $response
169
     * @return array
170
     */
171 34
    private function parse_headers_from_curl_response($response)
172
    {
173 34
        $headers = [];
174 34
        $header_text = $response;
175
176 34
        foreach (explode("\r\n", $header_text) as $i => $line) {
0 ignored issues
show
Bug introduced by
$header_text of type resource is incompatible with the type string expected by parameter $string of explode(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

176
        foreach (explode("\r\n", /** @scrutinizer ignore-type */ $header_text) as $i => $line) {
Loading history...
177 34
            if ($i === 0) {
178 34
                $headers['http_code'] = $line;
179
            }
180
            else {
181 34
                $r = explode(': ', $line);
182 34
                if (sizeof($r) == 2) {
183 34
                    $headers[$r[0]] = $r[1];
184
                }
185
            }
186
        }
187
188 34
        return $headers;
189
    }
190
191
    /**
192
     * @return int
193
     */
194 9
    public function countPending()
195
    {
196 9
        return sizeof($this->pendingRequests);
197
    }
198
199
    /**
200
     * @return int
201
     */
202 8
    public function countActive()
203
    {
204 8
        return count($this->activeRequests);
205
    }
206
207
    /**
208
     * @return int
209
     */
210
    public function countCompleted()
211
    {
212
        return $this->completedRequestCount;
213
    }
214
215
    /**
216
     * Set the limit for how many cURL requests will be execute simultaneously.
217
     *
218
     * Please be mindful that if you set this too high, requests are likely to fail
219
     * more frequently or automated software may perceive you as a DOS attack and
220
     * automatically block further requests.
221
     *
222
     * @param int $count
223
     * @throws \InvalidArgumentException
224
     * @return $this
225
     */
226
    public function setSimultaneousLimit($count)
227
    {
228
        if (!is_int($count) || $count < 2) {
0 ignored issues
show
introduced by
The condition is_int($count) is always true.
Loading history...
229
            throw new \InvalidArgumentException("setSimultaneousLimit count must be an int >= 2");
230
        }
231
232
        $this->simultaneousLimit = $count;
233
        return $this;
234
    }
235
236
    /**
237
     * @return int
238
     */
239 8
    public function getSimultaneousLimit()
240
    {
241 8
        return $this->simultaneousLimit;
242
    }
243
244
    /**
245
     * @return int
246
     */
247
    public function getRunningRequests()
248
    {
249
        return $this->runningRequests;
250
    }
251
252
    /**
253
     * @param CurlerRequest $request
254
     * @param bool $auto_close
255
     * @return mixed
256
     * @throws TransportException
257
     */
258 34
    public function execOne(CurlerRequest $request, $auto_close = false)
259
    {
260 34
        $h = $request->handle();
261 34
        curl_exec($h);
262
263 34
        $request->setResponse($this->makeResponse($h));
264
265 34
        if ($auto_close) {
266 2
            $request->close();
267
        }
268
269 34
        return $request->response()->http_code();
270
    }
271
272
    /**
273
     * @return string
274
     */
275
    public function getInfo()
276
    {
277
        return "runningRequests = {$this->runningRequests} , pending=" . sizeof($this->pendingRequests) . " ";
278
    }
279
280
    /**
281
     * @throws TransportException
282
     */
283 8
    public function exec()
284
    {
285 8
        $this->makePendingRequestsQue();
286
287
        // ensure we're running
288
        // a request was just completed -- find out which one
289
290 8
        while (($execrun = curl_multi_exec($this->handlerMulti(), $running)) == CURLM_CALL_MULTI_PERFORM);
291
292 8
        if ($execrun != CURLM_OK) {
293
            throw new TransportException("[ NOT CURLM_OK ]");
294
        }
295
296 8
        $this->runningRequests = $running;
297
298 8
        while ($done = curl_multi_info_read($this->handlerMulti())) {
299 8
            $response = $this->makeResponse($done['handle']);
300
301
            // send the return values to the callback function.
302
303 8
            $key = (string) $done['handle'];
304 8
            $task_id = $this->handleMapTasks[$key];
305 8
            $request = $this->pendingRequests[$this->handleMapTasks[$key]];
306
307 8
            unset($this->handleMapTasks[$key]);
308 8
            unset($this->activeRequests[$task_id]);
309
310 8
            $this->pendingRequests[$task_id]->setResponse($response);
311 8
            $this->pendingRequests[$task_id]->onCallback();
312
313
314 8
            if (!$request->isPersistent()) {
315 8
                unset($this->pendingRequests[$task_id]);
316
            }
317
318 8
            $this->completedRequestCount++;
319
320
            // remove the curl handle that just completed
321 8
            curl_multi_remove_handle($this->handlerMulti(), $done['handle']);
322
323
            // if something was requeued, this will get it running/update our loop check values
324 8
            $status = curl_multi_exec($this->handlerMulti(), $active);
0 ignored issues
show
Unused Code introduced by
The assignment to $status is dead and can be removed.
Loading history...
325
        }
326
327
        // see if there is anything to read
328 8
        curl_multi_select($this->handlerMulti(), 0.01);
329 8
        return $this->countActive();
330
    }
331
332 8
    public function makePendingRequestsQue()
333
    {
334
335 8
        $max = $this->getSimultaneousLimit();
336 8
        $active = $this->countActive();
337
338
339 8
        if ($active < $max) {
340
341 8
            $canAdd = $max - $active;
342
//            $pending = sizeof($this->pendingRequests);
343
344 8
            $add = [];
345
346
347 8
            foreach ($this->pendingRequests as $task_id => $params) {
348 8
                if (empty($this->activeRequests[$task_id])) {
349 8
                    $add[$task_id] = $task_id;
350
                }
351
            }
352
353
354 8
            if (sizeof($add)) {
355 8
                if ($canAdd >= sizeof($add)) {
356 8
                    $ll = $add;
357
                }
358
                else {
359
                    $ll = array_rand($add, $canAdd);
360
                    if (!is_array($ll)) {
361
                        $ll = array($ll => $ll);
362
                    }
363
                }
364
365 8
                foreach ($ll as $task_id) {
366 8
                    $this->_prepareLoopQue($task_id);
367
                }
368
369
            }// if add
370
        }// if can add
371 8
    }
372
373
    /**
374
     * @param string $task_id
375
     */
376 8
    private function _prepareLoopQue($task_id)
377
    {
378 8
        $this->activeRequests[$task_id] = 1;
379 8
        $this->waitRequests++;
380
381 8
        $h = $this->pendingRequests[$task_id]->handle();
382
383
        // pool
384 8
        curl_multi_add_handle($this->handlerMulti(), $h);
385
386 8
        $key = (string) $h;
387 8
        $this->handleMapTasks[$key] = $task_id;
388 8
    }
389
}
390