Passed
Push — master ( cc4828...00e84f )
by Igor
10:05
created

src/Transport/CurlerRolling.php (1 issue)

1
<?php
2
3
namespace ClickHouseDB\Transport;
4
5
use ClickHouseDB\Exception\TransportException;
6
7
class CurlerRolling
8
{
9
    const SLEEP_DELAY = 1000; // 1ms
10
11
    /**
12
     * @var int
13
     *
14
     * Max number of simultaneous requests.
15
     */
16
    private $simultaneousLimit = 10;
17
18
    /**
19
     * @var array
20
     *
21
     * Requests currently being processed by curl
22
     */
23
    private $activeRequests = [];
24
25
    /**
26
     * @var int
27
     */
28
    private $runningRequests = 0;
29
30
    /**
31
     * @var CurlerRequest[]
32
     *
33
     * Requests queued to be processed
34
     */
35
    private $pendingRequests = [];
36
37
    /**
38
     * @var int
39
     */
40
    private $completedRequestCount = 0;
41
42
    /**
43
     * @var null|resource
44
     */
45
    private $_pool_master = null;
46
47
    /**
48
     * @var int
49
     */
50
    private $waitRequests = 0;
51
52
    /**
53
     * @var array
54
     */
55
    private $handleMapTasks = [];
56
57
    /**
     * Releases the curl multi handle when the object is destroyed.
     *
     * PHP only invokes the magic method named __destruct(); the earlier
     * __destructor() spelling was never called automatically.
     */
    public function __destruct()
    {
        $this->close();
    }

    /**
     * Misspelled destructor, kept only so explicit callers keep working.
     *
     * @deprecated Not a PHP magic method; call close() instead.
     */
    public function __destructor()
    {
        $this->close();
    }

    /**
     * Returns the curl multi handle, creating and configuring it on first use.
     *
     * @return resource|\CurlMultiHandle a resource before PHP 8.0, a CurlMultiHandle object since
     */
    private function handlerMulti()
    {
        if (!$this->_pool_master) {
            $this->_pool_master = curl_multi_init();

            // curl_multi_setopt() is not available on every PHP build, hence the guard
            if (function_exists('curl_multi_setopt')) {
                curl_multi_setopt($this->_pool_master, CURLMOPT_MAXCONNECTS, $this->simultaneousLimit);
            }
        }

        return $this->_pool_master;
    }

    /**
     * Closes the curl multi handle if one was created and is still open.
     *
     * Safe to call more than once: a closed PHP 7 resource is still truthy,
     * so the handle type is checked explicitly instead of relying on truthiness.
     */
    public function close()
    {
        $pool = $this->_pool_master;

        if (is_resource($pool) || is_object($pool)) {
            curl_multi_close($pool);
        }
    }
91
92
93
    /**
     * Puts a request into the pending queue, keyed by its id.
     *
     * When the request has no id, one is derived from getUniqHash() seeded
     * with the number of requests completed so far.
     *
     * @param CurlerRequest $req           request to enqueue
     * @param bool          $checkMultiAdd when true a duplicate id raises an exception,
     *                                     when false a duplicate id just returns false
     * @param bool          $force         when true an existing entry with the same id is overwritten
     * @return bool true when the request was stored, false when a duplicate was silently rejected
     * @throws TransportException when the id is already queued and $checkMultiAdd is true
     */
    public function addQueLoop(CurlerRequest $req, $checkMultiAdd = true, $force = false)
    {
        $id = $req->getId() ?: $req->getUniqHash($this->completedRequestCount);

        $alreadyQueued = isset($this->pendingRequests[$id]);

        if ($alreadyQueued && !$force) {
            if ($checkMultiAdd) {
                throw new TransportException("Cant add exists que - cant overwrite : $id!\n");
            }

            return false;
        }

        $this->pendingRequests[$id] = $req;

        return true;
    }
119
120
    /**
     * Builds a CurlerResponse from a finished curl easy handle.
     *
     * The raw content is split into a header part and a body part using the
     * header size reported by curl.
     *
     * @param resource|\CurlHandle $oneHandle finished curl easy handle
     * @return CurlerResponse
     */
    private function makeResponse($oneHandle)
    {
        // curl_multi_getcontent() returns null when CURLOPT_RETURNTRANSFER is not set;
        // normalise to a string so substr() never receives null (deprecated since PHP 8.1).
        $raw = (string) curl_multi_getcontent($oneHandle);
        $headerSize = (int) curl_getinfo($oneHandle, CURLINFO_HEADER_SIZE);

        $n = new CurlerResponse();
        $n->_headers = $this->parse_headers_from_curl_response(substr($raw, 0, $headerSize));
        $n->_body = substr($raw, $headerSize);
        $n->_info = curl_getinfo($oneHandle);
        $n->_error = curl_error($oneHandle);
        $n->_errorNo = curl_errno($oneHandle);
        $n->_useTime = 0;

        return $n;
    }
141
142
    /**
     * Keeps calling exec() until no request is active or pending any more.
     *
     * Sleeps SLEEP_DELAY microseconds after every pass.
     *
     * @return bool always true
     * @throws TransportException
     */
    public function execLoopWait()
    {
        while (true) {
            $this->exec();
            usleep(self::SLEEP_DELAY);

            $outstanding = $this->countActive() + $this->countPending();
            if ($outstanding <= 0) {
                break;
            }
        }

        return true;
    }
155
156
    /**
     * Turns a raw CRLF-separated header block into an associative array.
     *
     * The first line is stored as-is under the key 'http_code'. Every other
     * line of the form "Name: value" becomes $headers['Name'] = 'value';
     * lines without a ": " separator (including blank lines) are skipped.
     *
     * @param string $response raw header text
     * @return array
     */
    private function parse_headers_from_curl_response($response)
    {
        $headers = [];

        foreach (explode("\r\n", $response) as $i => $line) {
            if ($i === 0) {
                $headers['http_code'] = $line;
                continue;
            }

            // Split on the first separator only, so values that themselves
            // contain ": " are kept instead of being silently dropped.
            $parts = explode(': ', $line, 2);
            if (count($parts) === 2) {
                $headers[$parts[0]] = $parts[1];
            }
        }

        return $headers;
    }
178
179
    /**
     * Number of entries currently held in the pending queue.
     *
     * @return int
     */
    public function countPending()
    {
        return count($this->pendingRequests);
    }
186
187
    /**
     * Number of requests currently marked as active.
     *
     * @return int
     */
    public function countActive()
    {
        return \count($this->activeRequests);
    }
194
195
    /**
     * Value of the completed-request counter.
     *
     * @return int
     */
    public function countCompleted()
    {
        $completed = $this->completedRequestCount;

        return $completed;
    }
202
203
    /**
     * Sets how many cURL requests may be executed at the same time.
     *
     * Be careful with high values: requests are more likely to fail, and the
     * remote side may treat the traffic as a DOS attack and block it.
     *
     * @param int $count new limit, must be an integer of at least 2
     * @throws \InvalidArgumentException when $count is not an int or is below 2
     * @return $this
     */
    public function setSimultaneousLimit($count)
    {
        $isValid = is_int($count) && $count >= 2;

        if (!$isValid) {
            throw new \InvalidArgumentException("setSimultaneousLimit count must be an int >= 2");
        }

        $this->simultaneousLimit = $count;

        return $this;
    }
223
224
    /**
     * Currently configured maximum number of simultaneous requests.
     *
     * @return int
     */
    public function getSimultaneousLimit()
    {
        $limit = $this->simultaneousLimit;

        return $limit;
    }
231
232
    /**
     * Running-transfer count stored by the most recent exec() call.
     *
     * @return int
     */
    public function getRunningRequests()
    {
        $running = $this->runningRequests;

        return $running;
    }
239
240
    /**
     * Executes a single request synchronously and attaches the response to it.
     *
     * @param CurlerRequest $request    request to run
     * @param bool          $auto_close when true the request is closed right after the response is stored
     * @return mixed whatever http_code() of the stored response returns
     * @throws TransportException
     */
    public function execOne(CurlerRequest $request, $auto_close = false)
    {
        $handle = $request->handle();
        curl_exec($handle);

        $response = $this->makeResponse($handle);
        $request->setResponse($response);

        if ($auto_close) {
            $request->close();
        }

        return $request->response()->http_code();
    }
259
260
    /**
     * Short textual summary of the running and pending request counters.
     *
     * @return string
     */
    public function getInfo()
    {
        $pending = count($this->pendingRequests);

        return "runningRequests = {$this->runningRequests} , pending=" . $pending . " ";
    }
267
268
    /**
     * Performs one pass over the curl multi pool.
     *
     * Starts queued requests up to the simultaneous limit, drives the
     * transfers, and for every finished handle stores the response on its
     * request, fires the request callback and removes the handle from the pool.
     *
     * @return int number of requests still marked active after this pass
     * @throws TransportException when curl_multi_exec() does not report CURLM_OK
     */
    public function exec()
    {
        $this->makePendingRequestsQue();

        // Keep calling while curl asks for another immediate perform.
        do {
            $execrun = curl_multi_exec($this->handlerMulti(), $running);
        } while ($execrun === CURLM_CALL_MULTI_PERFORM);

        if ($execrun !== CURLM_OK) {
            throw new TransportException("[ NOT CURLM_OK ]");
        }

        $this->runningRequests = $running;

        // A request was just completed -- find out which one.
        while ($done = curl_multi_info_read($this->handlerMulti())) {
            $handle = $done['handle'];
            $response = $this->makeResponse($handle);

            $key = $this->handleKey($handle);
            $task_id = $this->handleMapTasks[$key];
            $request = $this->pendingRequests[$task_id];

            unset($this->handleMapTasks[$key], $this->activeRequests[$task_id]);

            // Send the return values to the callback function.
            $request->setResponse($response);
            $request->onCallback();

            // Persistent requests stay in the queue after completing.
            if (!$request->isPersistent()) {
                unset($this->pendingRequests[$task_id]);
            }

            $this->completedRequestCount++;

            // Remove the curl handle that just completed.
            curl_multi_remove_handle($this->handlerMulti(), $handle);

            // If something was requeued, this gets it running.
            curl_multi_exec($this->handlerMulti(), $stillRunning);
        }

        // See if there is anything to read.
        curl_multi_select($this->handlerMulti(), 0.01);

        return $this->countActive();
    }

    /**
     * Starts queued requests until the simultaneous limit is reached.
     *
     * When more requests are waiting than free slots exist, the ones to
     * start are picked with array_rand().
     */
    public function makePendingRequestsQue()
    {
        $canAdd = $this->getSimultaneousLimit() - $this->countActive();

        if ($canAdd <= 0) {
            return;
        }

        // Queued task ids that are not yet marked active, keyed by themselves
        // so that array_rand() hands back task ids.
        $add = [];
        foreach (array_keys($this->pendingRequests) as $task_id) {
            if (empty($this->activeRequests[$task_id])) {
                $add[$task_id] = $task_id;
            }
        }

        if (!$add) {
            return;
        }

        if ($canAdd >= count($add)) {
            $ll = $add;
        } else {
            // array_rand() returns a bare key instead of an array when asked for one element
            $ll = (array) array_rand($add, $canAdd);
        }

        foreach ($ll as $task_id) {
            $this->_prepareLoopQue($task_id);
        }
    }

    /**
     * Marks a queued request as active and attaches its handle to the pool.
     *
     * @param int|string $task_id key of the request in the pending queue
     */
    private function _prepareLoopQue($task_id)
    {
        $this->activeRequests[$task_id] = 1;
        $this->waitRequests++;

        $h = $this->pendingRequests[$task_id]->handle();

        // pool
        curl_multi_add_handle($this->handlerMulti(), $h);

        $this->handleMapTasks[$this->handleKey($h)] = $task_id;
    }

    /**
     * Derives the handleMapTasks key for a curl easy handle.
     *
     * Since PHP 8.0 curl handles are CurlHandle objects, which cannot be cast
     * to string; the object id is used for them. Older resource handles keep
     * their string form.
     *
     * @param resource|\CurlHandle $handle
     * @return int|string
     */
    private function handleKey($handle)
    {
        return is_object($handle) ? spl_object_id($handle) : (string) $handle;
    }
373
}
374