Failed Conditions
Pull Request — master (#76)
by
unknown
02:10
created

src/Transport/Http.php (6 issues)

1
<?php
2
3
namespace ClickHouseDB\Transport;
4
5
use ClickHouseDB\Exception\TransportException;
6
use ClickHouseDB\Query\Degeneration;
7
use ClickHouseDB\Query\Query;
8
use ClickHouseDB\Query\WhereInFile;
9
use ClickHouseDB\Query\WriteToFile;
10
use ClickHouseDB\Settings;
11
use ClickHouseDB\Statement;
12
use const PHP_EOL;
13
14
class Http
15
{
16
    /**
0 ignored issues
show
Found multi-line comment for property \ClickHouseDB\Transport\Http::$_username with single line content, use one-line comment instead.
Loading history...
17
     * @var string
18
     */
19
    private $_username = null;
20
21
    /**
22
     * @var string
23
     */
24
    private $_password = null;
25
26
    /**
27
     * @var string
28
     */
29
    private $_host = '';
30
31
    /**
32
     * @var int
33
     */
34
    private $_port = 0;
35
36
    /**
37
     * @var bool|int
38
     */
39
    private $_verbose = false;
40
41
    /**
42
     * @var CurlerRolling
43
     */
44
    private $_curler = null;
45
46
    /**
47
     * @var Settings
48
     */
49
    private $_settings = null;
50
51
    /**
52
     * @var array
53
     */
54
    private $_query_degenerations = [];
55
56
    /**
57
     * Count seconds (int)
58
     *
59
     * @var int
60
     */
61
    private $_connectTimeOut = 5;
62
63
    /**
64
     * @var callable
65
     */
66
    private $xClickHouseProgress = null;
67
68
    /**
69
     * Http constructor.
70
     * @param string $host
71
     * @param int $port
72
     * @param string $username
73
     * @param string $password
74
     */
75 53
    public function __construct($host, $port, $username, $password)
76
    {
77 53
        $this->setHost($host, $port);
78
79 53
        $this->_username = $username;
80 53
        $this->_password = $password;
81 53
        $this->_settings = new Settings($this);
82
83 53
        $this->setCurler();
84 53
    }
85
86
87 53
    public function setCurler()
88
    {
89 53
        $this->_curler = new CurlerRolling();
90 53
    }
91
92
    /**
93
     * @return CurlerRolling
94
     */
95
    public function getCurler()
96
    {
97
        return $this->_curler;
98
    }
99
100
    /**
101
     * @param string $host
102
     * @param int $port
103
     */
104 53
    public function setHost($host, $port = -1)
105
    {
106 53
        if ($port > 0) {
107 53
            $this->_port = $port;
108
        }
109
110 53
        $this->_host = $host;
111 53
    }
112
113
    /**
114
     * @return string
115
     */
116 43
    public function getUri()
117
    {
118 43
        $proto = 'http';
119 43
        if ($this->settings()->isHttps()) {
120
            $proto = 'https';
121
        }
122
123 43
        return $proto . '://' . $this->_host . ':' . $this->_port;
124
    }
125
126
    /**
127
     * @return Settings
128
     */
129 53
    public function settings()
130
    {
131 53
        return $this->_settings;
132
    }
133
134
    /**
135
     * @param bool|int $flag
136
     * @return mixed
137
     */
138
    public function verbose($flag)
139
    {
140
        $this->_verbose = $flag;
141
        return $flag;
142
    }
143
144
    /**
145
     * @param array $params
146
     * @return string
147
     */
148 35
    private function getUrl($params = [])
149
    {
150 35
        $settings = $this->settings()->getSettings();
151
152 35
        if (is_array($params) && sizeof($params)) {
153 35
            $settings = array_merge($settings, $params);
154
        }
155
156
157 35
        if ($this->settings()->isReadOnlyUser())
158
        {
159
            unset($settings['extremes']);
160
            unset($settings['readonly']);
161
            unset($settings['enable_http_compression']);
162
            unset($settings['max_execution_time']);
163
164
        }
165
166 35
        unset($settings['https']);
167
168
169 35
        return $this->getUri() . '?' . http_build_query($settings);
170
    }
171
172
    /**
173
     * @param array $extendinfo
174
     * @return CurlerRequest
175
     */
176 35
    private function newRequest($extendinfo)
177
    {
178 35
        $new = new CurlerRequest();
179 35
        $new->auth($this->_username, $this->_password)
180 35
            ->POST()
181 35
            ->setRequestExtendedInfo($extendinfo);
182
183 35
        if ($this->settings()->isEnableHttpCompression()) {
184 35
            $new->httpCompression(true);
185
        }
186 35
        if ($this->settings()->getSessionId())
187
        {
188 1
            $new->persistent();
189
        }
190
191 35
        $new->timeOut($this->settings()->getTimeOut());
192 35
        $new->connectTimeOut($this->_connectTimeOut)->keepAlive(); // one sec
193 35
        $new->verbose(boolval($this->_verbose));
194
195 35
        return $new;
196
    }
197
198
    /**
199
     * @param Query $query
200
     * @param array $urlParams
201
     * @param bool $query_as_string
202
     * @return CurlerRequest
203
     * @throws \ClickHouseDB\Exception\TransportException
204
     */
205 35
    private function makeRequest(Query $query, $urlParams = [], $query_as_string = false)
206
    {
207 35
        $sql = $query->toSql();
208
209 35
        if ($query_as_string) {
210 1
            $urlParams['query'] = $sql;
211
        }
212
213 35
        $url = $this->getUrl($urlParams);
214
215
        $extendinfo = [
216 35
            'sql' => $sql,
217 35
            'query' => $query,
218 35
            'format'=> $query->getFormat()
219
        ];
220
221 35
        $new = $this->newRequest($extendinfo);
222 35
        $new->url($url);
223
224
225
226
227 35
        if (!$query_as_string) {
228 35
            $new->parameters_json($sql);
229
        }
230 35
        if ($this->settings()->isEnableHttpCompression()) {
231 35
            $new->httpCompression(true);
232
        }
233
234 35
        return $new;
235
    }
236
237
    /**
238
     * @param string|Query $sql
239
     * @return CurlerRequest
240
     */
241 3
    public function writeStreamData($sql)
242
    {
243
244 3
        if ($sql instanceof Query) {
245 1
            $query=$sql;
246
        } else {
247 2
            $query = new Query($sql);
248
        }
249
250 3
        $url = $this->getUrl([
251 3
            'readonly' => 0,
252 3
            'query' => $query->toSql()
253
        ]);
254
        $extendinfo = [
255 3
            'sql' => $sql,
256 3
            'query' => $query,
257 3
            'format'=> $query->getFormat()
258
        ];
259
260 3
        $request = $this->newRequest($extendinfo);
261 3
        $request->url($url);
262 3
        return $request;
263
    }
264
265
266
    /**
267
     * @param string $sql
268
     * @param string $file_name
269
     * @return Statement
270
     * @throws \ClickHouseDB\Exception\TransportException
271
     */
272 7
    public function writeAsyncCSV($sql, $file_name)
273
    {
274 7
        $query = new Query($sql);
275
276 7
        $url = $this->getUrl([
277 7
            'readonly' => 0,
278 7
            'query' => $query->toSql()
279
        ]);
280
281
        $extendinfo = [
282 7
            'sql' => $sql,
283 7
            'query' => $query,
284 7
            'format'=> $query->getFormat()
285
        ];
286
287 7
        $request = $this->newRequest($extendinfo);
288 7
        $request->url($url);
289
290
        $request->setCallbackFunction(function(CurlerRequest $request) {
291 7
            $handle = $request->getInfileHandle();
292 7
            if (is_resource($handle)) {
293 7
                fclose($handle);
294
            }
295 7
        });
296
297 7
        $request->setInfile($file_name);
298 7
        $this->_curler->addQueLoop($request);
299
300 7
        return new Statement($request);
301
    }
302
303
    /**
304
     * get Count Pending Query in Queue
305
     *
306
     * @return int
307
     */
308 11
    public function getCountPendingQueue()
309
    {
310 11
        return $this->_curler->countPending();
311
    }
312
313
    /**
314
     * set Connect TimeOut in seconds [CURLOPT_CONNECTTIMEOUT] ( int )
315
     *
316
     * @param int $connectTimeOut
317
     */
318 2
    public function setConnectTimeOut($connectTimeOut)
319
    {
320 2
        $this->_connectTimeOut = $connectTimeOut;
321 2
    }
322
323
    /**
324
     * get ConnectTimeOut in seconds
325
     *
326
     * @return int
327
     */
328 36
    public function getConnectTimeOut()
329
    {
330 36
        return $this->_connectTimeOut;
331
    }
332
333
334 1
    public function __findXClickHouseProgress($handle)
335
    {
336 1
        $code = curl_getinfo($handle, CURLINFO_HTTP_CODE);
337
338
        // Search X-ClickHouse-Progress
339 1
        if ($code == 200) {
340 1
            $response = curl_multi_getcontent($handle);
341 1
            $header_size = curl_getinfo($handle, CURLINFO_HEADER_SIZE);
342 1
            if (!$header_size) {
343
                return false;
344
            }
345
346 1
            $header = substr($response, 0, $header_size);
347 1
            if (!$header_size) {
348
                return false;
349
            }
350 1
            $pos = strrpos($header, 'X-ClickHouse-Progress');
351
352 1
            if (!$pos) {
353
                return false;
354
            }
355
356 1
            $last = substr($header, $pos);
357 1
            $data = @json_decode(str_ireplace('X-ClickHouse-Progress:', '', $last), true);
358
359 1
            if ($data && is_callable($this->xClickHouseProgress)) {
360
361 1
                if (is_array($this->xClickHouseProgress)) {
362
                    call_user_func_array($this->xClickHouseProgress, [$data]);
363
                } else {
364 1
                    call_user_func($this->xClickHouseProgress, $data);
365
                }
366
367
368
            }
369
370
        }
371
372 1
    }
373
374
    /**
375
     * @param Query $query
376
     * @param null|WhereInFile $whereInFile
377
     * @param null|WriteToFile $writeToFile
378
     * @return CurlerRequest
379
     * @throws \Exception
380
     */
381 30
    public function getRequestRead(Query $query, $whereInFile = null, $writeToFile = null)
382
    {
383 30
        $urlParams = ['readonly' => 1];
384 30
        $query_as_string = false;
385
        // ---------------------------------------------------------------------------------
386 30
        if ($whereInFile instanceof WhereInFile && $whereInFile->size()) {
387
            // $request = $this->prepareSelectWhereIn($request, $whereInFile);
388 1
            $structure = $whereInFile->fetchUrlParams();
389
            // $structure = [];
390 1
            $urlParams = array_merge($urlParams, $structure);
391 1
            $query_as_string = true;
392
        }
393
        // ---------------------------------------------------------------------------------
394
        // if result to file
395 30
        if ($writeToFile instanceof WriteToFile && $writeToFile->fetchFormat()) {
396 1
            $query->setFormat($writeToFile->fetchFormat());
397 1
            unset($urlParams['extremes']);
398
        }
399
        // ---------------------------------------------------------------------------------
400
        // makeRequest read
401 30
        $request = $this->makeRequest($query, $urlParams, $query_as_string);
402
        // ---------------------------------------------------------------------------------
403
        // attach files
404 30
        if ($whereInFile instanceof WhereInFile && $whereInFile->size()) {
405 1
            $request->attachFiles($whereInFile->fetchFiles());
406
        }
407
        // ---------------------------------------------------------------------------------
408
        // result to file
409 30
        if ($writeToFile instanceof WriteToFile && $writeToFile->fetchFormat()) {
410
411 1
            $fout = fopen($writeToFile->fetchFile(), 'w');
412 1
            if (is_resource($fout)) {
413
414 1
                $isGz = $writeToFile->getGzip();
415
416 1
                if ($isGz) {
417
                    // write gzip header
418
                    // "\x1f\x8b\x08\x00\x00\x00\x00\x00"
419
                    // fwrite($fout, "\x1F\x8B\x08\x08".pack("V", time())."\0\xFF", 10);
420
                    // write the original file name
421
                    // $oname = str_replace("\0", "", basename($writeToFile->fetchFile()));
422
                    // fwrite($fout, $oname."\0", 1+strlen($oname));
423
424
                    fwrite($fout, "\x1f\x8b\x08\x00\x00\x00\x00\x00");
425
426
                }
427
428
429
                $request->setResultFileHandle($fout, $isGz)->setCallbackFunction(function(CurlerRequest $request) {
430
                    fclose($request->getResultFileHandle());
431 1
                });
432
            }
433
        }
434 30
        if ($this->xClickHouseProgress)
435
        {
436 1
            $request->setFunctionProgress([$this, '__findXClickHouseProgress']);
437
        }
438
        // ---------------------------------------------------------------------------------
439 30
        return $request;
440
441
    }
442
443 1
    public function cleanQueryDegeneration()
444
    {
445 1
        $this->_query_degenerations = [];
446 1
        return true;
447
    }
448
449 53
    public function addQueryDegeneration(Degeneration $degeneration)
450
    {
451 53
        $this->_query_degenerations[] = $degeneration;
452 53
        return true;
453
    }
454
455
    /**
456
     * @param Query $query
457
     * @return CurlerRequest
458
     * @throws \ClickHouseDB\Exception\TransportException
459
     */
460 19
    public function getRequestWrite(Query $query)
461
    {
462 19
        $urlParams = ['readonly' => 0];
463 19
        return $this->makeRequest($query, $urlParams);
464
    }
465
466
    /**
467
     * @return bool
0 ignored issues
show
Method \ClickHouseDB\Transport\Http::ping() has useless @return annotation.
Loading history...
Expected "boolean" but found "bool" for function return type
Loading history...
468
     * @throws TransportException
0 ignored issues
show
Comment missing for @throws tag in function comment
Loading history...
469
     */
470 36
    public function ping() : bool
471
    {
472 36
        $request = new CurlerRequest();
473 36
        $request->url($this->getUri())->verbose(false)->GET()->connectTimeOut($this->getConnectTimeOut());
474 36
        $this->_curler->execOne($request);
475
476 36
        return $request->response()->body() === 'Ok.' . PHP_EOL;
477
    }
478
479
    /**
480
     * @param string $sql
481
     * @param array $bindings
482
     * @return Query
483
     */
484 36
    private function prepareQuery($sql, $bindings)
485
    {
486
487
        // add Degeneration query
488 36
        foreach ($this->_query_degenerations as $degeneration) {
489 36
            $degeneration->bindParams($bindings);
490
        }
491
492 36
        return new Query($sql, $this->_query_degenerations);
493
    }
494
495
496
    /**
497
     * @param Query|string $sql
498
     * @param array $bindings
499
     * @param null|WhereInFile $whereInFile
500
     * @param null|WriteToFile $writeToFile
501
     * @return CurlerRequest
502
     * @throws \Exception
503
     */
504 29
    private function prepareSelect($sql, $bindings, $whereInFile, $writeToFile = null)
505
    {
506 29
        if ($sql instanceof Query) {
507
            return $this->getRequestWrite($sql);
508
        }
509 29
        $query = $this->prepareQuery($sql, $bindings);
510 29
        $query->setFormat('JSON');
511 29
        return $this->getRequestRead($query, $whereInFile, $writeToFile);
512
    }
513
514
515
516
517
    /**
518
     * @param Query|string $sql
519
     * @param array $bindings
520
     * @return CurlerRequest
521
     * @throws \ClickHouseDB\Exception\TransportException
522
     */
523 20
    private function prepareWrite($sql, $bindings = [])
524
    {
525 20
        if ($sql instanceof Query) {
526
            return $this->getRequestWrite($sql);
527
        }
528
529 20
        $query = $this->prepareQuery($sql, $bindings);
530 19
        return $this->getRequestWrite($query);
531
    }
532
533
    /**
534
     * @return bool
535
     * @throws \ClickHouseDB\Exception\TransportException
536
     */
537 9
    public function executeAsync()
538
    {
539 9
        return $this->_curler->execLoopWait();
540
    }
541
542
    /**
543
     * @param Query|string $sql
544
     * @param array $bindings
545
     * @param null|WhereInFile $whereInFile
546
     * @param null|WriteToFile $writeToFile
547
     * @return Statement
548
     * @throws \ClickHouseDB\Exception\TransportException
549
     * @throws \Exception
550
     */
551 24
    public function select($sql, array $bindings = [], $whereInFile = null, $writeToFile = null)
552
    {
553 24
        $request = $this->prepareSelect($sql, $bindings, $whereInFile, $writeToFile);
554 24
        $this->_curler->execOne($request);
555 24
        return new Statement($request);
556
    }
557
558
    /**
559
     * @param Query|string $sql
560
     * @param array $bindings
561
     * @param null|WhereInFile $whereInFile
562
     * @param null|WriteToFile $writeToFile
563
     * @return Statement
564
     * @throws \ClickHouseDB\Exception\TransportException
565
     * @throws \Exception
566
     */
567 5
    public function selectAsync($sql, array $bindings = [], $whereInFile = null, $writeToFile = null)
568
    {
569 5
        $request = $this->prepareSelect($sql, $bindings, $whereInFile, $writeToFile);
570 5
        $this->_curler->addQueLoop($request);
571 5
        return new Statement($request);
572
    }
573
574
    /**
575
     * @param callable $callback
576
     */
577 1
    public function setProgressFunction(callable $callback)
578
    {
579 1
        $this->xClickHouseProgress = $callback;
580 1
    }
581
582
    /**
583
     * @param string $sql
584
     * @param array $bindings
585
     * @param bool $exception
586
     * @return Statement
587
     * @throws \ClickHouseDB\Exception\TransportException
0 ignored issues
show
Class \ClickHouseDB\Exception\TransportException should not be referenced via a fully qualified name, but via a use statement.
Loading history...
Comment missing for @throws tag in function comment
Loading history...
588
     */
589 20
    public function write($sql, array $bindings = [], $exception = true)
590
    {
591 20
        $request = $this->prepareWrite($sql, $bindings);
592 19
        $this->_curler->execOne($request);
593 19
        $response = new Statement($request);
594 19
        if ($exception) {
595 19
            if ($response->isError()) {
596 3
                $response->error();
597
            }
598
        }
599 17
        return $response;
600
    }
601
602
    /**
603
     * @param Stream $streamRW
604
     * @param CurlerRequest $request
605
     * @return Statement
606
     * @throws \ClickHouseDB\Exception\TransportException
607
     */
608 2
    private function streaming(Stream $streamRW,CurlerRequest $request)
609
    {
610 2
        $callable=$streamRW->getClosure();
611 2
        $stream=$streamRW->getStream();
612
613
614
615
        try {
616
617
618 2
            if (!is_callable($callable)) {
619
                if ($streamRW->isWrite())
620
                {
621
622
                    $callable = function ($ch, $fd, $length) use ($stream) {
623
                        return ($line = fread($stream, $length)) ? $line : '';
624
                    };
625
                } else {
626
                    $callable = function ($ch, $fd) use ($stream) {
627
                        return fwrite($stream, $fd);
628
                    };
629
                }
630
            }
631
632 2
            if ($streamRW->isGzipHeader()) {
633
634 1
                if ($streamRW->isWrite())
635
                {
636 1
                    $request->header('Content-Encoding', 'gzip');
637 1
                    $request->header('Content-Type', 'application/x-www-form-urlencoded');
638
                } else {
639
                    $request->header('Accept-Encoding', 'gzip');
640
                }
641
642
            }
643
644
645
646 2
            $request->header('Transfer-Encoding', 'chunked');
647
648
649 2
            if ($streamRW->isWrite())
650
            {
651 1
                $request->setReadFunction($callable);
652
            } else {
653 1
                $request->setWriteFunction($callable);
654
655
656
657
//                $request->setHeaderFunction($callableHead);
658
            }
659
660
661 2
            $this->_curler->execOne($request,true);
662 2
            $response = new Statement($request);
663 2
            if ($response->isError()) {
664
                $response->error();
665
            }
666 2
            return $response;
667
        } finally {
668 2
            if ($streamRW->isWrite())
669 2
            fclose($stream);
670
        }
671
672
673
    }
674
675
676
    /**
677
     * @param Stream $streamRead
678
     * @param string $sql
679
     * @param array $bindings
680
     * @return Statement
681
     * @throws \ClickHouseDB\Exception\TransportException
682
     */
683 1
    public function streamRead(Stream $streamRead,$sql,$bindings=[])
684
    {
685 1
        $sql=$this->prepareQuery($sql,$bindings);
686 1
        $request=$this->getRequestRead($sql);
687 1
        return $this->streaming($streamRead,$request);
688
689
    }
690
691
    /**
692
     * @param Stream $streamWrite
693
     * @param string $sql
694
     * @param array $bindings
695
     * @return Statement
696
     * @throws \ClickHouseDB\Exception\TransportException
697
     */
698 1
    public function streamWrite(Stream $streamWrite,$sql,$bindings=[])
699
    {
700 1
        $sql=$this->prepareQuery($sql,$bindings);
701 1
        $request = $this->writeStreamData($sql);
702 1
        return $this->streaming($streamWrite,$request);
703
    }
704
}
705