Passed
Push — master ( cc4828...00e84f )
by Igor
10:05
created

src/Transport/Http.php (3 issues)

Labels
1
<?php
2
3
namespace ClickHouseDB\Transport;
4
5
use ClickHouseDB\Exception\TransportException;
6
use ClickHouseDB\Query\Degeneration;
7
use ClickHouseDB\Query\Query;
8
use ClickHouseDB\Query\WhereInFile;
9
use ClickHouseDB\Query\WriteToFile;
10
use ClickHouseDB\Settings;
11
use ClickHouseDB\Statement;
12
use const PHP_EOL;
13
14
class Http
15
{
16
    /** Credentials are sent through request headers (the default, see newRequest()). */
    public const AUTH_METHOD_HEADER = 1;

    /** Credentials are placed into the `user` / `password` URL parameters. */
    public const AUTH_METHOD_QUERY_STRING = 2;

    /** Credentials are sent using HTTP Basic Authentication. */
    public const AUTH_METHOD_BASIC_AUTH = 3;

    /** Every authentication method this transport knows about. */
    public const AUTH_METHODS_LIST = [
        self::AUTH_METHOD_HEADER,
        self::AUTH_METHOD_QUERY_STRING,
        self::AUTH_METHOD_BASIC_AUTH,
    ];

    /**
     * User name handed to the selected authentication method.
     *
     * @var string|null
     */
    private $_username;

    /**
     * Password handed to the selected authentication method.
     *
     * @var string|null
     */
    private $_password;

    /**
     * The username and password can be indicated in one of three ways:
     *  - Using HTTP Basic Authentication.
     *  - In the ‘user’ and ‘password’ URL parameters.
     *  - Using ‘X-ClickHouse-User’ and ‘X-ClickHouse-Key’ headers (by default)
     *
     * Holds one of the AUTH_METHOD_* constants.
     *
     * @see https://clickhouse.tech/docs/en/interfaces/http/
     * @var int
     */
    private $_authMethod = self::AUTH_METHOD_HEADER;

    /**
     * Host part used by getUri().
     *
     * @var string
     */
    private $_host = '';

    /**
     * Port appended by getUri() when it is greater than zero.
     *
     * @var int
     */
    private $_port = 0;

    /**
     * Verbosity flag; converted with boolval() before it reaches a request.
     *
     * @var bool|int
     */
    private $_verbose = false;

    /**
     * Executor used to run or queue requests.
     *
     * @var CurlerRolling|null
     */
    private $_curler;

    /**
     * Settings object created in the constructor.
     *
     * @var Settings|null
     */
    private $_settings;

    /**
     * Degeneration objects passed to every Query built by prepareQuery().
     *
     * @var array
     */
    private $_query_degenerations = [];

    /**
     * Connect timeout in seconds.
     *
     * @var int
     */
    private $_connectTimeOut = 5;

    /**
     * Callback registered through setProgressFunction(), if any.
     *
     * @var callable|null
     */
    private $xClickHouseProgress;

    /**
     * CA path forwarded to each new request when set.
     *
     * @var null|string
     */
    private $sslCA;
93
94
    /**
     * Creates a transport for one host/port pair with the given credentials.
     *
     * @param string   $host       Passed to setHost().
     * @param int      $port       Passed to setHost(); only stored there when greater than zero.
     * @param string   $username
     * @param string   $password
     * @param int|null $authMethod One of the AUTH_METHOD_* constants; a falsy value keeps the
     *                             header-based default.
     */
    public function __construct($host, $port, $username, $password, $authMethod = null)
    {
        $this->setHost($host, $port);

        $this->_username   = $username;
        $this->_password   = $password;
        $this->_authMethod = $authMethod ?: $this->_authMethod;

        $this->_settings = new Settings($this);

        $this->setCurler();
    }
116
117
118 66
    /**
     * Replaces the current executor with a freshly constructed CurlerRolling.
     */
    public function setCurler()
    {
        $freshCurler = new CurlerRolling();

        $this->_curler = $freshCurler;
    }
122
123
    /**
     * Installs an executor supplied by the caller instead of the one built by setCurler().
     *
     * @param CurlerRolling $curler
     */
    public function setDirtyCurler(CurlerRolling $curler)
    {
        // The parameter type already guarantees a CurlerRolling, so no extra instanceof guard.
        $this->_curler = $curler;
    }
132
133
    /**
     * Gives access to the executor currently used by this transport.
     *
     * @return CurlerRolling
     */
    public function getCurler()
    {
        $current = $this->_curler;

        return $current;
    }
140
141
    /**
     * Stores the host and, when a positive port is supplied, the port as well.
     *
     * @param string $host
     * @param int    $port Values that are not greater than zero leave the stored port untouched.
     */
    public function setHost($host, $port = -1)
    {
        $this->_host = $host;

        if ($port > 0) {
            $this->_port = $port;
        }
    }
153
154
    /**
     * Sets client SSL certificate for Yandex Cloud.
     *
     * The stored path is forwarded to every request built by newRequest().
     *
     * @param string $caPath
     */
    public function setSslCa($caPath)
    {
        $path = $caPath;

        $this->sslCA = $path;
    }
163
164
    /**
     * Builds the base URI (scheme, host and, where applicable, port).
     *
     * The port is appended only when the stored host contains neither "/" nor ":"
     * and the stored port is greater than zero.
     *
     * @return string
     */
    public function getUri()
    {
        $scheme = $this->settings()->isHttps() ? 'https' : 'http';
        $base   = $scheme . '://' . $this->_host;

        $hostCarriesPathOrPort = stripos($this->_host, '/') !== false
            || stripos($this->_host, ':') !== false;

        if ($hostCarriesPathOrPort || intval($this->_port) <= 0) {
            return $base;
        }

        return $base . ':' . $this->_port;
    }
182
183
    /**
     * Accessor for the Settings object created in the constructor.
     *
     * @return Settings
     */
    public function settings()
    {
        $settings = $this->_settings;

        return $settings;
    }
190
191
    /**
     * Stores the verbosity flag and echoes it back to the caller.
     *
     * @param bool|int $flag
     * @return mixed The value that was passed in.
     */
    public function verbose($flag)
    {
        return $this->_verbose = $flag;
    }
200
201
    /**
     * Builds the full request URL: base URI plus the settings as a query string.
     *
     * @param array $params Extra URL parameters; they override settings with the same key.
     * @return string
     */
    private function getUrl($params = [])
    {
        $queryParts = $this->settings()->getSettings();

        if (is_array($params) && count($params)) {
            $queryParts = array_merge($queryParts, $params);
        }

        // These keys are dropped from the URL for read-only users.
        if ($this->settings()->isReadOnlyUser()) {
            unset(
                $queryParts['extremes'],
                $queryParts['readonly'],
                $queryParts['enable_http_compression'],
                $queryParts['max_execution_time']
            );
        }

        // `https` only selects the scheme in getUri(); it is never sent as a URL parameter.
        unset($queryParts['https']);

        return $this->getUri() . '?' . http_build_query($queryParts);
    }
227
228
    /**
     * Creates a POST request pre-configured with authentication, compression,
     * session persistence, SSL CA, timeouts and verbosity.
     *
     * @param array $extendinfo Stored on the request via setRequestExtendedInfo().
     * @return CurlerRequest
     */
    private function newRequest($extendinfo)
    {
        $request = new CurlerRequest();

        // Loose comparisons on purpose: they mirror the semantics of a `switch`.
        if ($this->_authMethod == self::AUTH_METHOD_QUERY_STRING) {
            /* @todo: Move this implementation to CurlerRequest class. Possible options: the authentication method
             *        should be applied in method `CurlerRequest:prepareRequest()`.
             */
            $this->settings()->set('user', $this->_username);
            $this->settings()->set('password', $this->_password);
        } elseif ($this->_authMethod == self::AUTH_METHOD_BASIC_AUTH) {
            $request->authByBasicAuth($this->_username, $this->_password);
        } else {
            // Auth with headers by default
            $request->authByHeaders($this->_username, $this->_password);
        }

        $request->POST()->setRequestExtendedInfo($extendinfo);

        $settings = $this->settings();

        if ($settings->isEnableHttpCompression()) {
            $request->httpCompression(true);
        }

        if ($settings->getSessionId()) {
            $request->persistent();
        }

        if ($this->sslCA) {
            $request->setSslCa($this->sslCA);
        }

        $request->timeOut($settings->getTimeOut());
        $request->connectTimeOut($this->_connectTimeOut);
        $request->verbose(boolval($this->_verbose));

        return $request;
    }
271
272
    /**
     * Turns a Query into a ready-to-run request.
     *
     * @param Query $query
     * @param array $urlParams       Extra URL parameters.
     * @param bool  $query_as_string When true the SQL travels in the `query` URL parameter;
     *                               otherwise it is handed to parameters_json().
     * @return CurlerRequest
     * @throws \ClickHouseDB\Exception\TransportException
     */
    private function makeRequest(Query $query, $urlParams = [], $query_as_string = false)
    {
        $sql = $query->toSql();

        if ($query_as_string) {
            $urlParams['query'] = $sql;
        }

        $request = $this->newRequest([
            'sql'    => $sql,
            'query'  => $query,
            'format' => $query->getFormat(),
        ]);

        /*
         * The URL is built only after newRequest(): with query-string authentication,
         * newRequest() writes the credentials into the settings that getUrl() reads.
         */
        $request->url($this->getUrl($urlParams));

        if (!$query_as_string) {
            $request->parameters_json($sql);
        }

        if ($this->settings()->isEnableHttpCompression()) {
            $request->httpCompression(true);
        }

        return $request;
    }
312
313
    /**
     * Builds a write request (readonly=0) whose SQL is carried in the `query` URL parameter.
     *
     * @param string|Query $sql
     * @return CurlerRequest
     */
    public function writeStreamData($sql)
    {
        $query = $sql instanceof Query ? $sql : new Query($sql);

        // NOTE(review): 'sql' receives the argument as given, so it is a Query object
        // (not a string) when a Query was passed in — confirm consumers expect that.
        $request = $this->newRequest([
            'sql'    => $sql,
            'query'  => $query,
            'format' => $query->getFormat(),
        ]);

        /*
         * The URL is built only after newRequest(): with query-string authentication,
         * newRequest() writes the credentials into the settings that getUrl() reads.
         */
        $target = $this->getUrl([
            'readonly' => 0,
            'query'    => $query->toSql(),
        ]);
        $request->url($target);

        return $request;
    }
346
347
348
    /**
     * Queues an asynchronous write whose request body is read from a file.
     *
     * The request is only added to the executor's queue here; it runs when the
     * queue is executed (see executeAsync()).
     *
     * @param string $sql
     * @param string $file_name File passed to the request via setInfile().
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function writeAsyncCSV($sql, $file_name)
    {
        $query = new Query($sql);

        $request = $this->newRequest([
            'sql'    => $sql,
            'query'  => $query,
            'format' => $query->getFormat(),
        ]);

        /*
         * The URL is built only after newRequest(): with query-string authentication,
         * newRequest() writes the credentials into the settings that getUrl() reads.
         */
        $target = $this->getUrl([
            'readonly' => 0,
            'query'    => $query->toSql(),
        ]);
        $request->url($target);

        // Close the input file handle from the request callback, if it is still open.
        $closeInfile = function (CurlerRequest $request) {
            $infile = $request->getInfileHandle();
            if (is_resource($infile)) {
                fclose($infile);
            }
        };
        $request->setCallbackFunction($closeInfile);

        $request->setInfile($file_name);
        $this->_curler->addQueLoop($request);

        return new Statement($request);
    }
