Passed
Push — master ( 82e8a0...141c1f )
by Igor
02:31
created

Http::getRequestRead()   C

Complexity

Conditions 12
Paths 64

Size

Total Lines 59

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 22
CRAP Score 12.0832

Importance

Changes 0
Metric Value
dl 0
loc 59
ccs 22
cts 24
cp 0.9167
rs 6.4678
c 0
b 0
f 0
cc 12
nc 64
nop 3
crap 12.0832

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace ClickHouseDB\Transport;
4
5
use ClickHouseDB\Query\Degeneration;
6
use ClickHouseDB\Query\Query;
7
use ClickHouseDB\Query\WhereInFile;
8
use ClickHouseDB\Query\WriteToFile;
9
use ClickHouseDB\Settings;
10
use ClickHouseDB\Statement;
11
12
class Http
13
{
14
    /**
15
     * @var string
16
     */
17
    private $_username = null;
18
19
    /**
20
     * @var string
21
     */
22
    private $_password = null;
23
24
    /**
25
     * @var string
26
     */
27
    private $_host = '';
28
29
    /**
30
     * @var int
31
     */
32
    private $_port = 0;
33
34
    /**
35
     * @var bool|int
36
     */
37
    private $_verbose = false;
38
39
    /**
40
     * @var CurlerRolling
41
     */
42
    private $_curler = null;
43
44
    /**
45
     * @var Settings
46
     */
47
    private $_settings = null;
48
49
    /**
50
     * @var array
51
     */
52
    private $_query_degenerations = [];
53
54
    /**
55
     * Count seconds (int)
56
     *
57
     * @var int
58
     */
59
    private $_connectTimeOut = 5;
60
61
    /**
62
     * @var callable
63
     */
64
    private $xClickHouseProgress = null;
65
66
    /**
67
     * Http constructor.
68
     * @param string $host
69
     * @param int $port
70
     * @param string $username
71
     * @param string $password
72
     */
73 44
    public function __construct($host, $port, $username, $password)
74
    {
75 44
        $this->setHost($host, $port);
76
77 44
        $this->_username = $username;
78 44
        $this->_password = $password;
79 44
        $this->_settings = new Settings($this);
80
81 44
        $this->setCurler();
82 44
    }
83
84
85 44
    public function setCurler()
86
    {
87 44
        $this->_curler = new CurlerRolling();
88 44
    }
89
90
    /**
91
     * @return CurlerRolling
92
     */
93
    public function getCurler()
94
    {
95
        return $this->_curler;
96
    }
97
98
    /**
99
     * @param string $host
100
     * @param int $port
101
     */
102 44
    public function setHost($host, $port = -1)
103
    {
104 44
        if ($port > 0) {
105 44
            $this->_port = $port;
106
        }
107
108 44
        $this->_host = $host;
109 44
    }
110
111
    /**
112
     * @return string
113
     */
114 36
    public function getUri()
115
    {
116 36
        $proto = 'http';
117 36
        if ($this->settings()->isHttps()) {
118
            $proto = 'https';
119
        }
120
121 36
        return $proto . '://' . $this->_host . ':' . $this->_port;
122
    }
123
124
    /**
125
     * @return Settings
126
     */
127 44
    public function settings()
128
    {
129 44
        return $this->_settings;
130
    }
131
132
    /**
133
     * @param bool|int $flag
134
     * @return mixed
135
     */
136
    public function verbose($flag)
137
    {
138
        $this->_verbose = $flag;
139
        return $flag;
140
    }
141
142
    /**
143
     * @param array $params
144
     * @return string
145
     */
146 36
    private function getUrl($params = [])
147
    {
148 36
        $settings = $this->settings()->getSettings();
149
150 36
        if (is_array($params) && sizeof($params)) {
151 36
            $settings = array_merge($settings, $params);
152
        }
153
154
155 36
        if ($this->settings()->isReadOnlyUser())
156
        {
157
            unset($settings['extremes']);
158
            unset($settings['readonly']);
159
            unset($settings['enable_http_compression']);
160
            unset($settings['max_execution_time']);
161
162
        }
163
164 36
        unset($settings['https']);
165
166
167 36
        return $this->getUri() . '?' . http_build_query($settings);
168
    }
169
170
    /**
171
     * @param array $extendinfo
172
     * @return CurlerRequest
173
     */
174 36
    private function newRequest($extendinfo)
175
    {
176 36
        $new = new CurlerRequest();
177 36
        $new->auth($this->_username, $this->_password)
178 36
            ->POST()
179 36
            ->setRequestExtendedInfo($extendinfo);
180
181 36
        if ($this->settings()->isEnableHttpCompression()) {
182 27
            $new->httpCompression(true);
183
        }
184 36
        if ($this->settings()->getSessionId())
185
        {
186 1
            $new->persistent();
187
        }
188
189 36
        $new->timeOut($this->settings()->getTimeOut());
190 36
        $new->connectTimeOut($this->_connectTimeOut)->keepAlive(); // one sec
191 36
        $new->verbose(boolval($this->_verbose));
192
193 36
        return $new;
194
    }
195
196
    /**
197
     * @param Query $query
198
     * @param array $urlParams
199
     * @param bool $query_as_string
200
     * @return CurlerRequest
201
     * @throws \ClickHouseDB\Exception\TransportException
202
     */
203 36
    private function makeRequest(Query $query, $urlParams = [], $query_as_string = false)
204
    {
205 36
        $sql = $query->toSql();
206
207 36
        if ($query_as_string) {
208 1
            $urlParams['query'] = $sql;
209
        }
210
211 36
        $url = $this->getUrl($urlParams);
212
213
        $extendinfo = [
214 36
            'sql' => $sql,
215 36
            'query' => $query,
216 36
            'format'=> $query->getFormat()
217
        ];
218
219 36
        $new = $this->newRequest($extendinfo);
220 36
        $new->url($url);
221
222
223
224
225 36
        if (!$query_as_string) {
226 36
            $new->parameters_json($sql);
227
        }
228 36
        if ($this->settings()->isEnableHttpCompression()) {
229 27
            $new->httpCompression(true);
230
        }
231
232 36
        return $new;
233
    }
234
235
    /**
236
     * @param string $sql
237
     * @return CurlerRequest
238
     */
239 2
    public function writeStreamData($sql)
240
    {
241 2
        $query = new Query($sql);
242
243 2
        $url = $this->getUrl([
244 2
            'readonly' => 0,
245 2
            'query' => $query->toSql()
246
        ]);
247
248
        $extendinfo = [
249 2
            'sql' => $sql,
250 2
            'query' => $query,
251 2
            'format'=> $query->getFormat()
252
        ];
253
254 2
        $request = $this->newRequest($extendinfo);
255 2
        $request->url($url);
256 2
        return $request;
257
    }
258
259
260
    /**
261
     * @param string $sql
262
     * @param string $file_name
263
     * @return Statement
264
     * @throws \ClickHouseDB\Exception\TransportException
265
     */
266 7
    public function writeAsyncCSV($sql, $file_name)
267
    {
268 7
        $query = new Query($sql);
269
270 7
        $url = $this->getUrl([
271 7
            'readonly' => 0,
272 7
            'query' => $query->toSql()
273
        ]);
274
275
        $extendinfo = [
276 7
            'sql' => $sql,
277 7
            'query' => $query,
278 7
            'format'=> $query->getFormat()
279
        ];
280
281 7
        $request = $this->newRequest($extendinfo);
282 7
        $request->url($url);
283
284
        $request->setCallbackFunction(function(CurlerRequest $request) {
285 7
            $handle=$request->getInfileHandle();
286 7
            if (is_resource($handle)) {
287 7
                fclose($handle);
288
            }
289 7
        });
290
291 7
        $request->setInfile($file_name);
292 7
        $this->_curler->addQueLoop($request);
293
294 7
        return new Statement($request);
295
    }
296
297
    /**
298
     * get Count Pending Query in Queue
299
     *
300
     * @return int
301
     */
302 9
    public function getCountPendingQueue()
303
    {
304 9
        return $this->_curler->countPending();
305
    }
306
307
    /**
308
     * set Connect TimeOut in seconds [CURLOPT_CONNECTTIMEOUT] ( int )
309
     *
310
     * @param int $connectTimeOut
311
     */
312 2
    public function setConnectTimeOut($connectTimeOut)
313
    {
314 2
        $this->_connectTimeOut = $connectTimeOut;
315 2
    }
316
317
    /**
318
     * get ConnectTimeOut in seconds
319
     *
320
     * @return int
321
     */
322 1
    public function getConnectTimeOut()
323
    {
324 1
        return $this->_connectTimeOut;
325
    }
326
327
328 1
    public function __findXClickHouseProgress($handle)
329
    {
330 1
        $code = curl_getinfo($handle, CURLINFO_HTTP_CODE);
331
332
        // Search X-ClickHouse-Progress
333 1
        if ($code == 200) {
334 1
            $response = curl_multi_getcontent($handle);
335 1
            $header_size = curl_getinfo($handle, CURLINFO_HEADER_SIZE);
336 1
            if (!$header_size) {
337
                return false;
338
            }
339
340 1
            $header = substr($response, 0, $header_size);
341 1
            if (!$header_size) {
342
                return false;
343
            }
344 1
            $pos = strrpos($header, 'X-ClickHouse-Progress');
345
346 1
            if (!$pos) {
347
                return false;
348
            }
349
350 1
            $last = substr($header, $pos);
351 1
            $data = @json_decode(str_ireplace('X-ClickHouse-Progress:', '', $last), true);
352
353 1
            if ($data && is_callable($this->xClickHouseProgress)) {
354
355 1
                if (is_array($this->xClickHouseProgress)) {
356
                    call_user_func_array($this->xClickHouseProgress, [$data]);
357
                } else {
358 1
                    call_user_func($this->xClickHouseProgress, $data);
359
                }
360
361
362
            }
363
364
        }
365
366 1
    }
367
368
    /**
369
     * @param Query $query
370
     * @param null|WhereInFile $whereInFile
371
     * @param null|WriteToFile $writeToFile
372
     * @return CurlerRequest
373
     * @throws \Exception
374
     */
375 36
    public function getRequestRead(Query $query, $whereInFile = null, $writeToFile = null)
376
    {
377 36
        $urlParams = ['readonly' => 1];
378 36
        $query_as_string = false;
379
        // ---------------------------------------------------------------------------------
380 36
        if ($whereInFile instanceof WhereInFile && $whereInFile->size()) {
381
            // $request = $this->prepareSelectWhereIn($request, $whereInFile);
382 1
            $structure = $whereInFile->fetchUrlParams();
383
            // $structure = [];
384 1
            $urlParams = array_merge($urlParams, $structure);
385 1
            $query_as_string = true;
386
        }
387
        // ---------------------------------------------------------------------------------
388
        // if result to file
389 36
        if ($writeToFile instanceof WriteToFile && $writeToFile->fetchFormat()) {
390 1
            $query->setFormat($writeToFile->fetchFormat());
391 1
            unset($urlParams['extremes']);
392
        }
393
        // ---------------------------------------------------------------------------------
394
        // makeRequest read
395 36
        $request = $this->makeRequest($query, $urlParams, $query_as_string);
396
        // ---------------------------------------------------------------------------------
397
        // attach files
398 36
        if ($whereInFile instanceof WhereInFile && $whereInFile->size()) {
399 1
            $request->attachFiles($whereInFile->fetchFiles());
400
        }
401
        // ---------------------------------------------------------------------------------
402
        // result to file
403 36
        if ($writeToFile instanceof WriteToFile && $writeToFile->fetchFormat()) {
404
405 1
            $fout = fopen($writeToFile->fetchFile(), 'w');
406 1
            if (is_resource($fout)) {
407
408 1
                $isGz = $writeToFile->getGzip();
409
410 1
                if ($isGz) {
411
                    // write gzip header
412
                    // "\x1f\x8b\x08\x00\x00\x00\x00\x00"
413
                    // fwrite($fout, "\x1F\x8B\x08\x08".pack("V", time())."\0\xFF", 10);
414
                    // write the original file name
415
                    // $oname = str_replace("\0", "", basename($writeToFile->fetchFile()));
416
                    // fwrite($fout, $oname."\0", 1+strlen($oname));
417
418
                    fwrite($fout, "\x1f\x8b\x08\x00\x00\x00\x00\x00");
419
420
                }
421
422
423
                $request->setResultFileHandle($fout, $isGz)->setCallbackFunction(function(CurlerRequest $request) {
424
                    fclose($request->getResultFileHandle());
425 1
                });
426
            }
427
        }
428 36
        if ($this->xClickHouseProgress)
429
        {
430 1
            $request->setFunctionProgress([$this, '__findXClickHouseProgress']);
431
        }
432
        // ---------------------------------------------------------------------------------
433 36
        return $request;
434
435
    }
436
437 1
    public function cleanQueryDegeneration()
438
    {
439 1
        $this->_query_degenerations = [];
440 1
        return true;
441
    }
442
443 44
    public function addQueryDegeneration(Degeneration $degeneration)
444
    {
445 44
        $this->_query_degenerations[] = $degeneration;
446 44
        return true;
447
    }
448
449
    /**
450
     * @param Query $query
451
     * @return CurlerRequest
452
     * @throws \ClickHouseDB\Exception\TransportException
453
     */
454 18
    public function getRequestWrite(Query $query)
455
    {
456 18
        $urlParams = ['readonly' => 0];
457 18
        return $this->makeRequest($query, $urlParams);
458
    }
459
460
    /**
461
     * @param string $sql
462
     * @param array $bindings
463
     * @return Query
464
     */
465 36
    private function prepareQuery($sql, $bindings)
466
    {
467
468
        // add Degeneration query
469 36
        foreach ($this->_query_degenerations as $degeneration) {
470 36
            $degeneration->bindParams($bindings);
471
        }
472
473 36
        return new Query($sql, $this->_query_degenerations);
474
    }
475
476
477
    /**
478
     * @param Query|string $sql
479
     * @param array $bindings
480
     * @param null|WhereInFile $whereInFile
481
     * @param null|WriteToFile $writeToFile
482
     * @return CurlerRequest
483
     * @throws \Exception
484
     */
485 36
    private function prepareSelect($sql, $bindings, $whereInFile, $writeToFile = null)
486
    {
487 36
        if ($sql instanceof Query) {
488
            return $this->getRequestWrite($sql);
489
        }
490
491
492 36
        $query = $this->prepareQuery($sql, $bindings);
493 36
        $query->setFormat('JSON');
494 36
        return $this->getRequestRead($query, $whereInFile, $writeToFile);
495
496
    }
497
498
    /**
499
     * @param Query|string $sql
500
     * @param array $bindings
501
     * @return CurlerRequest
502
     * @throws \ClickHouseDB\Exception\TransportException
503
     */
504 19
    private function prepareWrite($sql, $bindings = [])
505
    {
506 19
        if ($sql instanceof Query) {
507
            return $this->getRequestWrite($sql);
508
        }
509
510 19
        $query = $this->prepareQuery($sql, $bindings);
511 18
        return $this->getRequestWrite($query);
512
    }
513
514
    /**
515
     * @return bool
516
     * @throws \ClickHouseDB\Exception\TransportException
517
     */
518 8
    public function executeAsync()
519
    {
520 8
        return $this->_curler->execLoopWait();
521
    }
522
523
    /**
524
     * @param Query|string $sql
525
     * @param array $bindings
526
     * @param null|WhereInFile $whereInFile
527
     * @param null|WriteToFile $writeToFile
528
     * @return Statement
529
     * @throws \ClickHouseDB\Exception\TransportException
530
     * @throws \Exception
531
     */
532 35
    public function select($sql, array $bindings = [], $whereInFile = null, $writeToFile = null)
533
    {
534 35
        $request = $this->prepareSelect($sql, $bindings, $whereInFile, $writeToFile);
535 35
        $this->_curler->execOne($request);
536 35
        return new Statement($request);
537
    }
538
539
    /**
540
     * @param Query|string $sql
541
     * @param array $bindings
542
     * @param null|WhereInFile $whereInFile
543
     * @param null|WriteToFile $writeToFile
544
     * @return Statement
545
     * @throws \ClickHouseDB\Exception\TransportException
546
     * @throws \Exception
547
     */
548 4
    public function selectAsync($sql, array $bindings = [], $whereInFile = null, $writeToFile = null)
549
    {
550 4
        $request = $this->prepareSelect($sql, $bindings, $whereInFile, $writeToFile);
551 4
        $this->_curler->addQueLoop($request);
552 4
        return new Statement($request);
553
    }
554
555
    /**
556
     * @param callable $callback
557
     */
558 1
    public function setProgressFunction(callable $callback)
559
    {
560 1
        $this->xClickHouseProgress = $callback;
561 1
    }
562
563
    /**
564
     * @param string $sql
565
     * @param array $bindings
566
     * @param bool $exception
567
     * @return Statement
568
     * @throws \ClickHouseDB\Exception\TransportException
569
     */
570 19
    public function write($sql, array $bindings = [], $exception = true)
571
    {
572 19
        $request = $this->prepareWrite($sql, $bindings);
573 18
        $this->_curler->execOne($request);
574 18
        $response = new Statement($request);
575 18
        if ($exception) {
576 18
            if ($response->isError()) {
577 3
                $response->error();
578
            }
579
        }
580 16
        return $response;
581
    }
582
}
583