Passed
Push — master ( 5aaabf...5533de )
by Igor
16:52 queued 07:02
created

src/Transport/Http.php (2 issues)

Severity
1
<?php
2
3
namespace ClickHouseDB\Transport;
4
5
use ClickHouseDB\Exception\TransportException;
6
use ClickHouseDB\Query\Degeneration;
7
use ClickHouseDB\Query\Query;
8
use ClickHouseDB\Query\WhereInFile;
9
use ClickHouseDB\Query\WriteToFile;
10
use ClickHouseDB\Settings;
11
use ClickHouseDB\Statement;
12
use const PHP_EOL;
13
14
class Http
15
{
16
    /**
     * User name sent with every request built by newRequest(); null until the constructor runs.
     *
     * @var string|null
     */
    private $_username = null;

    /**
     * Password sent with every request built by newRequest(); null until the constructor runs.
     *
     * @var string|null
     */
    private $_password = null;

    /**
     * Host part used by getUri(). May already contain a port or a path, in which case
     * getUri() does not append $_port.
     *
     * @var string
     */
    private $_host = '';

    /**
     * Port appended by getUri() when it is greater than zero.
     *
     * @var int
     */
    private $_port = 0;

    /**
     * Verbosity flag; converted to bool and passed to each request in newRequest().
     *
     * @var bool|int
     */
    private $_verbose = false;

    /**
     * Executor used to run and queue requests; created by setCurler().
     *
     * @var CurlerRolling|null
     */
    private $_curler = null;

    /**
     * Settings object created in the constructor and bound to this transport.
     *
     * @var Settings|null
     */
    private $_settings = null;

    /**
     * Degenerations registered via addQueryDegeneration() and handed to every Query
     * built by prepareQuery().
     *
     * @var Degeneration[]
     */
    private $_query_degenerations = [];

    /**
     * Connect timeout in seconds, applied to each request.
     *
     * @var int
     */
    private $_connectTimeOut = 5;

    /**
     * Optional callback that receives the decoded X-ClickHouse-Progress header data.
     *
     * @var callable|null
     */
    private $xClickHouseProgress = null;

    /**
     * Path of the SSL CA certificate; forwarded to each request when set.
     *
     * @var null|string
     */
    private $sslCA = null;
72
73
    /**
     * Creates the HTTP transport for one ClickHouse endpoint.
     *
     * Stores the credentials and the endpoint, creates the Settings object bound
     * to this transport and installs a fresh CurlerRolling executor.
     *
     * @param string $host
     * @param int $port
     * @param string $username
     * @param string $password
     */
    public function __construct($host, $port, $username, $password)
    {
        $this->_username = $username;
        $this->_password = $password;
        $this->setHost($host, $port);

        $this->_settings = new Settings($this);
        $this->setCurler();
    }
90
91
92 67
    /**
     * Replaces the current executor with a brand-new CurlerRolling instance.
     */
    public function setCurler()
    {
        $freshCurler = new CurlerRolling();
        $this->_curler = $freshCurler;
    }
96
97
    /**
     * Installs an externally created executor instead of the one made by setCurler().
     *
     * @param CurlerRolling $curler
     */
    public function setDirtyCurler(CurlerRolling $curler)
    {
        // The parameter type hint already guarantees a CurlerRolling instance,
        // so no additional instanceof check is required before storing it.
        $this->_curler = $curler;
    }
106
107
    /**
     * Returns the executor currently used to run requests.
     *
     * @return CurlerRolling
     */
    public function getCurler()
    {
        $curler = $this->_curler;

        return $curler;
    }
114
115
    /**
     * Sets the host and, optionally, the port of the endpoint.
     *
     * The stored port is only overwritten when a positive port is given; with the
     * default of -1 the previously stored port is kept.
     *
     * @param string $host
     * @param int $port
     */
    public function setHost($host, $port = -1)
    {
        $this->_host = $host;

        if ($port > 0) {
            $this->_port = $port;
        }
    }
127
128
    /**
     * Stores the path of the SSL CA certificate.
     *
     * When a non-empty path is stored, newRequest() forwards it to every request it builds.
     *
     * @param string $caPath
     */
    public function setSslCa($caPath)
    {
        $this->sslCA = $caPath;
    }
137
138
    /**
     * Builds the base URI (scheme, host and, where applicable, port) of the endpoint.
     *
     * The port is appended only when the stored host contains neither "/" nor ":"
     * and the stored port is greater than zero.
     *
     * @return string
     */
    public function getUri()
    {
        $scheme = $this->settings()->isHttps() ? 'https' : 'http';
        $base = $scheme . '://' . $this->_host;

        // A host that already carries a path or a port is used verbatim.
        $hostIsComplete = stripos($this->_host, '/') !== false
            || stripos($this->_host, ':') !== false;

        if ($hostIsComplete || intval($this->_port) <= 0) {
            return $base;
        }

        return $base . ':' . $this->_port;
    }
156
157
    /**
     * Returns the Settings object created for this transport in the constructor.
     *
     * @return Settings
     */
    public function settings()
    {
        $settings = $this->_settings;

        return $settings;
    }
164
165
    /**
     * Stores the verbosity flag applied to requests built afterwards and returns it unchanged.
     *
     * @param bool|int $flag
     * @return mixed the value that was passed in
     */
    public function verbose($flag)
    {
        return $this->_verbose = $flag;
    }
174
175
    /**
     * Builds the full request URL: base URI plus the query string made of the
     * current settings merged with the given parameters.
     *
     * Given parameters win over settings with the same key. For a read-only user
     * several settings are removed from the query string; the `https` setting is
     * never sent.
     *
     * @param array $params
     * @return string
     */
    private function getUrl($params = [])
    {
        $queryParams = $this->settings()->getSettings();

        if (is_array($params) && $params !== []) {
            $queryParams = array_merge($queryParams, $params);
        }

        if ($this->settings()->isReadOnlyUser()) {
            unset(
                $queryParams['extremes'],
                $queryParams['readonly'],
                $queryParams['enable_http_compression'],
                $queryParams['max_execution_time']
            );
        }

        // `https` only selects the URI scheme; it is not a query-string parameter.
        unset($queryParams['https']);

        return $this->getUri() . '?' . http_build_query($queryParams);
    }
201
202
    /**
     * Creates a POST request pre-configured from this transport's state:
     * credentials, extended info, compression, session persistence, SSL CA,
     * timeouts and verbosity.
     *
     * @param array $extendinfo metadata attached to the request
     * @return CurlerRequest
     */
    private function newRequest($extendinfo)
    {
        $request = new CurlerRequest();
        $request->authByHeaders($this->_username, $this->_password)
            ->POST()
            ->setRequestExtendedInfo($extendinfo);

        if ($this->settings()->isEnableHttpCompression()) {
            $request->httpCompression(true);
        }

        // A configured session id switches the request to persistent mode.
        if ($this->settings()->getSessionId()) {
            $request->persistent();
        }

        if ($this->sslCA) {
            $request->setSslCa($this->sslCA);
        }

        $request->timeOut($this->settings()->getTimeOut());
        $request->connectTimeOut($this->_connectTimeOut);
        $request->verbose(boolval($this->_verbose));

        return $request;
    }
229
230
    /**
     * Builds (without executing) the request for a query.
     *
     * The SQL is sent either in the URL `query` parameter ($query_as_string = true)
     * or through the request's parameters_json() call.
     *
     * @param Query $query
     * @param array $urlParams
     * @param bool $query_as_string
     * @return CurlerRequest
     * @throws \ClickHouseDB\Exception\TransportException
     */
    private function makeRequest(Query $query, $urlParams = [], $query_as_string = false)
    {
        $sql = $query->toSql();

        if ($query_as_string) {
            $urlParams['query'] = $sql;
        }

        $targetUrl = $this->getUrl($urlParams);

        $request = $this->newRequest([
            'sql' => $sql,
            'query' => $query,
            'format' => $query->getFormat(),
        ]);
        $request->url($targetUrl);

        if (!$query_as_string) {
            $request->parameters_json($sql);
        }

        if ($this->settings()->isEnableHttpCompression()) {
            $request->httpCompression(true);
        }

        return $request;
    }
266
267
    /**
     * Builds (without executing) the request used to stream data into ClickHouse.
     *
     * The SQL is passed in the URL `query` parameter together with `readonly=0`.
     *
     * @param string|Query $sql SQL text, or an already prepared Query
     * @return CurlerRequest
     */
    public function writeStreamData($sql)
    {
        $query = $sql instanceof Query ? $sql : new Query($sql);

        // Render the SQL once and reuse the result for both the URL and the metadata.
        $renderedSql = $query->toSql();

        $url = $this->getUrl([
            'readonly' => 0,
            'query' => $renderedSql,
        ]);

        $request = $this->newRequest([
            // Always the SQL text, as in makeRequest(). Previously a Query object
            // ended up here whenever the caller passed a Query instead of a string.
            'sql' => $renderedSql,
            'query' => $query,
            'format' => $query->getFormat(),
        ]);
        $request->url($url);

        return $request;
    }
294
295
296
    /**
     * Queues an asynchronous upload of a file as the body of the given SQL statement.
     *
     * The request is added to the executor's queue, not executed here. Once it
     * completes, the callback closes the input file handle if it is still open.
     *
     * @param string $sql
     * @param string $file_name path of the file to send
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function writeAsyncCSV($sql, $file_name)
    {
        $query = new Query($sql);

        $targetUrl = $this->getUrl([
            'readonly' => 0,
            'query' => $query->toSql(),
        ]);

        $upload = $this->newRequest([
            'sql' => $sql,
            'query' => $query,
            'format' => $query->getFormat(),
        ]);
        $upload->url($targetUrl);

        $upload->setCallbackFunction(function (CurlerRequest $finished) {
            $infile = $finished->getInfileHandle();
            if (is_resource($infile)) {
                fclose($infile);
            }
        });

        $upload->setInfile($file_name);
        $this->_curler->addQueLoop($upload);

        return new Statement($upload);
    }
332
333
    /**
     * Returns the number of pending requests as reported by the executor.
     *
     * @return int
     */
    public function getCountPendingQueue()
    {
        $pending = $this->_curler->countPending();

        return $pending;
    }
342
343
    /**
     * Sets the connect timeout, in seconds, applied to requests built afterwards.
     *
     * @param int $connectTimeOut seconds
     */
    public function setConnectTimeOut($connectTimeOut)
    {
        $this->_connectTimeOut = $connectTimeOut;
    }
352
353
    /**
     * Returns the connect timeout in seconds.
     *
     * @return int
     */
    public function getConnectTimeOut()
    {
        $seconds = $this->_connectTimeOut;

        return $seconds;
    }
362
363
364 1
    /**
     * Extracts the most recent X-ClickHouse-Progress header from a curl handle
     * and passes its decoded JSON payload to the registered progress callback.
     *
     * Installed as the progress function of read requests by getRequestRead().
     *
     * @param resource $handle curl handle of the running request
     * @return false|null false when no usable progress header is available;
     *                    null otherwise (including for non-200 responses)
     */
    public function __findXClickHouseProgress($handle)
    {
        $code = curl_getinfo($handle, CURLINFO_HTTP_CODE);

        // Progress headers are only looked for on successful responses.
        if ($code != 200) {
            return;
        }

        $response = curl_multi_getcontent($handle);
        $header_size = curl_getinfo($handle, CURLINFO_HEADER_SIZE);
        if (!$header_size || !is_string($response)) {
            return false;
        }

        $header = substr($response, 0, $header_size);
        // Fixed: the original re-tested $header_size here instead of the extracted header.
        if ($header === false || $header === '') {
            return false;
        }

        // Case-insensitive search (header names are case-insensitive) and an
        // explicit false check, so that offset 0 is not mistaken for "not found".
        $pos = strripos($header, 'X-ClickHouse-Progress');
        if ($pos === false) {
            return false;
        }

        // Keep only the line of the last progress header; any header lines that
        // follow it would otherwise make the JSON payload undecodable.
        $last = substr($header, $pos);
        $lineEnd = strpos($last, "\n");
        if ($lineEnd !== false) {
            $last = substr($last, 0, $lineEnd);
        }

        // json_decode() returns null for malformed input, so no error suppression is needed.
        $data = json_decode(str_ireplace('X-ClickHouse-Progress:', '', $last), true);

        if ($data && is_callable($this->xClickHouseProgress)) {
            // call_user_func() handles closures, function names and [object, method] pairs alike.
            call_user_func($this->xClickHouseProgress, $data);
        }
    }
403
404
    /**
     * Builds (without executing) a read request (`readonly=2`) for the query.
     *
     * - With a non-empty WhereInFile, its URL parameters are merged in, the SQL is
     *   moved to the URL and its files are attached to the request.
     * - With a WriteToFile that has a format, the query format is switched to it
     *   and the response is directed into the target file.
     * - With a registered progress callback, the progress hook is installed.
     *
     * @param Query $query
     * @param null|WhereInFile $whereInFile
     * @param null|WriteToFile $writeToFile
     * @return CurlerRequest
     * @throws \Exception
     */
    public function getRequestRead(Query $query, $whereInFile = null, $writeToFile = null)
    {
        $urlParams = ['readonly' => 2];
        $sqlInUrl = false;

        if ($whereInFile instanceof WhereInFile && $whereInFile->size()) {
            $urlParams = array_merge($urlParams, $whereInFile->fetchUrlParams());
            $sqlInUrl = true;
        }

        if ($writeToFile instanceof WriteToFile && $writeToFile->fetchFormat()) {
            $query->setFormat($writeToFile->fetchFormat());
            unset($urlParams['extremes']);
        }

        $request = $this->makeRequest($query, $urlParams, $sqlInUrl);

        if ($whereInFile instanceof WhereInFile && $whereInFile->size()) {
            $request->attachFiles($whereInFile->fetchFiles());
        }

        if ($writeToFile instanceof WriteToFile && $writeToFile->fetchFormat()) {
            $this->attachResultFile($request, $writeToFile);
        }

        if ($this->xClickHouseProgress) {
            $request->setFunctionProgress([$this, '__findXClickHouseProgress']);
        }

        return $request;
    }

    /**
     * Opens the WriteToFile target for writing and wires it to the request as the result file.
     *
     * Nothing is attached when the file cannot be opened.
     *
     * @param CurlerRequest $request
     * @param WriteToFile $writeToFile
     */
    private function attachResultFile(CurlerRequest $request, WriteToFile $writeToFile)
    {
        $target = fopen($writeToFile->fetchFile(), 'w');
        if (!is_resource($target)) {
            return;
        }

        $gzip = $writeToFile->getGzip();
        if ($gzip) {
            // Gzip header prefix written ahead of the response body.
            // NOTE(review): only 8 bytes are written here; presumably the rest of
            // the stream comes from the response body — confirm.
            fwrite($target, "\x1f\x8b\x08\x00\x00\x00\x00\x00");
        }

        $request->setResultFileHandle($target, $gzip)->setCallbackFunction(function (CurlerRequest $finished) {
            fclose($finished->getResultFileHandle());
        });
    }
471
472 1
    /**
     * Removes all registered query degenerations.
     *
     * @return bool always true
     */
    public function cleanQueryDegeneration()
    {
        $this->_query_degenerations = [];

        return true;
    }
477
478 67
    /**
     * Registers a degeneration that will be handed to every Query built by prepareQuery().
     *
     * @param Degeneration $degeneration
     * @return bool always true
     */
    public function addQueryDegeneration(Degeneration $degeneration)
    {
        $this->_query_degenerations[] = $degeneration;

        return true;
    }
483
484
    /**
     * Builds (without executing) a write request (`readonly=0`) for the query.
     *
     * @param Query $query
     * @return CurlerRequest
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function getRequestWrite(Query $query)
    {
        return $this->makeRequest($query, ['readonly' => 0]);
    }
494
495
    /**
     * Sends a GET request to the base URI and reports whether the server answered "Ok.".
     *
     * @return bool true when the response body is "Ok." followed by a line break
     * @throws TransportException
     */
    public function ping(): bool
    {
        $request = new CurlerRequest();
        $request->url($this->getUri())->verbose(false)->GET()->connectTimeOut($this->getConnectTimeOut());
        $this->_curler->execOne($request);

        $body = $request->response()->body();

        // The line ending of the reply is chosen by the server, not by the client
        // OS. Comparing against PHP_EOL ("\r\n" on Windows) made ping() fail on
        // Windows clients even though the server replied "Ok.\n".
        return $body === "Ok.\n" || $body === "Ok.\r\n";
    }
506
507
    /**
     * Creates a Query for the SQL after handing the bindings to every registered degeneration.
     *
     * @param string $sql
     * @param mixed[] $bindings
     * @return Query
     */
    private function prepareQuery($sql, $bindings)
    {
        $degenerations = $this->_query_degenerations;

        // Each registered degeneration receives the bindings before the Query is built.
        foreach ($degenerations as $rule) {
            $rule->bindParams($bindings);
        }

        return new Query($sql, $degenerations);
    }
522
523
524
    /**
     * Builds the request for a select.
     *
     * A plain SQL string is turned into a Query with format JSON and a read request.
     * An already built Query is turned into a write request as-is.
     *
     * @param Query|string $sql
     * @param mixed[] $bindings
     * @param null|WhereInFile $whereInFile
     * @param null|WriteToFile $writeToFile
     * @return CurlerRequest
     * @throws \Exception
     */
    private function prepareSelect($sql, $bindings, $whereInFile, $writeToFile = null)
    {
        if (!($sql instanceof Query)) {
            $query = $this->prepareQuery($sql, $bindings);
            $query->setFormat('JSON');

            return $this->getRequestRead($query, $whereInFile, $writeToFile);
        }

        // NOTE(review): a Query passed to a select goes through getRequestWrite()
        // (readonly=0, no where-in/write-to-file handling) — confirm this is intended.
        return $this->getRequestWrite($sql);
    }
541
542
543
    /**
     * Builds the write request for SQL text or for an already built Query.
     *
     * @param Query|string $sql
     * @param mixed[] $bindings used only when $sql is a string
     * @return CurlerRequest
     * @throws \ClickHouseDB\Exception\TransportException
     */
    private function prepareWrite($sql, $bindings = [])
    {
        $query = $sql instanceof Query
            ? $sql
            : $this->prepareQuery($sql, $bindings);

        return $this->getRequestWrite($query);
    }
558
559
    /**
     * Runs the executor's wait loop for the queued requests and returns its result.
     *
     * @return bool
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function executeAsync()
    {
        $result = $this->_curler->execLoopWait();

        return $result;
    }
567
568
    /**
     * Executes a select immediately and returns its Statement.
     *
     * @param Query|string $sql
     * @param mixed[] $bindings
     * @param null|WhereInFile $whereInFile
     * @param null|WriteToFile $writeToFile
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     * @throws \Exception
     */
    public function select($sql, array $bindings = [], $whereInFile = null, $writeToFile = null)
    {
        $prepared = $this->prepareSelect($sql, $bindings, $whereInFile, $writeToFile);

        $this->_curler->execOne($prepared);

        return new Statement($prepared);
    }
583
584
    /**
     * Queues a select for asynchronous execution and returns its Statement.
     *
     * The request is only added to the executor's queue here; it is not executed
     * by this method.
     *
     * @param Query|string $sql
     * @param mixed[] $bindings
     * @param null|WhereInFile $whereInFile
     * @param null|WriteToFile $writeToFile
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     * @throws \Exception
     */
    public function selectAsync($sql, array $bindings = [], $whereInFile = null, $writeToFile = null)
    {
        $prepared = $this->prepareSelect($sql, $bindings, $whereInFile, $writeToFile);

        $this->_curler->addQueLoop($prepared);

        return new Statement($prepared);
    }
599
600
    /**
     * Registers the callback that receives decoded X-ClickHouse-Progress data.
     *
     * Once set, getRequestRead() installs the progress hook on the read requests it builds.
     *
     * @param callable $callback
     */
    public function setProgressFunction(callable $callback)
    {
        $this->xClickHouseProgress = $callback;
    }
607
608
    /**
     * Executes a write statement immediately and returns its Statement.
     *
     * @param string $sql
     * @param mixed[] $bindings
     * @param bool $exception when true, error() is invoked on a failed response
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function write($sql, array $bindings = [], $exception = true)
    {
        $prepared = $this->prepareWrite($sql, $bindings);
        $this->_curler->execOne($prepared);

        $statement = new Statement($prepared);

        if ($exception && $statement->isError()) {
            $statement->error();
        }

        return $statement;
    }
627
628
    /**
     * Executes a request whose body is read from, or whose response is written to, a stream.
     *
     * If the Stream supplies no closure, a default one is installed: for a write
     * stream it feeds the request body from the stream, for a read stream it
     * writes the response into the stream. For write streams the underlying
     * stream is closed afterwards, whether or not the request succeeded.
     *
     * @param Stream $streamRW
     * @param CurlerRequest $request
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     */
    private function streaming(Stream $streamRW, CurlerRequest $request)
    {
        $callable = $streamRW->getClosure();
        $stream = $streamRW->getStream();

        try {
            if (!is_callable($callable)) {
                if ($streamRW->isWrite()) {
                    $callable = function ($ch, $fd, $length) use ($stream) {
                        $chunk = fread($stream, $length);

                        // Only `false` signals a failed read. A truthiness test
                        // would also discard a chunk consisting of the single
                        // character "0" and end the upload early.
                        return $chunk === false ? '' : $chunk;
                    };
                } else {
                    $callable = function ($ch, $fd) use ($stream) {
                        return fwrite($stream, $fd);
                    };
                }
            }

            if ($streamRW->isGzipHeader()) {
                if ($streamRW->isWrite()) {
                    $request->header('Content-Encoding', 'gzip');
                    $request->header('Content-Type', 'application/x-www-form-urlencoded');
                } else {
                    $request->header('Accept-Encoding', 'gzip');
                }
            }

            $request->header('Transfer-Encoding', 'chunked');

            if ($streamRW->isWrite()) {
                $request->setReadFunction($callable);
            } else {
                $request->setWriteFunction($callable);
            }

            $this->_curler->execOne($request, true);
            $response = new Statement($request);
            if ($response->isError()) {
                $response->error();
            }

            return $response;
        } finally {
            // Guard against a stream that was already closed (for example by a
            // user-supplied closure), so that cleanup cannot raise an error that
            // hides the outcome of the request.
            if ($streamRW->isWrite() && is_resource($stream)) {
                fclose($stream);
            }
        }
    }
694
695
696
    /**
     * Executes a read query and streams the response through the given Stream.
     *
     * @param Stream $streamRead
     * @param string $sql
     * @param mixed[] $bindings
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function streamRead(Stream $streamRead, $sql, $bindings = [])
    {
        $query = $this->prepareQuery($sql, $bindings);

        return $this->streaming($streamRead, $this->getRequestRead($query));
    }
710
711
    /**
     * Executes a write statement whose data is supplied through the given Stream.
     *
     * @param Stream $streamWrite
     * @param string $sql
     * @param mixed[] $bindings
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function streamWrite(Stream $streamWrite, $sql, $bindings = [])
    {
        $query = $this->prepareQuery($sql, $bindings);

        return $this->streaming($streamWrite, $this->writeStreamData($query));
    }
724
}
725