389
390
    /**
     * Reports how many queries the executor counts as pending.
     *
     * @return int
     */
    public function getCountPendingQueue()
    {
        $pending = $this->_curler->countPending();

        return $pending;
    }
399
400
    /**
     * Sets the connect timeout, in seconds [CURLOPT_CONNECTTIMEOUT].
     *
     * @param int $connectTimeOut
     */
    public function setConnectTimeOut($connectTimeOut)
    {
        $seconds = $connectTimeOut;

        $this->_connectTimeOut = $seconds;
    }
409
410
    /**
     * Returns the connect timeout, in seconds.
     *
     * @return int
     */
    public function getConnectTimeOut()
    {
        $seconds = $this->_connectTimeOut;

        return $seconds;
    }
419
420
421
    /**
     * Progress hook: finds the last `X-ClickHouse-Summary` response header on a curl
     * handle, decodes its JSON value and passes it to the callback registered with
     * setProgressFunction().
     *
     * Nothing is done unless the HTTP status is 200.
     *
     * @param resource|\CurlHandle $handle Curl handle of the transfer being observed.
     * @return false|null false when the header block or the summary header is unavailable,
     *                    null in every other case.
     */
    public function __findXClickHouseProgress($handle)
    {
        $headerName = 'X-ClickHouse-Summary:';

        if ((int) curl_getinfo($handle, CURLINFO_HTTP_CODE) !== 200) {
            return null;
        }

        $headerSize = curl_getinfo($handle, CURLINFO_HEADER_SIZE);
        if (!$headerSize) {
            return false;
        }

        $response = curl_multi_getcontent($handle);
        if (!is_string($response) || $response === '') {
            return false;
        }

        $header = substr($response, 0, $headerSize);
        if ($header === false || $header === '') {
            return false;
        }

        // Header names are case-insensitive, so search that way; compare against
        // false explicitly because offset 0 is a valid match position.
        $pos = strripos($header, $headerName);
        if ($pos === false) {
            return false;
        }

        // Keep only this header's own line; later header lines would break json_decode().
        $rest  = substr($header, $pos + strlen($headerName));
        $value = trim(substr($rest, 0, strcspn($rest, "\r\n")));

        $data = json_decode($value, true);

        if ($data && is_callable($this->xClickHouseProgress)) {
            call_user_func($this->xClickHouseProgress, $data);
        }

        return null;
    }
