Passed
Push — master ( 7b4f81...84e0ca )
by Igor
03:22
created

CurlerRolling::execLoopWait()   A

Complexity

Conditions 3
Paths 2

Size

Total Lines 18

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 3.0067

Importance

Changes 0
Metric Value
dl 0
loc 18
ccs 10
cts 11
cp 0.9091
rs 9.6666
c 0
b 0
f 0
cc 3
nc 2
nop 0
crap 3.0067
1
<?php
2
3
namespace ClickHouseDB\Transport;
4
5
use ClickHouseDB\Exception\TransportException;
6
7
class CurlerRolling
8
{
9
    /**
10
     * @var int
11
     *
12
     * Max number of simultaneous requests.
13
     */
14
    private $simultaneousLimit = 10;
15
16
    /**
17
     * @var array
18
     *
19
     * Requests currently being processed by curl
20
     */
21
    private $activeRequests = [];
22
23
    /**
24
     * @var int
25
     */
26
    private $runningRequests = 0;
27
28
    /**
29
     * @var CurlerRequest[]
30
     *
31
     * Requests queued to be processed
32
     */
33
    private $pendingRequests = [];
34
35
    /**
36
     * @return int
37
     */
38
    private $completedRequestCount = 0;
39
40
    /**
41
     * @var null|resource
42
     */
43
    private $_pool_master = null;
44
45
    /**
46
     * @var int
47
     */
48
    private $waitRequests = 0;
49
50
    /**
51
     * @var array
52
     */
53
    private $handleMapTasks = [];
54
55
    /**
56
     *
57
     */
58
    public function __destructor()
59
    {
60
        $this->close();
61
    }
62
63
64
    /**
65
     * @return resource
66
     */
67 8
    private function handlerMulti()
68
    {
69 8
        if (!$this->_pool_master) {
70 8
            $this->_pool_master = curl_multi_init();
71
72 8
            if (function_exists('curl_multi_setopt')) {
73 8
                curl_multi_setopt($this->_pool_master, CURLMOPT_MAXCONNECTS, $this->simultaneousLimit);
74
            }
75
        }
76
77 8
        return $this->_pool_master;
78
    }
79
80
    /**
81
     *
82
     */
83
    public function close()
84
    {
85
        if ($this->_pool_master) {
86
            curl_multi_close($this->handlerMulti());
87
        }
88
    }
89
90
91
    /**
92
     * @param CurlerRequest $req
93
     * @param bool $checkMultiAdd
94
     * @param bool $force
95
     * @return bool
96
     * @throws TransportException
97
     */
98 11
    public function addQueLoop(CurlerRequest $req, $checkMultiAdd = true, $force = false)
99
    {
100 11
        $id = $req->getId();
101
102 11
        if (!$id) {
103 11
            $id = $req->getUniqHash($this->completedRequestCount);
104
        }
105
106 11
        if (!$force && isset($this->pendingRequests[$id])) {
107
            if (!$checkMultiAdd) {
108
                return false;
109
            }
110
111
            throw new TransportException("Cant add exists que - cant overwrite : $id!\n");
112
        }
113
114 11
        $this->pendingRequests[$id] = $req;
115 11
        return true;
116
    }
117
118
    /**
119
     * @param resource $oneHandle
120
     * @return CurlerResponse
121
     */
122 40
    private function makeResponse($oneHandle)
123
    {
124 40
        $response = curl_multi_getcontent($oneHandle);
125 40
        $header_size = curl_getinfo($oneHandle, CURLINFO_HEADER_SIZE);
126 40
        $header = substr($response, 0, $header_size);
127 40
        $body = substr($response, $header_size);
128
129 40
        $n = new CurlerResponse();
130 40
        $n->_headers = $this->parse_headers_from_curl_response($header);
131 40
        $n->_body = $body;
132 40
        $n->_info = curl_getinfo($oneHandle);
133 40
        $n->_error = curl_error($oneHandle);
134 40
        $n->_errorNo = curl_errno($oneHandle);
135 40
        $n->_useTime = 0;
136
137 40
        return $n;
138
    }
139
140
    /**
141
     * @return bool
142
     * @throws TransportException
143
     */
144 8
    public function execLoopWait()
145
    {
146 8
        $c = 0;
147 8
        $usleep = 5000;
148
        // add all tasks
149
        do {
150 8
            $this->exec();
151
152 8
            $loop = $this->countActive();
153 8
            $c++;
154
155 8
            if ($c > 200000) {
156
                break;
157
            }
158 8
            usleep($usleep);
159 8
        } while ($loop);
160
161 8
        return true;
162
    }
163
164
    /**
165
     * @param string $response
166
     * @return array
167
     */
168 40
    private function parse_headers_from_curl_response($response)
169
    {
170 40
        $headers = [];
171 40
        $header_text = $response;
172
173 40
        foreach (explode("\r\n", $header_text) as $i => $line) {
174 40
            if ($i === 0) {
175 40
                $headers['http_code'] = $line;
176
            } else {
177 39
                $r = explode(': ', $line);
178 39
                if (sizeof($r) == 2) {
179 40
                    $headers[$r[0]] = $r[1];
180
                }
181
            }
182
        }
183
184 40
        return $headers;
185
    }
186
187
    /**
188
     * @return int
189
     */
190 11
    public function countPending()
191
    {
192 11
        return sizeof($this->pendingRequests);
193
    }
194
195
    /**
196
     * @return int
197
     */
198 8
    public function countActive()
199
    {
200 8
        return count($this->activeRequests);
201
    }
202
203
    /**
204
     * @return int
205
     */
206
    public function countCompleted()
207
    {
208
        return $this->completedRequestCount;
209
    }
210
211
    /**
212
     * Set the limit for how many cURL requests will be execute simultaneously.
213
     *
214
     * Please be mindful that if you set this too high, requests are likely to fail
215
     * more frequently or automated software may perceive you as a DOS attack and
216
     * automatically block further requests.
217
     *
218
     * @param int $count
219
     * @throws \InvalidArgumentException
220
     * @return $this
221
     */
222
    public function setSimultaneousLimit($count)
223
    {
224
        if (!is_int($count) || $count < 2) {
0 ignored issues
show
introduced by
The condition is_int($count) is always true.
Loading history...
225
            throw new \InvalidArgumentException("setSimultaneousLimit count must be an int >= 2");
226
        }
227
228
        $this->simultaneousLimit = $count;
229
        return $this;
230
    }
231
232
    /**
233
     * @return int
234
     */
235 8
    public function getSimultaneousLimit()
236
    {
237 8
        return $this->simultaneousLimit;
238
    }
239
240
    /**
241
     * @return int
242
     */
243
    public function getRunningRequests()
244
    {
245
        return $this->runningRequests;
246
    }
247
248
    /**
249
     * @param CurlerRequest $request
250
     * @param bool $auto_close
251
     * @return mixed
252
     * @throws TransportException
253
     */
254 40
    public function execOne(CurlerRequest $request, $auto_close = false)
255
    {
256 40
        $h = $request->handle();
257 40
        curl_exec($h);
258
259 40
        $request->setResponse($this->makeResponse($h));
260
261 40
        if ($auto_close) {
262 4
            $request->close();
263
        }
264
265 40
        return $request->response()->http_code();
266
    }
267
268
    /**
269
     * @return string
270
     */
271
    public function getInfo()
272
    {
273
        return "runningRequests = {$this->runningRequests} , pending=" . sizeof($this->pendingRequests) . " ";
274
    }
275
276
    /**
277
     * @throws TransportException
278
     */
279 8
    public function exec()
280
    {
281 8
        $this->makePendingRequestsQue();
282
283
        // ensure we're running
284
        // a request was just completed -- find out which one
285
286 8
        while (($execrun = curl_multi_exec($this->handlerMulti(), $running)) == CURLM_CALL_MULTI_PERFORM);
287
288 8
        if ($execrun != CURLM_OK) {
289
            throw new TransportException("[ NOT CURLM_OK ]");
290
        }
291
292 8
        $this->runningRequests = $running;
293
294 8
        while ($done = curl_multi_info_read($this->handlerMulti())) {
295 8
            $response = $this->makeResponse($done['handle']);
296
297
            // send the return values to the callback function.
298
299 8
            $key = (string) $done['handle'];
300 8
            $task_id = $this->handleMapTasks[$key];
301 8
            $request = $this->pendingRequests[$this->handleMapTasks[$key]];
302
303 8
            unset($this->handleMapTasks[$key]);
304 8
            unset($this->activeRequests[$task_id]);
305
306 8
            $this->pendingRequests[$task_id]->setResponse($response);
307 8
            $this->pendingRequests[$task_id]->onCallback();
308
309
310 8
            if (!$request->isPersistent()) {
311 8
                unset($this->pendingRequests[$task_id]);
312
            }
313
314 8
            $this->completedRequestCount++;
315
316
            // remove the curl handle that just completed
317 8
            curl_multi_remove_handle($this->handlerMulti(), $done['handle']);
318
319
            // if something was requeued, this will get it running/update our loop check values
320 8
            $status = curl_multi_exec($this->handlerMulti(), $active);
0 ignored issues
show
Unused Code introduced by
The assignment to $status is dead and can be removed.
Loading history...
321
        }
322
323
        // see if there is anything to read
324 8
        curl_multi_select($this->handlerMulti(), 0.01);
325 8
        return $this->countActive();
326
    }
327
328 8
    public function makePendingRequestsQue()
329
    {
330
331 8
        $max = $this->getSimultaneousLimit();
332 8
        $active = $this->countActive();
333
334
335 8
        if ($active < $max) {
336
337 8
            $canAdd = $max - $active;
338
//            $pending = sizeof($this->pendingRequests);
339
340 8
            $add = [];
341
342
343 8
            foreach ($this->pendingRequests as $task_id => $params) {
344 8
                if (empty($this->activeRequests[$task_id])) {
345 8
                    $add[$task_id] = $task_id;
346
                }
347
            }
348
349
350 8
            if (sizeof($add)) {
351 8
                if ($canAdd >= sizeof($add)) {
352 8
                    $ll = $add;
353
                } else {
354
                    $ll = array_rand($add, $canAdd);
355
                    if (!is_array($ll)) {
356
                        $ll = array($ll => $ll);
357
                    }
358
                }
359
360 8
                foreach ($ll as $task_id) {
361 8
                    $this->_prepareLoopQue($task_id);
362
                }
363
364
            }// if add
365
        }// if can add
366 8
    }
367
368
    /**
369
     * @param string $task_id
370
     */
371 8
    private function _prepareLoopQue($task_id)
372
    {
373 8
        $this->activeRequests[$task_id] = 1;
374 8
        $this->waitRequests++;
375
376 8
        $h = $this->pendingRequests[$task_id]->handle();
377
378
        // pool
379 8
        curl_multi_add_handle($this->handlerMulti(), $h);
380
381 8
        $key = (string) $h;
382 8
        $this->handleMapTasks[$key] = $task_id;
383 8
    }
384
}
385