Passed
Push — master ( 27740f...a27d0b )
by Igor
02:55
created

CurlerRolling::handlerMulti()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 11
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 3

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 11
c 0
b 0
f 0
ccs 6
cts 6
cp 1
rs 10
cc 3
nc 3
nop 0
crap 3
1
<?php
2
3
namespace ClickHouseDB\Transport;
4
5
use ClickHouseDB\Exception\TransportException;
6
7
class CurlerRolling
8
{
9
    /**
10
     * @var int
11
     *
12
     * Max number of simultaneous requests.
13
     */
14
    private $simultaneousLimit = 10;
15
16
    /**
17
     * @var array
18
     *
19
     * Requests currently being processed by curl
20
     */
21
    private $activeRequests = [];
22
23
    /**
24
     * @var int
25
     */
26
    private $runningRequests = 0;
27
28
    /**
29
     * @var CurlerRequest[]
30
     *
31
     * Requests queued to be processed
32
     */
33
    private $pendingRequests = [];
34
35
    /**
36
     * @return int
37
     */
38
    private $completedRequestCount = 0;
39
40
    /**
41
     * @var null|resource
42
     */
43
    private $_pool_master = null;
44
45
    /**
46
     * @var int
47
     */
48
    private $waitRequests = 0;
49
50
    /**
51
     * @var array
52
     */
53
    private $handleMapTasks = [];
54
55
    /**
56
     *
57
     */
58
    public function __destructor()
59
    {
60
        $this->close();
61
    }
62
63
64
    /**
65
     * @return resource
66
     */
67 9
    private function handlerMulti()
68
    {
69 9
        if (!$this->_pool_master) {
70 9
            $this->_pool_master = curl_multi_init();
71
72 9
            if (function_exists('curl_multi_setopt')) {
73 9
                curl_multi_setopt($this->_pool_master, CURLMOPT_MAXCONNECTS, $this->simultaneousLimit);
74
            }
75
        }
76
77 9
        return $this->_pool_master;
78
    }
79
80
    /**
81
     *
82
     */
83
    public function close()
84
    {
85
        if ($this->_pool_master) {
86
            curl_multi_close($this->handlerMulti());
87
        }
88
    }
89
90
91
    /**
92
     * @param CurlerRequest $req
93
     * @param bool $checkMultiAdd
94
     * @param bool $force
95
     * @return bool
96
     * @throws TransportException
97
     */
98 12
    public function addQueLoop(CurlerRequest $req, $checkMultiAdd = true, $force = false)
99
    {
100 12
        $id = $req->getId();
101
102 12
        if (!$id) {
103 12
            $id = $req->getUniqHash($this->completedRequestCount);
104
        }
105
106 12
        if (!$force && isset($this->pendingRequests[$id])) {
107
            if (!$checkMultiAdd) {
108
                return false;
109
            }
110
111
            throw new TransportException("Cant add exists que - cant overwrite : $id!\n");
112
        }
113
114 12
        $this->pendingRequests[$id] = $req;
115 12
        return true;
116
    }
117
118
    /**
119
     * @param resource $oneHandle
120
     * @return CurlerResponse
121
     */
122 41
    private function makeResponse($oneHandle)
123
    {
124 41
        $response = curl_multi_getcontent($oneHandle);
125 41
        $header_size = curl_getinfo($oneHandle, CURLINFO_HEADER_SIZE);
126 41
        $header = substr($response, 0, $header_size);
127 41
        $body = substr($response, $header_size);
128
129 41
        $n = new CurlerResponse();
130 41
        $n->_headers = $this->parse_headers_from_curl_response($header);
131 41
        $n->_body = $body;
132 41
        $n->_info = curl_getinfo($oneHandle);
133 41
        $n->_error = curl_error($oneHandle);
134 41
        $n->_errorNo = curl_errno($oneHandle);
135 41
        $n->_useTime = 0;
136
137 41
        return $n;
138
    }
139
140
    /**
141
     * @return bool
142
     * @throws TransportException
143
     */
144 9
    public function execLoopWait()
145
    {
146 9
        $c = 0;
147
        // add all tasks
148
        do {
149 9
            $this->exec();
150
151 9
            $loop = $this->countActive();
0 ignored issues
show
Unused Code introduced by
The assignment to $loop is dead and can be removed.
Loading history...
152 9
            $pend = $this->countPending();
153 9
            $c++;
154
155 9
            if ($c > 20000) {
156
                break;
157
            }
158 9
            usleep(500);
159 9
        } while ($pend);
160
161 9
        return true;
162
    }
163
164
    /**
165
     * @param string $response
166
     * @return array
167
     */
168 41
    private function parse_headers_from_curl_response($response)
169
    {
170 41
        $headers = [];
171 41
        $header_text = $response;
172
173 41
        foreach (explode("\r\n", $header_text) as $i => $line) {
174 41
            if ($i === 0) {
175 41
                $headers['http_code'] = $line;
176
            } else {
177 40
                $r = explode(': ', $line);
178 40
                if (sizeof($r) == 2) {
179 41
                    $headers[$r[0]] = $r[1];
180
                }
181
            }
182
        }
183
184 41
        return $headers;
185
    }
186
187
    /**
188
     * @return int
189
     */
190 13
    public function countPending()
191
    {
192 13
        return sizeof($this->pendingRequests);
193
    }
194
195
    /**
196
     * @return int
197
     */
198 9
    public function countActive()
199
    {
200 9
        return count($this->activeRequests);
201
    }
202
203
    /**
204
     * @return int
205
     */
206
    public function countCompleted()
207
    {
208
        return $this->completedRequestCount;
209
    }
210
211
    /**
212
     * Set the limit for how many cURL requests will be execute simultaneously.
213
     *
214
     * Please be mindful that if you set this too high, requests are likely to fail
215
     * more frequently or automated software may perceive you as a DOS attack and
216
     * automatically block further requests.
217
     *
218
     * @param int $count
219
     * @throws \InvalidArgumentException
220
     * @return $this
221
     */
222
    public function setSimultaneousLimit($count)
223
    {
224
        if (!is_int($count) || $count < 2) {
0 ignored issues
show
introduced by
The condition is_int($count) is always true.
Loading history...
225
            throw new \InvalidArgumentException("setSimultaneousLimit count must be an int >= 2");
226
        }
227
228
        $this->simultaneousLimit = $count;
229
        return $this;
230
    }
231
232
    /**
233
     * @return int
234
     */
235 9
    public function getSimultaneousLimit()
236
    {
237 9
        return $this->simultaneousLimit;
238
    }
239
240
    /**
241
     * @return int
242
     */
243
    public function getRunningRequests()
244
    {
245
        return $this->runningRequests;
246
    }
247
248
    /**
249
     * @param CurlerRequest $request
250
     * @param bool $auto_close
251
     * @return mixed
252
     * @throws TransportException
253
     */
254 40
    public function execOne(CurlerRequest $request, $auto_close = false)
255
    {
256 40
        $h = $request->handle();
257 40
        curl_exec($h);
258
259 40
        $request->setResponse($this->makeResponse($h));
260
261 40
        if ($auto_close) {
262 4
            $request->close();
263
        }
264
265 40
        return $request->response()->http_code();
266
    }
267
268
    /**
269
     * @return string
270
     */
271
    public function getInfo()
272
    {
273
        return "runningRequests = {$this->runningRequests} , pending=" . sizeof($this->pendingRequests) . " ";
274
    }
275
276
    /**
277
     * @throws TransportException
278
     */
279 9
    public function exec()
280
    {
281 9
        $this->makePendingRequestsQue();
282
283
        // ensure we're running
284
        // a request was just completed -- find out which one
285
286 9
        while (($execrun = curl_multi_exec($this->handlerMulti(), $running)) == CURLM_CALL_MULTI_PERFORM);
287
288 9
        if ($execrun != CURLM_OK) {
289
            throw new TransportException("[ NOT CURLM_OK ]");
290
        }
291
292 9
        $this->runningRequests = $running;
293
294 9
        while ($done = curl_multi_info_read($this->handlerMulti())) {
295 9
            $response = $this->makeResponse($done['handle']);
296
297
            // send the return values to the callback function.
298
299 9
            $key = (string) $done['handle'];
300 9
            $task_id = $this->handleMapTasks[$key];
301 9
            $request = $this->pendingRequests[$this->handleMapTasks[$key]];
302
303 9
            unset($this->handleMapTasks[$key]);
304 9
            unset($this->activeRequests[$task_id]);
305
306 9
            $this->pendingRequests[$task_id]->setResponse($response);
307 9
            $this->pendingRequests[$task_id]->onCallback();
308
309
310 9
            if (!$request->isPersistent()) {
311 9
                unset($this->pendingRequests[$task_id]);
312
            }
313
314 9
            $this->completedRequestCount++;
315
316
            // remove the curl handle that just completed
317 9
            curl_multi_remove_handle($this->handlerMulti(), $done['handle']);
318
319
            // if something was requeued, this will get it running/update our loop check values
320 9
            $status = curl_multi_exec($this->handlerMulti(), $active);
0 ignored issues
show
Unused Code introduced by
The assignment to $status is dead and can be removed.
Loading history...
321
        }
322
323
        // see if there is anything to read
324 9
        curl_multi_select($this->handlerMulti(), 0.01);
325 9
        return $this->countActive();
326
    }
327
328 9
    public function makePendingRequestsQue()
329
    {
330
331 9
        $max = $this->getSimultaneousLimit();
332 9
        $active = $this->countActive();
333
334
335 9
        if ($active < $max) {
336
337 9
            $canAdd = $max - $active;
338
//            $pending = sizeof($this->pendingRequests);
339
340 9
            $add = [];
341
342
343 9
            foreach ($this->pendingRequests as $task_id => $params) {
344 9
                if (empty($this->activeRequests[$task_id])) {
345 9
                    $add[$task_id] = $task_id;
346
                }
347
            }
348
349
350 9
            if (sizeof($add)) {
351 9
                if ($canAdd >= sizeof($add)) {
352 9
                    $ll = $add;
353
                } else {
354 1
                    $ll = array_rand($add, $canAdd);
355 1
                    if (!is_array($ll)) {
356 1
                        $ll = array($ll => $ll);
357
                    }
358
                }
359
360 9
                foreach ($ll as $task_id) {
361 9
                    $this->_prepareLoopQue($task_id);
362
                }
363
364
            }// if add
365
        }// if can add
366 9
    }
367
368
    /**
369
     * @param string $task_id
370
     */
371 9
    private function _prepareLoopQue($task_id)
372
    {
373 9
        $this->activeRequests[$task_id] = 1;
374 9
        $this->waitRequests++;
375
376 9
        $h = $this->pendingRequests[$task_id]->handle();
377
378
        // pool
379 9
        curl_multi_add_handle($this->handlerMulti(), $h);
380
381 9
        $key = (string) $h;
382 9
        $this->handleMapTasks[$key] = $task_id;
383 9
    }
384
}
385