460
461
    /**
     * Builds a read request (readonly=2), optionally attaching "where in" files and
     * optionally redirecting the result into a file.
     *
     * @param Query            $query
     * @param null|WhereInFile $whereInFile When it reports a non-zero size, its URL parameters
     *                                      and files are added and the SQL is sent in the URL.
     * @param null|WriteToFile $writeToFile When it has a format, that format is set on the query
     *                                      and the result is written to its file.
     * @return CurlerRequest
     * @throws \Exception
     */
    public function getRequestRead(Query $query, $whereInFile = null, $writeToFile = null)
    {
        $urlParams     = ['readonly' => 2];
        $sqlInUrl      = false;

        // External "where in" data: extra URL parameters, SQL moves into the URL.
        if ($whereInFile instanceof WhereInFile && $whereInFile->size()) {
            $urlParams = array_merge($urlParams, $whereInFile->fetchUrlParams());
            $sqlInUrl  = true;
        }

        // Result goes to a file: the file's format overrides the query format.
        if ($writeToFile instanceof WriteToFile && $writeToFile->fetchFormat()) {
            $query->setFormat($writeToFile->fetchFormat());
            unset($urlParams['extremes']);
        }

        $request = $this->makeRequest($query, $urlParams, $sqlInUrl);

        if ($whereInFile instanceof WhereInFile && $whereInFile->size()) {
            $request->attachFiles($whereInFile->fetchFiles());
        }

        if ($writeToFile instanceof WriteToFile && $writeToFile->fetchFormat()) {
            $output = fopen($writeToFile->fetchFile(), 'w');

            // NOTE(review): when fopen() fails the result file is silently skipped — confirm intended.
            if (is_resource($output)) {
                $gzip = $writeToFile->getGzip();

                if ($gzip) {
                    // Fixed gzip header bytes written ahead of the response data.
                    fwrite($output, "\x1f\x8b\x08\x00\x00\x00\x00\x00");
                }

                $closeOutput = function (CurlerRequest $request) {
                    fclose($request->getResultFileHandle());
                };
                $request->setResultFileHandle($output, $gzip)->setCallbackFunction($closeOutput);
            }
        }

        if ($this->xClickHouseProgress) {
            $request->setFunctionProgress([$this, '__findXClickHouseProgress']);
        }

        return $request;
    }
