Failed Conditions
Push — master ( 20fa9e...5a0a45 )
by Igor
03:28 queued 10s
created

src/Transport/Http.php (14 issues)

1
<?php
2
3
namespace ClickHouseDB\Transport;
4
5
use ClickHouseDB\Exception\TransportException;
6
use ClickHouseDB\Query\Degeneration;
7
use ClickHouseDB\Query\Query;
8
use ClickHouseDB\Query\WhereInFile;
9
use ClickHouseDB\Query\WriteToFile;
10
use ClickHouseDB\Settings;
11
use ClickHouseDB\Statement;
12
use const PHP_EOL;
13
14
class Http
15
{
16
    /**
     * User name handed to CurlerRequest::authByHeaders() for every request.
     *
     * @var string|null
     */
    private $_username = null;

    /**
     * Password handed to CurlerRequest::authByHeaders() for every request.
     *
     * @var string|null
     */
    private $_password = null;

    /**
     * Host part used by getUri(); may already contain a path or a port.
     *
     * @var string
     */
    private $_host = '';

    /**
     * Port appended by getUri() when it is greater than zero.
     *
     * @var int
     */
    private $_port = 0;

    /**
     * Verbose flag; converted with boolval() before it is passed to a request.
     *
     * @var bool|int
     */
    private $_verbose = false;

    /**
     * Curl driver that executes or queues the prepared requests.
     *
     * @var CurlerRolling|null
     */
    private $_curler = null;

    /**
     * Connection settings; created in the constructor.
     *
     * @var Settings|null
     */
    private $_settings = null;

    /**
     * Degenerations registered through addQueryDegeneration().
     *
     * @var Degeneration[]
     */
    private $_query_degenerations = [];

    /**
     * Connect timeout in seconds.
     *
     * @var int
     */
    private $_connectTimeOut = 5;

    /**
     * Optional callback that receives decoded X-ClickHouse-Progress data.
     *
     * @var callable|null
     */
    private $xClickHouseProgress = null;
67
68
    /**
     * Http constructor.
     *
     * Remembers the endpoint and the credentials, creates the Settings object
     * bound to this transport and installs a fresh CurlerRolling driver.
     *
     * @param string $host
     * @param int    $port
     * @param string $username
     * @param string $password
     */
    public function __construct($host, $port, $username, $password)
    {
        $this->_username = $username;
        $this->_password = $password;
        $this->setHost($host, $port);

        $this->_settings = new Settings($this);
        $this->setCurler();
    }
85
86
87 67
    /**
     * Replaces the current curl driver with a newly created CurlerRolling.
     */
    public function setCurler()
    {
        $freshCurler = new CurlerRolling();
        $this->_curler = $freshCurler;
    }
91
92
    /**
     * Installs an externally created curl driver.
     *
     * The parameter type already guarantees a CurlerRolling instance, so no
     * additional instanceof guard is needed.
     *
     * @param CurlerRolling $curler
     */
    public function setDirtyCurler(CurlerRolling $curler)
    {
        $this->_curler = $curler;
    }
100
101
    /**
     * Returns the curl driver currently used by this transport.
     *
     * @return CurlerRolling
     */
    public function getCurler()
    {
        return $this->_curler;
    }
108
109
    /**
     * Sets the host and, optionally, the port.
     *
     * The stored port is only replaced when $port is greater than zero, so the
     * default of -1 keeps the previously configured port.
     *
     * @param string $host
     * @param int    $port
     */
    public function setHost($host, $port = -1)
    {
        $this->_host = $host;

        if ($port > 0) {
            $this->_port = $port;
        }
    }
121
122
    /**
     * Builds the base URI (scheme, host and optional port) of the server.
     *
     * The scheme is "https" when the settings report isHttps(), otherwise
     * "http". The port is appended only when the host contains neither "/"
     * nor ":" and the stored port is greater than zero.
     *
     * @return string
     */
    public function getUri()
    {
        $scheme = $this->settings()->isHttps() ? 'https' : 'http';
        $base = $scheme . '://' . $this->_host;

        // A host that already carries a path or a port is used verbatim.
        $hostIsComplete = stripos($this->_host, '/') !== false
            || stripos($this->_host, ':') !== false;

        if ($hostIsComplete || intval($this->_port) <= 0) {
            return $base;
        }

        return $base . ':' . $this->_port;
    }
140
141
    /**
     * Returns the Settings object created for this transport.
     *
     * @return Settings
     */
    public function settings()
    {
        return $this->_settings;
    }
148
149
    /**
     * Stores the verbose flag that newRequest() applies to created requests.
     *
     * @param bool|int $flag
     * @return mixed the flag exactly as it was passed in
     */
    public function verbose($flag)
    {
        $this->_verbose = $flag;

        return $flag;
    }
158
159
    /**
     * Builds the full request URL: base URI plus settings as a query string.
     *
     * Values in $params override the configured settings of the same name.
     * For a read-only user several settings are removed from the query
     * string, and the internal "https" flag is never sent.
     *
     * @param array $params
     * @return string
     */
    private function getUrl($params = [])
    {
        $queryParts = $this->settings()->getSettings();

        if (is_array($params) && count($params)) {
            $queryParts = array_merge($queryParts, $params);
        }

        if ($this->settings()->isReadOnlyUser()) {
            unset(
                $queryParts['extremes'],
                $queryParts['readonly'],
                $queryParts['enable_http_compression'],
                $queryParts['max_execution_time']
            );
        }

        unset($queryParts['https']);

        return $this->getUri() . '?' . http_build_query($queryParts);
    }
186
187
    /**
     * Creates a POST request pre-configured from this transport.
     *
     * Applies credentials, the extended info array, optional HTTP compression,
     * optional persistence (when a session id is set), both timeouts and the
     * verbose flag.
     *
     * @param array $extendinfo
     * @return CurlerRequest
     */
    private function newRequest($extendinfo)
    {
        $settings = $this->settings();

        $request = new CurlerRequest();
        $request->authByHeaders($this->_username, $this->_password)
            ->POST()
            ->setRequestExtendedInfo($extendinfo);

        if ($settings->isEnableHttpCompression()) {
            $request->httpCompression(true);
        }

        if ($settings->getSessionId()) {
            $request->persistent();
        }

        $request->timeOut($settings->getTimeOut());
        $request->connectTimeOut($this->_connectTimeOut);
        $request->verbose(boolval($this->_verbose));

        return $request;
    }
212
213
    /**
     * Builds a request for the given query.
     *
     * When $query_as_string is true the SQL is sent as the "query" URL
     * parameter; otherwise it is handed to the request via parameters_json().
     *
     * @param Query $query
     * @param array $urlParams
     * @param bool  $query_as_string
     * @return CurlerRequest
     * @throws \ClickHouseDB\Exception\TransportException
     */
    private function makeRequest(Query $query, $urlParams = [], $query_as_string = false)
    {
        $sql = $query->toSql();

        if ($query_as_string) {
            $urlParams['query'] = $sql;
        }

        $url = $this->getUrl($urlParams);

        $request = $this->newRequest([
            'sql'    => $sql,
            'query'  => $query,
            'format' => $query->getFormat(),
        ]);
        $request->url($url);

        if (!$query_as_string) {
            $request->parameters_json($sql);
        }

        // newRequest() already applies the same compression rule; the call is
        // repeated here exactly as before.
        if ($this->settings()->isEnableHttpCompression()) {
            $request->httpCompression(true);
        }

        return $request;
    }
251
252
    /**
     * Builds a write request whose SQL travels in the URL, leaving the
     * request body free for streamed data.
     *
     * The "sql" entry of the extended info always holds the SQL string, even
     * when a Query object is passed in, which matches makeRequest() and
     * writeAsyncCSV().
     *
     * @param string|Query $sql
     * @return CurlerRequest
     */
    public function writeStreamData($sql)
    {
        $query = $sql instanceof Query ? $sql : new Query($sql);
        $sqlText = $query->toSql();

        $url = $this->getUrl([
            'readonly' => 0,
            'query'    => $sqlText,
        ]);

        $request = $this->newRequest([
            'sql'    => $sqlText,
            'query'  => $query,
            'format' => $query->getFormat(),
        ]);
        $request->url($url);

        return $request;
    }
279
280
281
    /**
     * Queues an asynchronous write that uploads the given file.
     *
     * The SQL is sent in the URL and the file is attached with setInfile().
     * A callback closes the input handle, and the request is added to the
     * curler queue rather than executed immediately.
     *
     * @param string $sql
     * @param string $file_name
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function writeAsyncCSV($sql, $file_name)
    {
        $query = new Query($sql);

        $url = $this->getUrl([
            'readonly' => 0,
            'query'    => $query->toSql(),
        ]);

        $request = $this->newRequest([
            'sql'    => $sql,
            'query'  => $query,
            'format' => $query->getFormat(),
        ]);
        $request->url($url);

        $closeInfile = function (CurlerRequest $finished) {
            $infile = $finished->getInfileHandle();
            if (is_resource($infile)) {
                fclose($infile);
            }
        };
        $request->setCallbackFunction($closeInfile);

        $request->setInfile($file_name);
        $this->_curler->addQueLoop($request);

        return new Statement($request);
    }
317
318
    /**
     * Returns the number of pending requests reported by the curler.
     *
     * @return int
     */
    public function getCountPendingQueue()
    {
        return $this->_curler->countPending();
    }
327
328
    /**
     * Sets the connect timeout in seconds (CURLOPT_CONNECTTIMEOUT).
     *
     * @param int $connectTimeOut
     */
    public function setConnectTimeOut($connectTimeOut)
    {
        $this->_connectTimeOut = $connectTimeOut;
    }
337
338
    /**
     * Returns the connect timeout in seconds.
     *
     * @return int
     */
    public function getConnectTimeOut()
    {
        return $this->_connectTimeOut;
    }
347
348
349 1
    /**
     * Extracts the most recent X-ClickHouse-Progress header from the data
     * received so far and passes its decoded JSON payload to the configured
     * progress callback.
     *
     * Registered by getRequestRead() via setFunctionProgress() when a
     * progress callback is set.
     *
     * @param resource $handle curl handle of the running request
     * @return false|null false when no usable progress header is available,
     *                    null in every other case
     */
    public function __findXClickHouseProgress($handle)
    {
        $headerName = 'X-ClickHouse-Progress:';

        if ((int) curl_getinfo($handle, CURLINFO_HTTP_CODE) !== 200) {
            return null;
        }

        $headerSize = curl_getinfo($handle, CURLINFO_HEADER_SIZE);
        if (!$headerSize) {
            return false;
        }

        $response = curl_multi_getcontent($handle);
        if (!is_string($response)) {
            return false;
        }

        $header = substr($response, 0, $headerSize);
        if (!is_string($header) || $header === '') {
            return false;
        }

        // Header names are case-insensitive, so search without regard to case
        // and compare strictly against false: offset 0 is a valid match.
        $pos = strripos($header, $headerName);
        if ($pos === false) {
            return false;
        }

        // Keep only the value of that one header line; anything after the
        // line break belongs to other headers and would break JSON decoding.
        $payload = substr($header, $pos + strlen($headerName));
        $lineEnd = strpos($payload, "\n");
        if ($lineEnd !== false) {
            $payload = substr($payload, 0, $lineEnd);
        }

        $data = json_decode(trim($payload), true);

        if ($data && is_callable($this->xClickHouseProgress)) {
            call_user_func($this->xClickHouseProgress, $data);
        }

        return null;
    }
388
389
    /**
     * Builds a read request (readonly=2) for the given query.
     *
     * Optionally attaches external "where in" files, redirects the result
     * into a file and registers the progress hook.
     *
     * @param Query            $query
     * @param null|WhereInFile $whereInFile
     * @param null|WriteToFile $writeToFile
     * @return CurlerRequest
     * @throws \Exception
     */
    public function getRequestRead(Query $query, $whereInFile = null, $writeToFile = null)
    {
        $query_as_string = false;
        $urlParams = ['readonly' => 2];

        // External files: their URL parameters are merged in and the SQL has
        // to travel in the URL instead of the request body.
        if ($whereInFile instanceof WhereInFile && $whereInFile->size()) {
            $urlParams = array_merge($urlParams, $whereInFile->fetchUrlParams());
            $query_as_string = true;
        }

        // Result goes to a file: the query takes the file's format.
        if ($writeToFile instanceof WriteToFile && $writeToFile->fetchFormat()) {
            $query->setFormat($writeToFile->fetchFormat());
            // NOTE(review): this only affects $urlParams; an "extremes" value
            // coming from the settings is merged later in getUrl() - confirm
            // whether the intent was to disable it there as well.
            unset($urlParams['extremes']);
        }

        $request = $this->makeRequest($query, $urlParams, $query_as_string);

        if ($whereInFile instanceof WhereInFile && $whereInFile->size()) {
            $request->attachFiles($whereInFile->fetchFiles());
        }

        if ($writeToFile instanceof WriteToFile && $writeToFile->fetchFormat()) {
            $target = fopen($writeToFile->fetchFile(), 'w');

            // NOTE(review): when fopen() fails the request is still returned,
            // just without a result file - confirm this is intended.
            if (is_resource($target)) {
                $gzip = $writeToFile->getGzip();

                if ($gzip) {
                    // Fixed 8-byte gzip header written ahead of the payload.
                    fwrite($target, "\x1f\x8b\x08\x00\x00\x00\x00\x00");
                }

                $closeTarget = function (CurlerRequest $finished) {
                    fclose($finished->getResultFileHandle());
                };

                $request->setResultFileHandle($target, $gzip)->setCallbackFunction($closeTarget);
            }
        }

        if ($this->xClickHouseProgress) {
            $request->setFunctionProgress([$this, '__findXClickHouseProgress']);
        }

        return $request;
    }
457
458 1
    /**
     * Removes every registered query degeneration.
     *
     * @return bool always true
     */
    public function cleanQueryDegeneration()
    {
        $this->_query_degenerations = [];

        return true;
    }
463
464 67
    /**
     * Appends a degeneration that prepareQuery() binds and passes to Query.
     *
     * @param Degeneration $degeneration
     * @return bool always true
     */
    public function addQueryDegeneration(Degeneration $degeneration)
    {
        $this->_query_degenerations[] = $degeneration;

        return true;
    }
469
470
    /**
     * Builds a write request (readonly=0) for the given query.
     *
     * @param Query $query
     * @return CurlerRequest
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function getRequestWrite(Query $query)
    {
        return $this->makeRequest($query, ['readonly' => 0]);
    }
480
481
    /**
     * Sends a GET request to the base URI and reports whether the response
     * body is exactly "Ok." followed by PHP_EOL.
     *
     * @throws TransportException
     */
    public function ping() : bool
    {
        $expectedBody = 'Ok.' . PHP_EOL;

        $request = new CurlerRequest();
        $request
            ->url($this->getUri())
            ->verbose(false)
            ->GET()
            ->connectTimeOut($this->getConnectTimeOut());

        $this->_curler->execOne($request);

        return $request->response()->body() === $expectedBody;
    }
492
493
    /**
     * Creates a Query for the SQL after handing the bindings to every
     * registered degeneration.
     *
     * @param string  $sql
     * @param mixed[] $bindings
     * @return Query
     */
    private function prepareQuery($sql, $bindings)
    {
        foreach ($this->_query_degenerations as $registered) {
            $registered->bindParams($bindings);
        }

        return new Query($sql, $this->_query_degenerations);
    }
508
509
510
    /**
     * Builds the read request for a select.
     *
     * A ready-made Query object is sent as a read request with its own
     * format; previously it was sent as a write request and $whereInFile and
     * $writeToFile were silently ignored. A plain SQL string is turned into a
     * Query with the registered degenerations and the JSON format.
     *
     * @param Query|string     $sql
     * @param mixed[]          $bindings
     * @param null|WhereInFile $whereInFile
     * @param null|WriteToFile $writeToFile
     * @return CurlerRequest
     * @throws \Exception
     */
    private function prepareSelect($sql, $bindings, $whereInFile, $writeToFile = null)
    {
        if ($sql instanceof Query) {
            return $this->getRequestRead($sql, $whereInFile, $writeToFile);
        }

        $query = $this->prepareQuery($sql, $bindings);
        $query->setFormat('JSON');

        return $this->getRequestRead($query, $whereInFile, $writeToFile);
    }
527
528
529
530
531
    /**
     * Builds the write request for a statement.
     *
     * A Query object is used as is; a SQL string is first converted with
     * prepareQuery().
     *
     * @param Query|string $sql
     * @param mixed[]      $bindings
     * @return CurlerRequest
     * @throws \ClickHouseDB\Exception\TransportException
     */
    private function prepareWrite($sql, $bindings = [])
    {
        $query = $sql instanceof Query ? $sql : $this->prepareQuery($sql, $bindings);

        return $this->getRequestWrite($query);
    }
546
547
    /**
     * Runs the queued requests through the curler's execLoopWait().
     *
     * @return bool
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function executeAsync()
    {
        $finished = $this->_curler->execLoopWait();

        return $finished;
    }
555
556
    /**
     * Executes a select immediately and wraps the request in a Statement.
     *
     * @param Query|string     $sql
     * @param mixed[]          $bindings
     * @param null|WhereInFile $whereInFile
     * @param null|WriteToFile $writeToFile
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     * @throws \Exception
     */
    public function select($sql, array $bindings = [], $whereInFile = null, $writeToFile = null)
    {
        $prepared = $this->prepareSelect($sql, $bindings, $whereInFile, $writeToFile);
        $this->_curler->execOne($prepared);

        return new Statement($prepared);
    }
571
572
    /**
     * Queues a select on the curler instead of executing it immediately and
     * wraps the request in a Statement.
     *
     * @param Query|string     $sql
     * @param mixed[]          $bindings
     * @param null|WhereInFile $whereInFile
     * @param null|WriteToFile $writeToFile
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     * @throws \Exception
     */
    public function selectAsync($sql, array $bindings = [], $whereInFile = null, $writeToFile = null)
    {
        $prepared = $this->prepareSelect($sql, $bindings, $whereInFile, $writeToFile);
        $this->_curler->addQueLoop($prepared);

        return new Statement($prepared);
    }
587
588
    /**
     * Registers the callback that receives decoded X-ClickHouse-Progress
     * data; getRequestRead() attaches the progress hook once it is set.
     *
     * @param callable $callback
     */
    public function setProgressFunction(callable $callback)
    {
        $this->xClickHouseProgress = $callback;
    }
595
596
    /**
     * Executes a write statement immediately.
     *
     * When $exception is true and the Statement reports an error, its
     * error() method is invoked (presumably it throws - confirm in Statement).
     *
     * @param Query|string $sql
     * @param mixed[]      $bindings
     * @param bool         $exception
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function write($sql, array $bindings = [], $exception = true)
    {
        $request = $this->prepareWrite($sql, $bindings);
        $this->_curler->execOne($request);

        $statement = new Statement($request);
        if ($exception && $statement->isError()) {
            $statement->error();
        }

        return $statement;
    }
615
616
    /**
     * Executes a request whose body is streamed from or into a Stream.
     *
     * When the Stream supplies no callable, a default one is used: it reads
     * from the stream for writes and writes the received data to the stream
     * for reads. A write stream is closed afterwards, even on failure.
     *
     * @param Stream        $streamRW
     * @param CurlerRequest $request
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     */
    private function streaming(Stream $streamRW, CurlerRequest $request)
    {
        $callable = $streamRW->getClosure();
        $stream = $streamRW->getStream();
        $isWrite = $streamRW->isWrite();

        try {
            if (!is_callable($callable)) {
                if ($isWrite) {
                    $callable = function ($ch, $fd, $length) use ($stream) {
                        $chunk = fread($stream, $length);

                        // Only a real read failure maps to the empty string;
                        // a chunk such as "0" is falsy but still valid data.
                        return $chunk === false ? '' : $chunk;
                    };
                } else {
                    $callable = function ($ch, $fd) use ($stream) {
                        return fwrite($stream, $fd);
                    };
                }
            }

            if ($streamRW->isGzipHeader()) {
                if ($isWrite) {
                    $request->header('Content-Encoding', 'gzip');
                    $request->header('Content-Type', 'application/x-www-form-urlencoded');
                } else {
                    $request->header('Accept-Encoding', 'gzip');
                }
            }

            $request->header('Transfer-Encoding', 'chunked');

            if ($isWrite) {
                $request->setReadFunction($callable);
            } else {
                $request->setWriteFunction($callable);
            }

            $this->_curler->execOne($request, true);

            $response = new Statement($request);
            if ($response->isError()) {
                $response->error();
            }

            return $response;
        } finally {
            // Guarded so that a missing or already closed stream cannot raise
            // here and hide the exception that is currently propagating.
            if ($isWrite && is_resource($stream)) {
                fclose($stream);
            }
        }
    }
688
689
690
    /**
     * Runs a read query and streams its result through the given Stream.
     *
     * @param Stream  $streamRead
     * @param string  $sql
     * @param mixed[] $bindings
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function streamRead(Stream $streamRead, $sql, $bindings = [])
    {
        $query = $this->prepareQuery($sql, $bindings);
        $readRequest = $this->getRequestRead($query);

        return $this->streaming($streamRead, $readRequest);
    }
704
705
    /**
     * Runs a write query whose data is streamed from the given Stream.
     *
     * @param Stream  $streamWrite
     * @param string  $sql
     * @param mixed[] $bindings
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function streamWrite(Stream $streamWrite, $sql, $bindings = [])
    {
        $query = $this->prepareQuery($sql, $bindings);
        $writeRequest = $this->writeStreamData($query);

        return $this->streaming($streamWrite, $writeRequest);
    }
718
}
719