Passed
Branch master (8a55cf)
by Igor
12:47 queued 09:06
created

CurlerRolling   B

Complexity

Total Complexity 44

Size/Duplication

Total Lines 379
Duplicated Lines 0 %

Test Coverage

Coverage 79.53%

Importance

Changes 0
Metric Value
wmc 44
dl 0
loc 379
ccs 101
cts 127
cp 0.7953
rs 8.8798
c 0
b 0
f 0

18 Methods

Rating   Name   Duplication   Size   Complexity  
A execOne() 0 12 2
A getRunningRequests() 0 3 1
A close() 0 4 2
A execLoopWait() 0 20 3
B exec() 0 47 5
A handlerMulti() 0 11 3
A _prepareLoopQue() 0 12 1
B makePendingRequestsQue() 0 34 8
A makeResponse() 0 16 1
A countActive() 0 3 1
A __destructor() 0 3 1
A countPending() 0 3 1
A setSimultaneousLimit() 0 8 3
A countCompleted() 0 3 1
A getSimultaneousLimit() 0 3 1
A getInfo() 0 3 1
A addQueLoop() 0 18 5
A parse_headers_from_curl_response() 0 17 4

How to fix   Complexity   

Complex Class

Complex classes like CurlerRolling often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use CurlerRolling, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace ClickHouseDB\Transport;
4
5
use ClickHouseDB\Exception\TransportException;
6
7
class CurlerRolling
8
{
9
    /**
10
     * @var int
11
     *
12
     * Max number of simultaneous requests.
13
     */
14
    private $simultaneousLimit = 10;
15
16
    /**
17
     * @var array
18
     *
19
     * Requests currently being processed by curl
20
     */
21
    private $activeRequests = [];
22
23
    /**
24
     * @var int
25
     */
26
    private $runningRequests = 0;
27
28
    /**
29
     * @var CurlerRequest[]
30
     *
31
     * Requests queued to be processed
32
     */
33
    private $pendingRequests = [];
34
35
    /**
36
     * @return int
37
     */
38
    private $completedRequestCount = 0;
39
40
    /**
41
     * @var null
42
     */
43
    private $_pool_master = null;
44
45
    /**
46
     * @var int
47
     */
48
    private $waitRequests = 0;
49
50
    /**
51
     * @var array
52
     */
53
    private $handleMapTasks = [];
54
55
    /**
56
     *
57
     */
58
    public function __destructor()
59
    {
60
        $this->close();
61
    }
62
63
64
    /**
65
     * @return resource
66
     */
67 8
    private function handlerMulti()
68
    {
69 8
        if (!$this->_pool_master) {
70 8
            $this->_pool_master = curl_multi_init();
0 ignored issues
show
Documentation Bug introduced by
It seems like curl_multi_init() of type resource is incompatible with the declared type null of property $_pool_master.

Our type inference engine has found an assignment to a property that is incompatible with the declared type of that property.

Either this assignment is in error or the assigned type should be added to the documentation/type hint for that property..

Loading history...
71
72 8
            if (function_exists('curl_multi_setopt')) {
73 8
                curl_multi_setopt($this->_pool_master, CURLMOPT_MAXCONNECTS, $this->simultaneousLimit);
74
            }
75
        }
76
77 8
        return $this->_pool_master;
78
    }
79
80
    /**
81
     *
82
     */
83
    public function close()
84
    {
85
        if ($this->_pool_master) {
86
            curl_multi_close($this->handlerMulti());
87
        }
88
    }
89
90
91
    /**
92
     * @param CurlerRequest $req
93
     * @param bool $checkMultiAdd
94
     * @param bool $force
95
     * @return bool
96
     * @throws TransportException
97
     */
98 11
    public function addQueLoop(CurlerRequest $req, $checkMultiAdd = true, $force = false)
99
    {
100 11
        $id = $req->getId();
101
102 11
        if (!$id) {
103 11
            $id = $req->getUniqHash($this->completedRequestCount);
104
        }
105
106 11
        if (!$force && isset($this->pendingRequests[$id])) {
107
            if (!$checkMultiAdd) {
108
                return false;
109
            }
110
111
            throw new TransportException("Cant add exists que - cant overwrite : $id!\n");
112
        }
113
114 11
        $this->pendingRequests[$id] = $req;
115 11
        return true;
116
    }
117
118
    /**
119
     * @param resource $oneHandle
120
     * @return CurlerResponse
121
     */
122 35
    private function makeResponse($oneHandle)
123
    {
124 35
        $response = curl_multi_getcontent($oneHandle);
125 35
        $header_size = curl_getinfo($oneHandle, CURLINFO_HEADER_SIZE);
126 35
        $header = substr($response, 0, $header_size);
127 35
        $body = substr($response, $header_size);
128
129 35
        $n = new CurlerResponse();
130 35
        $n->_headers = $this->parse_headers_from_curl_response($header);
0 ignored issues
show
Bug introduced by
$header of type string is incompatible with the type resource expected by parameter $response of ClickHouseDB\Transport\C...rs_from_curl_response(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

130
        $n->_headers = $this->parse_headers_from_curl_response(/** @scrutinizer ignore-type */ $header);
Loading history...
131 35
        $n->_body = $body;
132 35
        $n->_info = curl_getinfo($oneHandle);
133 35
        $n->_error = curl_error($oneHandle);
134 35
        $n->_errorNo = curl_errno($oneHandle);
135 35
        $n->_useTime = 0;
136
137 35
        return $n;
138
    }
139
140
    /**
141
     * @param int $usleep
142
     * @return bool
143
     * @throws TransportException
144
     */
145 8
    public function execLoopWait($usleep = 10000)
146
    {
147
        // @todo rewrite wait
148 8
        $c = 0;
149
150
        // add all tasks
151
        do {
152 8
            $this->exec();
153
154 8
            $loop = $this->countActive();
155 8
            $c++;
156
157 8
            if ($c > 100000) {
158
                break;
159
            }
160
161 8
            usleep($usleep);
162 8
        } while ($loop);
163
164 8
        return true;
165
    }
166
167
    /**
168
     * @param resource $response
169
     * @return array
170
     */
171 35
    private function parse_headers_from_curl_response($response)
172
    {
173 35
        $headers = [];
174 35
        $header_text = $response;
175
176 35
        foreach (explode("\r\n", $header_text) as $i => $line) {
0 ignored issues
show
Bug introduced by
$header_text of type resource is incompatible with the type string expected by parameter $string of explode(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

176
        foreach (explode("\r\n", /** @scrutinizer ignore-type */ $header_text) as $i => $line) {
Loading history...
177 35
            if ($i === 0) {
178 35
                $headers['http_code'] = $line;
179
            } else {
180 35
                $r = explode(': ', $line);
181 35
                if (sizeof($r) == 2) {
182 35
                    $headers[$r[0]] = $r[1];
183
                }
184
            }
185
        }
186
187 35
        return $headers;
188
    }
189
190
    /**
191
     * @return int
192
     */
193 9
    public function countPending()
194
    {
195 9
        return sizeof($this->pendingRequests);
196
    }
197
198
    /**
199
     * @return int
200
     */
201 8
    public function countActive()
202
    {
203 8
        return count($this->activeRequests);
204
    }
205
206
    /**
207
     * @return int
208
     */
209
    public function countCompleted()
210
    {
211
        return $this->completedRequestCount;
212
    }
213
214
    /**
215
     * Set the limit for how many cURL requests will be execute simultaneously.
216
     *
217
     * Please be mindful that if you set this too high, requests are likely to fail
218
     * more frequently or automated software may perceive you as a DOS attack and
219
     * automatically block further requests.
220
     *
221
     * @param int $count
222
     * @throws \InvalidArgumentException
223
     * @return $this
224
     */
225
    public function setSimultaneousLimit($count)
226
    {
227
        if (!is_int($count) || $count < 2) {
0 ignored issues
show
introduced by
The condition is_int($count) is always true.
Loading history...
228
            throw new \InvalidArgumentException("setSimultaneousLimit count must be an int >= 2");
229
        }
230
231
        $this->simultaneousLimit = $count;
232
        return $this;
233
    }
234
235
    /**
236
     * @return int
237
     */
238 8
    public function getSimultaneousLimit()
239
    {
240 8
        return $this->simultaneousLimit;
241
    }
242
243
    /**
244
     * @return int
245
     */
246
    public function getRunningRequests()
247
    {
248
        return $this->runningRequests;
249
    }
250
251
    /**
252
     * @param CurlerRequest $request
253
     * @param bool $auto_close
254
     * @return mixed
255
     * @throws TransportException
256
     */
257 35
    public function execOne(CurlerRequest $request, $auto_close = false)
258
    {
259 35
        $h = $request->handle();
260 35
        curl_exec($h);
261
262 35
        $request->setResponse($this->makeResponse($h));
263
264 35
        if ($auto_close) {
265 2
            $request->close();
266
        }
267
268 35
        return $request->response()->http_code();
269
    }
270
271
    /**
272
     * @return string
273
     */
274
    public function getInfo()
275
    {
276
        return "runningRequests = {$this->runningRequests} , pending=" . sizeof($this->pendingRequests) . " ";
277
    }
278
279
    /**
280
     * @throws TransportException
281
     */
282 8
    public function exec()
283
    {
284 8
        $this->makePendingRequestsQue();
285
286
        // ensure we're running
287
        // a request was just completed -- find out which one
288
289 8
        while (($execrun = curl_multi_exec($this->handlerMulti(), $running)) == CURLM_CALL_MULTI_PERFORM);
290
291 8
        if ($execrun != CURLM_OK) {
292
            throw new TransportException("[ NOT CURLM_OK ]");
293
        }
294
295 8
        $this->runningRequests = $running;
296
297 8
        while ($done = curl_multi_info_read($this->handlerMulti())) {
298 8
            $response = $this->makeResponse($done['handle']);
299
300
            // send the return values to the callback function.
301
302 8
            $key = (string) $done['handle'];
303 8
            $task_id = $this->handleMapTasks[$key];
304 8
            $request = $this->pendingRequests[$this->handleMapTasks[$key]];
305
306 8
            unset($this->handleMapTasks[$key]);
307 8
            unset($this->activeRequests[$task_id]);
308
309 8
            $this->pendingRequests[$task_id]->setResponse($response);
310 8
            $this->pendingRequests[$task_id]->onCallback();
311
312
313 8
            if (!$request->isPersistent()) {
314 8
                unset($this->pendingRequests[$task_id]);
315
            }
316
317 8
            $this->completedRequestCount++;
318
319
            // remove the curl handle that just completed
320 8
            curl_multi_remove_handle($this->handlerMulti(), $done['handle']);
321
322
            // if something was requeued, this will get it running/update our loop check values
323 8
            $status = curl_multi_exec($this->handlerMulti(), $active);
0 ignored issues
show
Unused Code introduced by
The assignment to $status is dead and can be removed.
Loading history...
324
        }
325
326
        // see if there is anything to read
327 8
        curl_multi_select($this->handlerMulti(), 0.01);
328 8
        return $this->countActive();
329
    }
330
331 8
    public function makePendingRequestsQue()
332
    {
333
334 8
        $max = $this->getSimultaneousLimit();
335 8
        $active = $this->countActive();
336
337
338 8
        if ($active < $max) {
339
340 8
            $canAdd = $max - $active;
341
//            $pending = sizeof($this->pendingRequests);
342
343 8
            $add = [];
344
345
346 8
            foreach ($this->pendingRequests as $task_id => $params) {
347 8
                if (empty($this->activeRequests[$task_id])) {
348 8
                    $add[$task_id] = $task_id;
349
                }
350
            }
351
352
353 8
            if (sizeof($add)) {
354 8
                if ($canAdd >= sizeof($add)) {
355 8
                    $ll = $add;
356
                } else {
357
                    $ll = array_rand($add, $canAdd);
358
                    if (!is_array($ll)) {
359
                        $ll = array($ll => $ll);
360
                    }
361
                }
362
363 8
                foreach ($ll as $task_id) {
364 8
                    $this->_prepareLoopQue($task_id);
365
                }
366
367
            }// if add
368
        }// if can add
369 8
    }
370
371
    /**
372
     * @param string $task_id
373
     */
374 8
    private function _prepareLoopQue($task_id)
375
    {
376 8
        $this->activeRequests[$task_id] = 1;
377 8
        $this->waitRequests++;
378
379 8
        $h = $this->pendingRequests[$task_id]->handle();
380
381
        // pool
382 8
        curl_multi_add_handle($this->handlerMulti(), $h);
383
384 8
        $key = (string) $h;
385 8
        $this->handleMapTasks[$key] = $task_id;
386 8
    }
387
}
388