528
529 1
    /**
     * Drops every registered query degeneration.
     *
     * @return bool Always true.
     */
    public function cleanQueryDegeneration()
    {
        $none = [];
        $this->_query_degenerations = $none;

        return true;
    }
534
535 66
    /**
     * Appends a degeneration to the list given to every Query built by prepareQuery().
     *
     * @param Degeneration $degeneration
     * @return bool Always true.
     */
    public function addQueryDegeneration(Degeneration $degeneration)
    {
        array_push($this->_query_degenerations, $degeneration);

        return true;
    }
540
541
    /**
     * Builds a write request: the query is sent with `readonly` set to 0.
     *
     * @param Query $query
     * @return CurlerRequest
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function getRequestWrite(Query $query)
    {
        return $this->makeRequest($query, ['readonly' => 0]);
    }
551
552
    /**
     * Sends a plain GET to the base URI and reports whether the body is "Ok.".
     *
     * The request is built directly (not via newRequest()), so no authentication,
     * compression or SSL CA settings are applied to it.
     *
     * @return bool true when the response body, ignoring trailing line breaks, equals "Ok.".
     * @throws TransportException
     */
    public function ping(): bool
    {
        $request = new CurlerRequest();
        $request
            ->url($this->getUri())
            ->verbose(false)
            ->GET()
            ->connectTimeOut($this->getConnectTimeOut());

        $this->_curler->execOne($request);

        // The server's line ending does not depend on the client OS, so do not compare
        // against PHP_EOL (which is "\r\n" on Windows); strip trailing line breaks instead.
        $body = (string) $request->response()->body();

        return rtrim($body, "\r\n") === 'Ok.';
    }
