Passed
Push — master ( 7b4f81...84e0ca )
by Igor
03:22
created

Http::__findXClickHouseProgress()   B

Complexity

Conditions 8
Paths 7

Size

Total Lines 31

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 15
CRAP Score 8.5969

Importance

Changes 0
Metric Value
dl 0
loc 31
ccs 15
cts 19
cp 0.7895
rs 8.1795
c 0
b 0
f 0
cc 8
nc 7
nop 1
crap 8.5969
1
<?php
2
3
namespace ClickHouseDB\Transport;
4
5
use ClickHouseDB\Query\Degeneration;
6
use ClickHouseDB\Query\Query;
7
use ClickHouseDB\Query\WhereInFile;
8
use ClickHouseDB\Query\WriteToFile;
9
use ClickHouseDB\Settings;
10
use ClickHouseDB\Statement;
11
12
class Http
13
{
14
    /**
15
     * @var string
16
     */
17
    private $_username = null;
18
19
    /**
20
     * @var string
21
     */
22
    private $_password = null;
23
24
    /**
25
     * @var string
26
     */
27
    private $_host = '';
28
29
    /**
30
     * @var int
31
     */
32
    private $_port = 0;
33
34
    /**
35
     * @var bool|int
36
     */
37
    private $_verbose = false;
38
39
    /**
40
     * @var CurlerRolling
41
     */
42
    private $_curler = null;
43
44
    /**
45
     * @var Settings
46
     */
47
    private $_settings = null;
48
49
    /**
50
     * @var array
51
     */
52
    private $_query_degenerations = [];
53
54
    /**
55
     * Count seconds (int)
56
     *
57
     * @var int
58
     */
59
    private $_connectTimeOut = 5;
60
61
    /**
62
     * @var callable
63
     */
64
    private $xClickHouseProgress = null;
65
66
    /**
     * Builds an HTTP transport for one ClickHouse endpoint.
     *
     * Stores the credentials and endpoint, creates the Settings object bound
     * to this transport and instantiates the curl executor.
     *
     * @param string $host     server host
     * @param int    $port     server port (applied by setHost() only when greater than zero)
     * @param string $username user name later passed to the request auth
     * @param string $password password later passed to the request auth
     */
    public function __construct($host, $port, $username, $password)
    {
        $this->_username = $username;
        $this->_password = $password;

        $this->setHost($host, $port);

        $this->_settings = new Settings($this);
        $this->setCurler();
    }
83
84
85 51
    /**
     * Replaces the curl executor of this transport with a fresh CurlerRolling instance.
     *
     * @return void
     */
    public function setCurler()
    {
        $curler = new CurlerRolling();
        $this->_curler = $curler;
    }
89
90
    /**
     * Gives access to the curl executor currently held by this transport.
     *
     * @return CurlerRolling
     */
    public function getCurler()
    {
        $curler = $this->_curler;

        return $curler;
    }
97
98
    /**
     * Changes the endpoint host and, optionally, its port.
     *
     * @param string $host new host
     * @param int    $port new port; any value that is not greater than zero
     *                     (including the -1 default) keeps the current port
     */
    public function setHost($host, $port = -1)
    {
        $this->_host = $host;

        if (!($port > 0)) {
            // No usable port supplied: keep whatever port is already stored.
            return;
        }

        $this->_port = $port;
    }
110
111
    /**
     * Assembles the base URI "<scheme>://<host>:<port>" of the endpoint.
     *
     * The scheme is "https" when the settings report isHttps(), "http" otherwise.
     *
     * @return string
     */
    public function getUri()
    {
        $scheme = $this->settings()->isHttps() ? 'https' : 'http';

        return $scheme . '://' . $this->_host . ':' . $this->_port;
    }
123
124
    /**
     * Returns the Settings object created for this transport in the constructor.
     *
     * @return Settings
     */
    public function settings()
    {
        $settings = $this->_settings;

        return $settings;
    }
131
132
    /**
     * Stores the verbosity flag; newRequest() casts it to bool and passes it
     * to every request it builds.
     *
     * @param bool|int $flag
     * @return mixed the same value that was passed in
     */
    public function verbose($flag)
    {
        return $this->_verbose = $flag;
    }
141
142
    /**
     * Builds the full request URL: base URI plus the current settings,
     * overlaid with $params, encoded as a query string.
     *
     * The 'https' entry is never sent. For read-only users the entries
     * 'extremes', 'readonly', 'enable_http_compression' and
     * 'max_execution_time' are dropped as well.
     *
     * @param array $params extra query arguments; they override settings with the same key
     * @return string
     */
    private function getUrl($params = [])
    {
        $queryArgs = $this->settings()->getSettings();

        if (is_array($params) && count($params) > 0) {
            $queryArgs = array_merge($queryArgs, $params);
        }

        $excluded = ['https'];
        if ($this->settings()->isReadOnlyUser()) {
            $excluded = array_merge(
                $excluded,
                ['extremes', 'readonly', 'enable_http_compression', 'max_execution_time']
            );
        }

        foreach ($excluded as $key) {
            unset($queryArgs[$key]);
        }

        return $this->getUri() . '?' . http_build_query($queryArgs);
    }
169
170
    /**
     * Creates a POST request pre-configured from this transport: credentials,
     * optional HTTP compression, optional persistent mode (when a session id
     * is set), timeouts, keep-alive and verbosity.
     *
     * @param array $extendinfo data attached to the request through
     *                          setRequestExtendedInfo(); callers in this class
     *                          pass the keys 'sql', 'query' and 'format'
     * @return CurlerRequest
     */
    private function newRequest($extendinfo)
    {
        $settings = $this->settings();

        $request = new CurlerRequest();
        $request->auth($this->_username, $this->_password)
            ->POST()
            ->setRequestExtendedInfo($extendinfo);

        if ($settings->isEnableHttpCompression()) {
            $request->httpCompression(true);
        }

        if ($settings->getSessionId()) {
            $request->persistent();
        }

        $request->timeOut($settings->getTimeOut());
        // Connect timeout comes from $_connectTimeOut (seconds), see setConnectTimeOut().
        $request->connectTimeOut($this->_connectTimeOut)->keepAlive();
        $request->verbose((bool) $this->_verbose);

        return $request;
    }
195
196
    /**
     * Turns a Query into a ready-to-send request.
     *
     * The SQL is placed either in the URL (as the 'query' argument) or handed
     * to parameters_json() on the request, depending on $query_as_string.
     *
     * @param Query $query
     * @param array $urlParams       extra URL arguments merged over the settings
     * @param bool  $query_as_string true to send the SQL in the URL instead of
     *                               passing it to parameters_json()
     * @return CurlerRequest
     * @throws \ClickHouseDB\Exception\TransportException
     */
    private function makeRequest(Query $query, $urlParams = [], $query_as_string = false)
    {
        $statementSql = $query->toSql();

        if ($query_as_string) {
            $urlParams['query'] = $statementSql;
        }

        $targetUrl = $this->getUrl($urlParams);

        $request = $this->newRequest([
            'sql' => $statementSql,
            'query' => $query,
            'format' => $query->getFormat(),
        ]);
        $request->url($targetUrl);

        if (!$query_as_string) {
            $request->parameters_json($statementSql);
        }

        // newRequest() already applies this; the repeated call is kept as in the original flow.
        if ($this->settings()->isEnableHttpCompression()) {
            $request->httpCompression(true);
        }

        return $request;
    }
234
235
    /**
     * Builds (but does not execute) a write request whose SQL travels in the
     * URL, leaving the request body free for streamed data.
     *
     * @param string|Query $sql statement text, or an already prepared Query
     * @return CurlerRequest request with readonly=0 and the SQL in the 'query' URL argument
     */
    public function writeStreamData($sql)
    {
        $query = ($sql instanceof Query) ? $sql : new Query($sql);
        $sqlText = $query->toSql();

        $url = $this->getUrl([
            'readonly' => 0,
            'query' => $sqlText,
        ]);

        $extendinfo = [
            // Keep 'sql' a string, as makeRequest() does. Previously a Query
            // argument was stored here as an object. A plain string argument
            // is still stored untouched.
            'sql' => ($sql instanceof Query) ? $sqlText : $sql,
            'query' => $query,
            'format' => $query->getFormat(),
        ];

        $request = $this->newRequest($extendinfo);
        $request->url($url);

        return $request;
    }
262
263
264
    /**
     * Queues a write request that uploads the content of a file.
     *
     * The SQL is sent in the URL, the file is attached through setInfile(),
     * and the request is added to the curl executor's queue; nothing is
     * executed here (see executeAsync()).
     *
     * @param string $sql       statement text
     * @param string $file_name path handed to setInfile()
     * @return Statement statement wrapping the queued request
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function writeAsyncCSV($sql, $file_name)
    {
        $query = new Query($sql);

        $targetUrl = $this->getUrl([
            'readonly' => 0,
            'query' => $query->toSql(),
        ]);

        $request = $this->newRequest([
            'sql' => $sql,
            'query' => $query,
            'format' => $query->getFormat(),
        ]);
        $request->url($targetUrl);

        // Request callback: releases the input file handle if one is still open.
        $closeInfile = function (CurlerRequest $finished) {
            $infile = $finished->getInfileHandle();
            if (!is_resource($infile)) {
                return;
            }
            fclose($infile);
        };
        $request->setCallbackFunction($closeInfile);

        $request->setInfile($file_name);
        $this->_curler->addQueLoop($request);

        return new Statement($request);
    }
300
301
    /**
     * Number of requests still pending in the curl executor's queue,
     * as reported by its countPending().
     *
     * @return int
     */
    public function getCountPendingQueue()
    {
        $pending = $this->_curler->countPending();

        return $pending;
    }
310
311
    /**
     * Sets the connect timeout, in seconds, applied to requests built afterwards
     * (newRequest() passes it to connectTimeOut() on each request).
     *
     * @param int $connectTimeOut seconds
     */
    public function setConnectTimeOut($connectTimeOut)
    {
        $seconds = $connectTimeOut;
        $this->_connectTimeOut = $seconds;
    }
320
321
    /**
     * Returns the connect timeout currently configured, in seconds.
     *
     * @return int
     */
    public function getConnectTimeOut()
    {
        $seconds = $this->_connectTimeOut;

        return $seconds;
    }
330
331
332 1
    /**
     * Progress hook: finds the most recent X-ClickHouse-Progress header in the
     * response received so far on a curl handle, decodes its JSON value and
     * forwards it to the callback registered with setProgressFunction().
     *
     * getRequestRead() registers this method on the request as the array
     * callable [$this, '__findXClickHouseProgress'].
     *
     * @param resource $handle curl handle of the running request
     * @return bool|null false when the response is HTTP 200 but no progress
     *                   header could be located; null in every other case
     */
    public function __findXClickHouseProgress($handle)
    {
        // Only successful responses are inspected.
        if (curl_getinfo($handle, CURLINFO_HTTP_CODE) != 200) {
            return;
        }

        $response = curl_multi_getcontent($handle);
        $header_size = curl_getinfo($handle, CURLINFO_HEADER_SIZE);
        if (!$header_size) {
            return false;
        }

        // Check the extracted header text itself; the original re-tested
        // $header_size here, which could never fail at this point.
        $header = substr((string) $response, 0, $header_size);
        if ($header === false || $header === '') {
            return false;
        }

        // Header names are case-insensitive; compare against false explicitly
        // so that an offset of 0 is not mistaken for "not found".
        $name = 'X-ClickHouse-Progress:';
        $pos = strripos($header, $name);
        if ($pos === false) {
            return false;
        }

        // Keep only the value on this header line, so that headers following
        // the last progress header cannot corrupt the JSON payload.
        $value = substr($header, $pos + strlen($name));
        $value = trim(substr($value, 0, strcspn($value, "\r\n")));

        $data = json_decode($value, true);

        if ($data && is_callable($this->xClickHouseProgress)) {
            call_user_func($this->xClickHouseProgress, $data);
        }
    }
371
372
    /**
     * Builds (but does not execute) a read request for a prepared Query.
     *
     * Optional extras:
     *  - $whereInFile with a non-zero size(): its URL parameters are merged
     *    in, the SQL is moved into the URL and its files are attached;
     *  - $writeToFile with a format: the query format is replaced by it and
     *    the response is directed to the target file via setResultFileHandle();
     *  - a registered progress callback: the progress hook is installed.
     *
     * @param Query            $query
     * @param null|WhereInFile $whereInFile
     * @param null|WriteToFile $writeToFile
     * @return CurlerRequest
     * @throws \Exception
     */
    public function getRequestRead(Query $query, $whereInFile = null, $writeToFile = null)
    {
        $query_as_string = false;
        $urlParams = ['readonly' => 1];

        // External "where in" data: its structure goes into the URL, next to the SQL itself.
        if ($whereInFile instanceof WhereInFile && $whereInFile->size()) {
            $urlParams = array_merge($urlParams, $whereInFile->fetchUrlParams());
            $query_as_string = true;
        }

        // Result goes to a file: the file's format replaces the query format.
        if ($writeToFile instanceof WriteToFile && $writeToFile->fetchFormat()) {
            $query->setFormat($writeToFile->fetchFormat());
            unset($urlParams['extremes']);
        }

        $request = $this->makeRequest($query, $urlParams, $query_as_string);

        if ($whereInFile instanceof WhereInFile && $whereInFile->size()) {
            $request->attachFiles($whereInFile->fetchFiles());
        }

        if ($writeToFile instanceof WriteToFile && $writeToFile->fetchFormat()) {
            $target = fopen($writeToFile->fetchFile(), 'w');

            // NOTE(review): when fopen() fails the request is still returned,
            // just without a result file handle - confirm this is intended.
            if (is_resource($target)) {
                $gzip = $writeToFile->getGzip();

                if ($gzip) {
                    // Fixed 8-byte prefix written ahead of the response data.
                    fwrite($target, "\x1f\x8b\x08\x00\x00\x00\x00\x00");
                }

                // Request callback: closes the result file handle.
                $closeTarget = function (CurlerRequest $finished) {
                    fclose($finished->getResultFileHandle());
                };

                $request->setResultFileHandle($target, $gzip)->setCallbackFunction($closeTarget);
            }
        }

        if ($this->xClickHouseProgress) {
            $request->setFunctionProgress([$this, '__findXClickHouseProgress']);
        }

        return $request;
    }
440
441 1
    /**
     * Drops every registered query degeneration.
     *
     * @return bool always true
     */
    public function cleanQueryDegeneration()
    {
        $this->_query_degenerations = [];

        return true;
    }
446
447 51
    /**
     * Appends a degeneration to the list handed to every Query built by prepareQuery().
     *
     * @param Degeneration $degeneration
     * @return bool always true
     */
    public function addQueryDegeneration(Degeneration $degeneration)
    {
        array_push($this->_query_degenerations, $degeneration);

        return true;
    }
452
453
    /**
     * Builds (but does not execute) a write request: the query is sent with readonly=0.
     *
     * @param Query $query
     * @return CurlerRequest
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function getRequestWrite(Query $query)
    {
        return $this->makeRequest($query, ['readonly' => 0]);
    }
463
464
    /**
465
     * @param string $sql
466
     * @param array $bindings
467
     * @return Query
468
     */
469 41
    private function prepareQuery($sql, $bindings)
470
    {
471
472
        // add Degeneration query
473 41
        foreach ($this->_query_degenerations as $degeneration) {
474 41
            $degeneration->bindParams($bindings);
475
        }
476
477 41
        return new Query($sql, $this->_query_degenerations);
478
    }
479
480
481
    /**
     * Builds the request for a select.
     *
     * Raw SQL is prepared with the bindings, forced to the JSON format and
     * turned into a read request. A ready-made Query is passed to
     * getRequestWrite() unchanged.
     *
     * @param Query|string     $sql
     * @param array            $bindings
     * @param null|WhereInFile $whereInFile
     * @param null|WriteToFile $writeToFile
     * @return CurlerRequest
     * @throws \Exception
     */
    private function prepareSelect($sql, $bindings, $whereInFile, $writeToFile = null)
    {
        if (!($sql instanceof Query)) {
            $query = $this->prepareQuery($sql, $bindings);
            $query->setFormat('JSON');

            return $this->getRequestRead($query, $whereInFile, $writeToFile);
        }

        // NOTE(review): a Query instance goes through the write request
        // (readonly=0) and $whereInFile / $writeToFile are ignored on this
        // path - confirm this is intended.
        return $this->getRequestWrite($sql);
    }
498
499
500
501
502
    /**
     * Builds the request for a write; raw SQL is first prepared with the
     * bindings, a ready-made Query is used as is.
     *
     * @param Query|string $sql
     * @param array        $bindings
     * @return CurlerRequest
     * @throws \ClickHouseDB\Exception\TransportException
     */
    private function prepareWrite($sql, $bindings = [])
    {
        $query = ($sql instanceof Query)
            ? $sql
            : $this->prepareQuery($sql, $bindings);

        return $this->getRequestWrite($query);
    }
517
518
    /**
     * Runs the queued requests by delegating to execLoopWait() on the curl executor.
     *
     * @return bool result of execLoopWait()
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function executeAsync()
    {
        $result = $this->_curler->execLoopWait();

        return $result;
    }
526
527
    /**
     * Executes a select immediately and returns its statement.
     *
     * @param Query|string     $sql
     * @param array            $bindings
     * @param null|WhereInFile $whereInFile
     * @param null|WriteToFile $writeToFile
     * @return Statement statement wrapping the executed request
     * @throws \ClickHouseDB\Exception\TransportException
     * @throws \Exception
     */
    public function select($sql, array $bindings = [], $whereInFile = null, $writeToFile = null)
    {
        $prepared = $this->prepareSelect($sql, $bindings, $whereInFile, $writeToFile);

        $this->_curler->execOne($prepared);

        return new Statement($prepared);
    }
542
543
    /**
     * Queues a select on the curl executor without running it; the returned
     * statement wraps the queued request (see executeAsync()).
     *
     * @param Query|string     $sql
     * @param array            $bindings
     * @param null|WhereInFile $whereInFile
     * @param null|WriteToFile $writeToFile
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     * @throws \Exception
     */
    public function selectAsync($sql, array $bindings = [], $whereInFile = null, $writeToFile = null)
    {
        $prepared = $this->prepareSelect($sql, $bindings, $whereInFile, $writeToFile);

        $this->_curler->addQueLoop($prepared);

        return new Statement($prepared);
    }
558
559
    /**
     * Registers the callback that receives decoded X-ClickHouse-Progress data.
     *
     * Once set, getRequestRead() installs the progress hook on every read request it builds.
     *
     * @param callable $callback called with the decoded progress data as its only argument
     */
    public function setProgressFunction(callable $callback)
    {
        $handler = $callback;
        $this->xClickHouseProgress = $handler;
    }
566
567
    /**
     * Executes a write immediately.
     *
     * @param string $sql
     * @param array  $bindings
     * @param bool   $exception when true and the statement reports an error,
     *                          error() is called on it
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function write($sql, array $bindings = [], $exception = true)
    {
        $request = $this->prepareWrite($sql, $bindings);
        $this->_curler->execOne($request);

        $statement = new Statement($request);

        if ($exception && $statement->isError()) {
            $statement->error();
        }

        return $statement;
    }
586
587
    /**
     * Executes a request whose body is streamed from, or whose response is
     * streamed into, the resource held by $streamRW.
     *
     * When the Stream supplies no callable, a default one is used: for write
     * streams it feeds curl with data read from the resource, for read
     * streams it writes the received data into the resource.
     *
     * For write streams the resource is closed when the method finishes,
     * whether it returns or throws. Read streams are left open.
     *
     * @param Stream        $streamRW
     * @param CurlerRequest $request
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     */
    private function streaming(Stream $streamRW, CurlerRequest $request)
    {
        $callable = $streamRW->getClosure();
        $stream = $streamRW->getStream();
        $isWrite = $streamRW->isWrite();

        try {
            if (!is_callable($callable)) {
                if ($isWrite) {
                    $callable = function ($ch, $fd, $length) use ($stream) {
                        $chunk = fread($stream, $length);

                        // Compare with false explicitly: a chunk such as "0" is
                        // falsy in PHP, and returning '' for it would signal
                        // end-of-data and truncate the upload.
                        return $chunk === false ? '' : $chunk;
                    };
                } else {
                    $callable = function ($ch, $fd) use ($stream) {
                        return fwrite($stream, $fd);
                    };
                }
            }

            if ($streamRW->isGzipHeader()) {
                if ($isWrite) {
                    $request->header('Content-Encoding', 'gzip');
                    $request->header('Content-Type', 'application/x-www-form-urlencoded');
                } else {
                    $request->header('Accept-Encoding', 'gzip');
                }
            }

            $request->header('Transfer-Encoding', 'chunked');

            if ($isWrite) {
                $request->setReadFunction($callable);
            } else {
                $request->setWriteFunction($callable);
            }

            $this->_curler->execOne($request, true);

            $response = new Statement($request);
            if ($response->isError()) {
                $response->error();
            }

            return $response;
        } finally {
            // Only write streams are closed here. The is_resource() guard keeps
            // an already-closed stream from raising inside finally and masking
            // the actual result or exception.
            if ($isWrite && is_resource($stream)) {
                fclose($stream);
            }
        }
    }
659
660
661
    /**
     * Executes a read query and streams its response through $streamRead.
     *
     * @param Stream $streamRead
     * @param string $sql
     * @param array  $bindings
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function streamRead(Stream $streamRead, $sql, $bindings = [])
    {
        $query = $this->prepareQuery($sql, $bindings);

        return $this->streaming($streamRead, $this->getRequestRead($query));
    }
675
676
    /**
     * Executes a write query whose data is streamed from $streamWrite.
     *
     * @param Stream $streamWrite
     * @param string $sql
     * @param array  $bindings
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function streamWrite(Stream $streamWrite, $sql, $bindings = [])
    {
        $query = $this->prepareQuery($sql, $bindings);

        return $this->streaming($streamWrite, $this->writeStreamData($query));
    }
689
}
690