Failed Conditions
Pull Request — master (#102)
by Šimon
02:27
created

src/Transport/Http.php (40 issues)

1
<?php
2
3
namespace ClickHouseDB\Transport;
4
5
use ClickHouseDB\Exception\TransportException;
6
use ClickHouseDB\Query\Degeneration;
7
use ClickHouseDB\Query\Query;
8
use ClickHouseDB\Query\WhereInFile;
9
use ClickHouseDB\Query\WriteToFile;
10
use ClickHouseDB\Settings;
11
use ClickHouseDB\Statement;
12
use function array_merge;
13
use function http_build_query;
14
use const PHP_EOL;
15
16
class Http
17
{
18
    /**
0 ignored issues
show
Found multi-line comment for property \ClickHouseDB\Transport\Http::$username with single line content, use one-line comment instead.
Loading history...
19
     * @var string
20
     */
21
    private $username;
22
23
    /**
0 ignored issues
show
Found multi-line comment for property \ClickHouseDB\Transport\Http::$password with single line content, use one-line comment instead.
Loading history...
24
     * @var string
25
     */
26
    private $password;
27
28
    /**
0 ignored issues
show
Found multi-line comment for property \ClickHouseDB\Transport\Http::$host with single line content, use one-line comment instead.
Loading history...
29
     * @var string
30
     */
31
    private $host;
32
33
    /**
0 ignored issues
show
Found multi-line comment for property \ClickHouseDB\Transport\Http::$port with single line content, use one-line comment instead.
Loading history...
34
     * @var int
35
     */
36
    private $port;
37
38
    /** @var bool */
39
    private $https;
40
41
    /**
0 ignored issues
show
Found multi-line comment for property \ClickHouseDB\Transport\Http::$verbose with single line content, use one-line comment instead.
Loading history...
42
     * @var bool|int
43
     */
44
    private $verbose = false;
45
46
    /**
47
     * @var CurlerRolling
48
     */
49
    private $_curler = null;
50
51
    /**
0 ignored issues
show
Found multi-line comment for property \ClickHouseDB\Transport\Http::$settings with single line content, use one-line comment instead.
Loading history...
52
     * @var Settings
53
     */
54
    private $settings;
55
56
    /**
57
     * @var array
58
     */
59
    private $_query_degenerations = [];
60
61
    /**
62
     * Count seconds (float)
63
     *
64
     * @var float
65
     */
66
    private $timeout = 20;
67
68
    /**
69
     * Count seconds (float)
70
     *
71
     * @var float
72
     */
73
    private $connectTimeout = 5;
74
75
    /**
76
     * @var callable|null
77
     */
78
    private $xClickHouseProgress;
79
80
    /** @var string */
81
    private $database;
82
83 63
    public function __construct(
0 ignored issues
show
Coding Style Documentation introduced by
Missing doc comment for function __construct()
Loading history...
84
        string $host,
85
        int $port,
86
        string $username,
87
        string $password,
88
        string $database,
89
        bool $https = false
90
    ) {
91 63
        $this->host     = $host;
92 63
        $this->port     = $port;
93 63
        $this->username = $username;
94 63
        $this->password = $password;
95 63
        $this->setDatabase($database);
96 63
        $this->setHttps($https);
97 63
        $this->settings = new Settings();
98
99 63
        $this->setCurler();
100 63
    }
101
102 63
    public function setHttps(bool $https) : void
0 ignored issues
show
Coding Style Documentation introduced by
Missing doc comment for function setHttps()
Loading history...
103
    {
104 63
        $this->https = $https;
105 63
    }
106
107 63
    public function setCurler()
108
    {
109 63
        $this->_curler = new CurlerRolling();
110 63
    }
111
112
    /**
113
     * @return CurlerRolling
114
     */
115
    public function getCurler()
116
    {
117
        return $this->_curler;
118
    }
119
120 63
    public function setDatabase(string $database) : void
0 ignored issues
show
Coding Style Documentation introduced by
Missing doc comment for function setDatabase()
Loading history...
121
    {
122 63
        $this->database = $database;
123 63
    }
124
125
    public function setHost(string $host) : void
0 ignored issues
show
Coding Style Documentation introduced by
Missing doc comment for function setHost()
Loading history...
126
    {
127
        $this->host = $host;
128
    }
129
130
    /**
131
     * @return string
132
     */
133 63
    public function getUri()
0 ignored issues
show
Method \ClickHouseDB\Transport\Http::getUri() does not have return type hint for its return value but it should be possible to add it based on @return annotation "string".
Loading history...
134
    {
135 63
        $proto = 'http';
136 63
        if ($this->https) {
137 1
            $proto = 'https';
138
        }
139 63
        $uri = $proto . '://' . $this->host;
140 63
        if (stripos($this->host, '/') !== false || stripos($this->host, ':') !== false) {
141 1
            return $uri;
142
        }
143 63
        if ($this->port > 0) {
144 63
            return $uri . ':' . $this->port;
145
        }
146
147 1
        return $uri;
148
    }
149
150 63
    public function getSettings() : Settings
0 ignored issues
show
Coding Style Documentation introduced by
Missing doc comment for function getSettings()
Loading history...
151
    {
152 63
        return $this->settings;
153
    }
154
155
    public function setVerbose(bool $flag) : void
0 ignored issues
show
Coding Style Documentation introduced by
Missing doc comment for function setVerbose()
Loading history...
156
    {
157
        $this->verbose = $flag;
158
    }
159
160
    /**
161
     * @param mixed[] $params
0 ignored issues
show
Incorrect annotations group.
Loading history...
Coding Style Documentation introduced by
Missing parameter comment
Loading history...
162
     * @return string
163
     */
164 63
    private function getUrl(array $params = [])
0 ignored issues
show
Method \ClickHouseDB\Transport\Http::getUrl() does not have return type hint for its return value but it should be possible to add it based on @return annotation "string".
Loading history...
165
    {
166 63
        $settings = $this->getSettings()->getQueryableSettings($params);
167
168 63
        return $this->getUri() . '?' . http_build_query(array_merge(['database' => $this->database], $settings));
169
    }
170
171
    /**
172
     * @param array $extendinfo
173
     * @return CurlerRequest
174
     */
175 63
    private function newRequest($extendinfo)
176
    {
177 63
        $new = new CurlerRequest();
178 63
        $new->auth($this->username, $this->password)
179 63
            ->POST()
180 63
            ->setRequestExtendedInfo($extendinfo);
181
182 63
        if ($this->getSettings()->isHttpCompressionEnabled()) {
183 63
            $new->httpCompression(true);
184
        }
185
186 63
        if ($this->getSettings()->getSessionId()) {
187 1
            $new->persistent();
188
        }
189
190 63
        $new->setTimeout($this->timeout);
191 63
        $new->setConnectTimeout($this->connectTimeout); // one sec
192 63
        $new->keepAlive(); // one sec
193 63
        $new->verbose((bool) $this->verbose);
194
195 63
        return $new;
196
    }
197
198
    /**
0 ignored issues
show
Coding Style Documentation introduced by
Doc comment for parameter "$query" missing
Loading history...
199
     * @param array $urlParams
0 ignored issues
show
Incorrect annotations group.
Loading history...
@param annotation of method \ClickHouseDB\Transport\Http::makeRequest() does not specify type hint for items of its traversable parameter $urlParams.
Loading history...
Coding Style Documentation introduced by
Missing parameter comment
Loading history...
Doc comment for parameter $urlParams does not match actual variable name $query
Loading history...
200
     * @param bool  $query_as_string
0 ignored issues
show
Doc comment for parameter $query_as_string does not match actual variable name $urlParams
Loading history...
201
     * @return CurlerRequest
202
     * @throws \ClickHouseDB\Exception\TransportException
203
     */
204 63
    private function makeRequest(Query $query, array $urlParams = [], $query_as_string = false)
0 ignored issues
show
Expected type hint "array"; found "Query" for $urlParams
Loading history...
Expected type hint "bool"; found "array" for $query_as_string
Loading history...
205
    {
206 63
        $sql = $query->toSql();
207
208 63
        if ($query_as_string) {
209 1
            $urlParams['query'] = $sql;
210
        }
211
212 63
        $url = $this->getUrl($urlParams);
213
214
        $extendinfo = [
215 63
            'sql'    => $sql,
216 63
            'query'  => $query,
217 63
            'format' => $query->getFormat(),
218
        ];
219
220 63
        $new = $this->newRequest($extendinfo);
221 63
        $new->url($url);
222
223 63
        if (! $query_as_string) {
224 63
            $new->parameters_json($sql);
225
        }
226 63
        if ($this->getSettings()->isHttpCompressionEnabled()) {
227 63
            $new->httpCompression(true);
228
        }
229
230 63
        return $new;
231
    }
232
233
    /**
234
     * @param string|Query $sql
235
     * @return CurlerRequest
236
     */
237 3
    public function writeStreamData($sql)
238
    {
239
240 3
        if ($sql instanceof Query) {
241 1
            $query = $sql;
242
        } else {
243 2
            $query = new Query($sql);
244
        }
245
246 3
        $url        = $this->getUrl([
247 3
            'query' => $query->toSql(),
0 ignored issues
show
This array key does not seem to be aligned correctly; expected 37 spaces, but found 12.
Loading history...
248
        ]);
0 ignored issues
show
The closing parenthesis does not seem to be aligned correctly; expected 36 space(s), but found 8.
Loading history...
249
        $extendinfo = [
250 3
            'sql'    => $sql,
251 3
            'query'  => $query,
252 3
            'format' => $query->getFormat(),
253
        ];
254
255 3
        $request = $this->newRequest($extendinfo);
256 3
        $request->url($url);
257
258 3
        return $request;
259
    }
260
261
    /**
262
     * @param string $sql
263
     * @param string $file_name
264
     * @return Statement
265
     * @throws \ClickHouseDB\Exception\TransportException
266
     */
267 8
    public function writeAsyncCSV($sql, $file_name)
268
    {
269 8
        $query = new Query($sql);
270
271 8
        $url = $this->getUrl([
272 8
            'query' => $query->toSql(),
0 ignored issues
show
This array key does not seem to be aligned correctly; expected 30 spaces, but found 12.
Loading history...
273
        ]);
274
275
        $extendinfo = [
276 8
            'sql'    => $sql,
277 8
            'query'  => $query,
278 8
            'format' => $query->getFormat(),
279
        ];
280
281 8
        $request = $this->newRequest($extendinfo);
282 8
        $request->url($url);
283
284
        $request->setCallbackFunction(function (CurlerRequest $request) {
285 8
            $handle = $request->getInfileHandle();
286 8
            if (is_resource($handle)) {
287 8
                fclose($handle);
288
            }
289 8
        });
290
291 8
        $request->setInfile($file_name);
292 8
        $this->_curler->addQueLoop($request);
293
294 8
        return new Statement($request);
295
    }
296
297
    /**
298
     * get Count Pending Query in Queue
299
     *
300
     * @return int
301
     */
302 12
    public function getCountPendingQueue()
303
    {
304 12
        return $this->_curler->countPending();
305
    }
306
307 1
    public function getTimeout() : float
0 ignored issues
show
Coding Style Documentation introduced by
Missing doc comment for function getTimeout()
Loading history...
308
    {
309 1
        return $this->timeout;
310
    }
311
312 2
    public function setTimeout(float $timeout) : void
0 ignored issues
show
Coding Style Documentation introduced by
Missing doc comment for function setTimeout()
Loading history...
313
    {
314 2
        $this->timeout = $timeout;
315 2
    }
316
317
    /**
318
     * Get ConnectTimeout in seconds
319
     */
0 ignored issues
show
Coding Style Documentation introduced by
Missing @return tag in function comment
Loading history...
320 39
    public function getConnectTimeout() : float
321
    {
322 39
        return $this->connectTimeout;
323
    }
324
325
    /**
0 ignored issues
show
Coding Style Documentation introduced by
Doc comment for parameter "$connectTimeOut" missing
Loading history...
326
     * Set Connect Timeout in seconds
327
     */
0 ignored issues
show
Coding Style Documentation introduced by
Missing @return tag in function comment
Loading history...
328 2
    public function setConnectTimeout(float $connectTimeOut) : void
329
    {
330 2
        $this->connectTimeout = $connectTimeOut;
331 2
    }
332
333 1
    private function findXClickHouseProgress($handle)
0 ignored issues
show
Method \ClickHouseDB\Transport\Http::findXClickHouseProgress() does not have parameter type hint nor @param annotation for its parameter $handle.
Loading history...
Method \ClickHouseDB\Transport\Http::findXClickHouseProgress() does not have return type hint nor @return annotation for its return value.
Loading history...
Coding Style Documentation introduced by
Missing doc comment for function findXClickHouseProgress()
Loading history...
334
    {
335 1
        $code = curl_getinfo($handle, CURLINFO_HTTP_CODE);
336
337
        // Search X-ClickHouse-Progress
338 1
        if ($code == 200) {
339 1
            $response    = curl_multi_getcontent($handle);
340 1
            $header_size = curl_getinfo($handle, CURLINFO_HEADER_SIZE);
341 1
            if (! $header_size) {
342
                return false;
343
            }
344
345 1
            $header = substr($response, 0, $header_size);
346 1
            if (! $header_size) {
347
                return false;
348
            }
349 1
            $pos = strrpos($header, 'X-ClickHouse-Progress');
350
351 1
            if (! $pos) {
352
                return false;
353
            }
354
355 1
            $last = substr($header, $pos);
356 1
            $data = @json_decode(str_ireplace('X-ClickHouse-Progress:', '', $last), true);
357
358 1
            if ($data && $this->xClickHouseProgress !== null) {
359 1
                ($this->xClickHouseProgress)($data);
360
            }
361
        }
362 1
    }
363
364
    /**
365
     * @param Query            $query
366
     * @param null|WhereInFile $whereInFile
367
     * @param null|WriteToFile $writeToFile
368
     * @return CurlerRequest
369
     * @throws \Exception
370
     */
371 35
    public function getRequestRead(Query $query, $whereInFile = null, $writeToFile = null)
372
    {
373 35
        $query_as_string = false;
374 35
        $urlParams       = [];
375
        // ---------------------------------------------------------------------------------
376 35
        if ($whereInFile instanceof WhereInFile && $whereInFile->size()) {
377
            // $request = $this->prepareSelectWhereIn($request, $whereInFile);
378 1
            $structure       = $whereInFile->fetchUrlParams();
379 1
            $urlParams       = $structure;
380 1
            $query_as_string = true;
381
        }
382
        // ---------------------------------------------------------------------------------
383
        // if result to file
384 35
        if ($writeToFile instanceof WriteToFile && $writeToFile->fetchFormat()) {
385 1
            $query->setFormat($writeToFile->fetchFormat());
386 1
            unset($urlParams['extremes']);
387
        }
388
        // ---------------------------------------------------------------------------------
389
        // makeRequest read
390 35
        $request = $this->makeRequest($query, $urlParams, $query_as_string);
391
        // ---------------------------------------------------------------------------------
392
        // attach files
393 35
        if ($whereInFile instanceof WhereInFile && $whereInFile->size()) {
394 1
            $request->attachFiles($whereInFile->fetchFiles());
395
        }
396
        // ---------------------------------------------------------------------------------
397
        // result to file
398 35
        if ($writeToFile instanceof WriteToFile && $writeToFile->fetchFormat()) {
399
400 1
            $fout = fopen($writeToFile->fetchFile(), 'w');
401 1
            if (is_resource($fout)) {
402
403 1
                $isGz = $writeToFile->getGzip();
404
405 1
                if ($isGz) {
406
                    // write gzip header
407
                    // "\x1f\x8b\x08\x00\x00\x00\x00\x00"
408
                    // fwrite($fout, "\x1F\x8B\x08\x08".pack("V", time())."\0\xFF", 10);
409
                    // write the original file name
410
                    // $oname = str_replace("\0", "", basename($writeToFile->fetchFile()));
411
                    // fwrite($fout, $oname."\0", 1+strlen($oname));
412
413
                    fwrite($fout, "\x1f\x8b\x08\x00\x00\x00\x00\x00");
414
                }
415
416
                $request->setResultFileHandle($fout, $isGz)->setCallbackFunction(function (CurlerRequest $request) {
417
                    fclose($request->getResultFileHandle());
418 1
                });
419
            }
420
        }
421 35
        if ($this->xClickHouseProgress !== null) {
422
            $request->setFunctionProgress(function ($x) {
423 1
                return $this->findXClickHouseProgress($x);
424 1
            });
425
        }
426
427
        // ---------------------------------------------------------------------------------
428 35
        return $request;
429
    }
430
431 1
    public function cleanQueryDegeneration() : void
432
    {
433 1
        $this->_query_degenerations = [];
434 1
    }
435
436 63
    public function addQueryDegeneration(Degeneration $degeneration) : void
437
    {
438 63
        $this->_query_degenerations[] = $degeneration;
439 63
    }
440
441
    /**
442
     * @param Query $query
443
     * @return CurlerRequest
444
     * @throws \ClickHouseDB\Exception\TransportException
445
     */
446 63
    public function getRequestWrite(Query $query)
0 ignored issues
show
Method \ClickHouseDB\Transport\Http::getRequestWrite() does not have return type hint for its return value but it should be possible to add it based on @return annotation "CurlerRequest".
Loading history...
447
    {
448 63
        return $this->makeRequest($query);
449
    }
450
451
    /**
452
     * @throws TransportException
453
     */
454 39
    public function ping() : bool
455
    {
456 39
        $request = new CurlerRequest();
457 39
        $request->url($this->getUri())->verbose(false)->GET()->setConnectTimeout($this->getConnectTimeout());
458 39
        $this->_curler->execOne($request);
459
460 39
        return $request->response()->body() === 'Ok.' . PHP_EOL;
461
    }
462
463
    /**
464
     * @param string  $sql
465
     * @param mixed[] $bindings
466
     * @return Query
467
     */
468 63
    private function prepareQuery($sql, $bindings)
469
    {
470
471
        // add Degeneration query
472 63
        foreach ($this->_query_degenerations as $degeneration) {
473 63
            $degeneration->bindParams($bindings);
474
        }
475
476 63
        return new Query($sql, $this->_query_degenerations);
477
    }
478
479
    /**
480
     * @param Query|string     $sql
481
     * @param mixed[]          $bindings
482
     * @param null|WhereInFile $whereInFile
483
     * @param null|WriteToFile $writeToFile
484
     * @return CurlerRequest
485
     * @throws \Exception
486
     */
487 34
    private function prepareSelect($sql, $bindings, $whereInFile, $writeToFile = null)
488
    {
489 34
        if ($sql instanceof Query) {
490
            return $this->getRequestWrite($sql);
491
        }
492 34
        $query = $this->prepareQuery($sql, $bindings);
493 34
        $query->setFormat('JSON');
494
495 34
        return $this->getRequestRead($query, $whereInFile, $writeToFile);
496
    }
497
498
    /**
499
     * @param Query|string $sql
500
     * @param mixed[]      $bindings
501
     * @return CurlerRequest
502
     * @throws \ClickHouseDB\Exception\TransportException
503
     */
504 63
    private function prepareWrite($sql, $bindings = [])
505
    {
506 63
        if ($sql instanceof Query) {
507
            return $this->getRequestWrite($sql);
508
        }
509
510 63
        $query = $this->prepareQuery($sql, $bindings);
511
512 63
        return $this->getRequestWrite($query);
513
    }
514
515
    /**
516
     * @return bool
517
     * @throws \ClickHouseDB\Exception\TransportException
518
     */
519 10
    public function executeAsync()
520
    {
521 10
        return $this->_curler->execLoopWait();
522
    }
523
524
    /**
525
     * @param Query|string     $sql
526
     * @param mixed[]          $bindings
527
     * @param null|WhereInFile $whereInFile
528
     * @param null|WriteToFile $writeToFile
529
     * @return Statement
530
     * @throws \ClickHouseDB\Exception\TransportException
531
     * @throws \Exception
532
     */
533 29
    public function select($sql, array $bindings = [], $whereInFile = null, $writeToFile = null)
534
    {
535 29
        $request = $this->prepareSelect($sql, $bindings, $whereInFile, $writeToFile);
536 29
        $this->_curler->execOne($request);
537
538 29
        return new Statement($request);
539
    }
540
541
    /**
542
     * @param Query|string     $sql
0 ignored issues
show
Incorrect annotations group.
Loading history...
Coding Style Documentation introduced by
Missing parameter comment
Loading history...
543
     * @param mixed[]          $bindings
544
     * @param null|WhereInFile $whereInFile
545
     * @param null|WriteToFile $writeToFile
546
     * @return Statement
547
     * @throws \ClickHouseDB\Exception\TransportException
548
     * @throws \Exception
549
     */
550 5
    public function selectAsync($sql, array $bindings = [], $whereInFile = null, $writeToFile = null)
551
    {
552 5
        $request = $this->prepareSelect($sql, $bindings, $whereInFile, $writeToFile);
553 5
        $this->_curler->addQueLoop($request);
554
555 5
        return new Statement($request);
556
    }
557
558 1
    public function setProgressFunction(callable $callback) : void
0 ignored issues
show
Coding Style Documentation introduced by
Missing doc comment for function setProgressFunction()
Loading history...
559
    {
560 1
        $this->xClickHouseProgress = $callback;
561 1
    }
562
563
    /**
564
     * @param string  $sql
565
     * @param mixed[] $bindings
566
     * @param bool    $exception
567
     * @return Statement
568
     * @throws \ClickHouseDB\Exception\TransportException
569
     */
570 63
    public function write($sql, array $bindings = [], $exception = true)
571
    {
572 63
        $request = $this->prepareWrite($sql, $bindings);
573 63
        $this->_curler->execOne($request);
574 63
        $response = new Statement($request);
575 63
        if ($exception) {
576 63
            if ($response->isError()) {
577 3
                $response->error();
578
            }
579
        }
580
581 63
        return $response;
582
    }
583
584
    /**
585
     * @param Stream        $streamRW
586
     * @param CurlerRequest $request
587
     * @return Statement
588
     * @throws \ClickHouseDB\Exception\TransportException
589
     */
590 2
    private function streaming(Stream $streamRW, CurlerRequest $request)
591
    {
592 2
        $callable = $streamRW->getClosure();
593 2
        $stream   = $streamRW->getStream();
594
595
        try {
596
597 2
            if (! is_callable($callable)) {
598
                if ($streamRW->isWrite()) {
0 ignored issues
show
Blank line found at start of control structure
Loading history...
599
600
                    $callable = function ($ch, $fd, $length) use ($stream) {
601
                        return ($line = fread($stream, $length)) ? $line : '';
602
                    };
603
                } else {
604
                    $callable = function ($ch, $fd) use ($stream) {
605
                        return fwrite($stream, $fd);
606
                    };
607
                }
608
            }
609
610 2
            if ($streamRW->isGzipHeader()) {
611
612 1
                if ($streamRW->isWrite()) {
613 1
                    $request->header('Content-Encoding', 'gzip');
614 1
                    $request->header('Content-Type', 'application/x-www-form-urlencoded');
615
                } else {
616
                    $request->header('Accept-Encoding', 'gzip');
617
                }
618
            }
619
620 2
            $request->header('Transfer-Encoding', 'chunked');
621
622 2
            if ($streamRW->isWrite()) {
623 1
                $request->setReadFunction($callable);
624
            } else {
625 1
                $request->setWriteFunction($callable);
626
//                $request->setHeaderFunction($callableHead);
627
            }
628
629 2
            $this->_curler->execOne($request, true);
630 2
            $response = new Statement($request);
631 2
            if ($response->isError()) {
632
                $response->error();
633
            }
634
635 2
            return $response;
636
        } finally {
637 2
            if ($streamRW->isWrite())
638 2
                fclose($stream);
639
        }
640
    }
641
642
    /**
643
     * @param Stream  $streamRead
644
     * @param string  $sql
645
     * @param mixed[] $bindings
646
     * @return Statement
647
     * @throws \ClickHouseDB\Exception\TransportException
648
     */
649 1
    public function streamRead(Stream $streamRead, $sql, $bindings = [])
650
    {
651 1
        $sql     = $this->prepareQuery($sql, $bindings);
652 1
        $request = $this->getRequestRead($sql);
653
654 1
        return $this->streaming($streamRead, $request);
655
    }
656
657
    /**
658
     * @param Stream  $streamWrite
659
     * @param string  $sql
660
     * @param mixed[] $bindings
661
     * @return Statement
662
     * @throws \ClickHouseDB\Exception\TransportException
663
     */
664 1
    public function streamWrite(Stream $streamWrite, $sql, $bindings = [])
665
    {
666 1
        $sql     = $this->prepareQuery($sql, $bindings);
667 1
        $request = $this->writeStreamData($sql);
668
669 1
        return $this->streaming($streamWrite, $request);
670
    }
671
}
672