563
564
    /**
     * Binds the parameters on every registered degeneration and wraps the SQL in a Query
     * that carries those degenerations.
     *
     * @param string  $sql
     * @param mixed[] $bindings
     * @return Query
     */
    private function prepareQuery($sql, $bindings)
    {
        $degenerations = $this->_query_degenerations;

        foreach ($degenerations as $item) {
            $item->bindParams($bindings);
        }

        return new Query($sql, $degenerations);
    }
579
580
581
    /**
     * Builds the request for a select.
     *
     * A string is bound, forced to the JSON format and sent as a read request.
     *
     * @param Query|string     $sql
     * @param mixed[]          $bindings
     * @param null|WhereInFile $whereInFile
     * @param null|WriteToFile $writeToFile
     * @return CurlerRequest
     * @throws \Exception
     */
    private function prepareSelect($sql, $bindings, $whereInFile, $writeToFile = null)
    {
        if (!($sql instanceof Query)) {
            $query = $this->prepareQuery($sql, $bindings);
            $query->setFormat('JSON');

            return $this->getRequestRead($query, $whereInFile, $writeToFile);
        }

        // NOTE(review): a ready-made Query goes through getRequestWrite() (readonly=0) and
        // the bindings and both file arguments are ignored — confirm this is intended.
        return $this->getRequestWrite($sql);
    }
598
599
600
    /**
     * Builds the request for a write; a string is first turned into a Query with bindings.
     *
     * @param Query|string $sql
     * @param mixed[]      $bindings Unused when $sql is already a Query.
     * @return CurlerRequest
     * @throws \ClickHouseDB\Exception\TransportException
     */
    private function prepareWrite($sql, $bindings = [])
    {
        $query = $sql instanceof Query
            ? $sql
            : $this->prepareQuery($sql, $bindings);

        return $this->getRequestWrite($query);
    }
615
616
    /**
     * Runs the executor's queued requests via execLoopWait() and returns its result.
     *
     * @return bool
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function executeAsync()
    {
        $outcome = $this->_curler->execLoopWait();

        return $outcome;
    }
624
625
    /**
     * Executes a select immediately and wraps the finished request in a Statement.
     *
     * @param Query|string     $sql
     * @param mixed[]          $bindings
     * @param null|WhereInFile $whereInFile
     * @param null|WriteToFile $writeToFile
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     * @throws \Exception
     */
    public function select($sql, array $bindings = [], $whereInFile = null, $writeToFile = null)
    {
        $prepared = $this->prepareSelect($sql, $bindings, $whereInFile, $writeToFile);

        $this->_curler->execOne($prepared);

        return new Statement($prepared);
    }
640
641
    /**
     * Queues a select on the executor and returns a Statement bound to the queued request.
     *
     * The request is only added to the queue here (see executeAsync()).
     *
     * @param Query|string     $sql
     * @param mixed[]          $bindings
     * @param null|WhereInFile $whereInFile
     * @param null|WriteToFile $writeToFile
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     * @throws \Exception
     */
    public function selectAsync($sql, array $bindings = [], $whereInFile = null, $writeToFile = null)
    {
        $prepared = $this->prepareSelect($sql, $bindings, $whereInFile, $writeToFile);

        $this->_curler->addQueLoop($prepared);

        return new Statement($prepared);
    }
656
657
    /**
     * Registers the callback that receives decoded progress data; once set,
     * getRequestRead() attaches the progress hook to the requests it builds.
     *
     * @param callable $callback
     */
    public function setProgressFunction(callable $callback): void
    {
        $handler = $callback;

        $this->xClickHouseProgress = $handler;
    }
664
665
    /**
     * Executes a write immediately.
     *
     * @param string  $sql
     * @param mixed[] $bindings
     * @param bool    $exception When true, Statement::error() is invoked if the statement
     *                           reports an error.
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function write($sql, array $bindings = [], $exception = true)
    {
        $prepared = $this->prepareWrite($sql, $bindings);
        $this->_curler->execOne($prepared);

        $statement = new Statement($prepared);

        if ($exception && $statement->isError()) {
            $statement->error();
        }

        return $statement;
    }
684
685
    /**
     * Runs a request whose body is streamed from, or whose response is streamed to,
     * the stream wrapped by $streamRW.
     *
     * When the wrapper supplies no callable, a default one is used: fread() from the
     * stream for write mode, fwrite() to the stream for read mode. In write mode the
     * stream is closed once the request has finished, whether or not it succeeded.
     *
     * @param Stream        $streamRW
     * @param CurlerRequest $request
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     */
    private function streaming(Stream $streamRW, CurlerRequest $request)
    {
        $callable = $streamRW->getClosure();
        $stream   = $streamRW->getStream();

        try {
            if (!is_callable($callable)) {
                if ($streamRW->isWrite()) {
                    $callable = function ($ch, $fd, $length) use ($stream) {
                        $chunk = fread($stream, $length);

                        // Only a failed read maps to ''. A chunk equal to "0" is falsy
                        // in PHP and must still be sent as data.
                        return $chunk === false ? '' : $chunk;
                    };
                } else {
                    $callable = function ($ch, $fd) use ($stream) {
                        return fwrite($stream, $fd);
                    };
                }
            }

            if ($streamRW->isGzipHeader()) {
                if ($streamRW->isWrite()) {
                    $request->header('Content-Encoding', 'gzip');
                    $request->header('Content-Type', 'application/x-www-form-urlencoded');
                } else {
                    $request->header('Accept-Encoding', 'gzip');
                }
            }

            $request->header('Transfer-Encoding', 'chunked');

            if ($streamRW->isWrite()) {
                $request->setReadFunction($callable);
            } else {
                $request->setWriteFunction($callable);
            }

            $this->_curler->execOne($request, true);

            $response = new Statement($request);
            if ($response->isError()) {
                $response->error();
            }

            return $response;
        } finally {
            // Guard the close: a user closure may already have closed the stream, and a
            // failing fclose() here would mask the exception being propagated.
            if ($streamRW->isWrite() && is_resource($stream)) {
                fclose($stream);
            }
        }
    }
751
752
753
    /**
     * Runs a read query and streams its response through the given Stream.
     *
     * @param Stream  $streamRead
     * @param string  $sql
     * @param mixed[] $bindings
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function streamRead(Stream $streamRead, $sql, $bindings = [])
    {
        $query = $this->prepareQuery($sql, $bindings);

        return $this->streaming($streamRead, $this->getRequestRead($query));
    }
767
768
    /**
     * Runs a write query whose request body is streamed from the given Stream.
     *
     * @param Stream  $streamWrite
     * @param string  $sql
     * @param mixed[] $bindings
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function streamWrite(Stream $streamWrite, $sql, $bindings = [])
    {
        $query = $this->prepareQuery($sql, $bindings);

        return $this->streaming($streamWrite, $this->writeStreamData($query));
    }
781
}
782