Passed
Branch master (8a55cf)
by Igor
12:47 queued 09:06
created

Http::__findXClickHouseProgress()   B

Complexity

Conditions 8
Paths 7

Size

Total Lines 31

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 15
CRAP Score 8.5969

Importance

Changes 0
Metric Value
dl 0
loc 31
ccs 15
cts 19
cp 0.7895
rs 8.1795
c 0
b 0
f 0
cc 8
nc 7
nop 1
crap 8.5969
1
<?php
2
3
namespace ClickHouseDB\Transport;
4
5
use ClickHouseDB\Query\Degeneration;
6
use ClickHouseDB\Query\Query;
7
use ClickHouseDB\Query\WhereInFile;
8
use ClickHouseDB\Query\WriteToFile;
9
use ClickHouseDB\Settings;
10
use ClickHouseDB\Statement;
11
12
class Http
13
{
14
    /**
     * User name handed to every request's auth(); assigned by the constructor.
     *
     * @var string|null
     */
    private $_username = null;

    /**
     * Password handed to every request's auth(); assigned by the constructor.
     *
     * @var string|null
     */
    private $_password = null;

    /**
     * Host part of the server URI.
     *
     * @var string
     */
    private $_host = '';

    /**
     * Port part of the server URI; stays 0 until setHost() receives a positive port.
     *
     * @var int
     */
    private $_port = 0;

    /**
     * Verbose flag; cast to bool before it is passed to each request.
     *
     * @var bool|int
     */
    private $_verbose = false;

    /**
     * Request runner; created by setCurler().
     *
     * @var CurlerRolling|null
     */
    private $_curler = null;

    /**
     * Settings object created in the constructor.
     *
     * @var Settings|null
     */
    private $_settings = null;

    /**
     * Degenerations registered through addQueryDegeneration().
     *
     * @var Degeneration[]
     */
    private $_query_degenerations = [];

    /**
     * Connect timeout in seconds (int).
     *
     * @var int
     */
    private $_connectTimeOut = 5;

    /**
     * User callback that receives decoded X-ClickHouse-Progress data; null until
     * setProgressFunction() is called.
     *
     * @var callable|null
     */
    private $xClickHouseProgress = null;
65
66
    /**
     * Create a transport for a single server endpoint.
     *
     * @param string $host     server host
     * @param int    $port     server port (stored by setHost() only when positive)
     * @param string $username user name used for request authentication
     * @param string $password password used for request authentication
     */
    public function __construct($host, $port, $username, $password)
    {
        // Endpoint first, credentials afterwards.
        $this->setHost($host, $port);
        $this->_password = $password;
        $this->_username = $username;

        // Settings receives this transport instance; the request runner is created last.
        $this->_settings = new Settings($this);
        $this->setCurler();
    }
83
84
85 44
    /**
     * Replace the current request runner with a fresh CurlerRolling instance.
     */
    public function setCurler()
    {
        $runner = new CurlerRolling();
        $this->_curler = $runner;
    }
89
90
    /**
     * Expose the request runner used by this transport.
     *
     * @return CurlerRolling
     */
    public function getCurler()
    {
        $runner = $this->_curler;

        return $runner;
    }
97
98
    /**
     * Set the server host and, optionally, the port.
     *
     * @param string $host new host
     * @param int    $port new port; the stored port is left untouched unless this is greater than zero
     */
    public function setHost($host, $port = -1)
    {
        $this->_host = $host;

        // The default of -1 (or any non-positive value) keeps the current port.
        if ($port > 0) {
            $this->_port = $port;
        }
    }
110
111
    /**
     * Build the base URI "<scheme>://<host>:<port>".
     *
     * The scheme is "https" when the settings report isHttps(), "http" otherwise.
     *
     * @return string
     */
    public function getUri()
    {
        $scheme = $this->settings()->isHttps() ? 'https' : 'http';

        return $scheme . '://' . $this->_host . ':' . $this->_port;
    }
123
124
    /**
     * Access the Settings object created by the constructor.
     *
     * @return Settings
     */
    public function settings()
    {
        $current = $this->_settings;

        return $current;
    }
131
132
    /**
     * Store the verbose flag that is later passed (as bool) to each new request.
     *
     * @param bool|int $flag
     * @return mixed the value that was passed in
     */
    public function verbose($flag)
    {
        return $this->_verbose = $flag;
    }
141
142
    /**
     * Build the full request URL: base URI plus a query string made from the
     * current settings merged with the per-request parameters.
     *
     * @param array $params extra query parameters; they override settings with the same key
     * @return string
     */
    private function getUrl($params = [])
    {
        $queryArgs = $this->settings()->getSettings();

        if (is_array($params) && count($params)) {
            $queryArgs = array_merge($queryArgs, $params);
        }

        // For a read-only user these keys are dropped from the query string.
        if ($this->settings()->isReadOnlyUser()) {
            unset(
                $queryArgs['extremes'],
                $queryArgs['readonly'],
                $queryArgs['enable_http_compression'],
                $queryArgs['max_execution_time']
            );
        }

        // 'https' only selects the URI scheme (see getUri()); it is never sent as a parameter.
        unset($queryArgs['https']);

        return $this->getUri() . '?' . http_build_query($queryArgs);
    }
169
170
    /**
     * Create a POST request pre-configured with credentials, timeouts and the
     * compression / session / verbose options taken from this transport.
     *
     * @param array $extendinfo extended info attached to the request
     * @return CurlerRequest
     */
    private function newRequest($extendinfo)
    {
        $settings = $this->settings();

        $request = new CurlerRequest();
        $request
            ->auth($this->_username, $this->_password)
            ->POST()
            ->setRequestExtendedInfo($extendinfo);

        if ($settings->isEnableHttpCompression()) {
            $request->httpCompression(true);
        }

        // A configured session id switches the request to persistent mode.
        if ($settings->getSessionId()) {
            $request->persistent();
        }

        $request->timeOut($settings->getTimeOut());
        // Connect timeout is the value held in $_connectTimeOut (seconds); keep-alive is enabled on the same chain.
        $request->connectTimeOut($this->_connectTimeOut)->keepAlive();
        $request->verbose((bool) $this->_verbose);

        return $request;
    }
195
196
    /**
     * Turn a Query into a ready-to-send request.
     *
     * @param Query $query
     * @param array $urlParams       extra URL parameters
     * @param bool  $query_as_string when true the SQL is placed in the 'query' URL parameter
     *                               instead of being passed to parameters_json()
     * @return CurlerRequest
     * @throws \ClickHouseDB\Exception\TransportException
     */
    private function makeRequest(Query $query, $urlParams = [], $query_as_string = false)
    {
        $sql = $query->toSql();

        if ($query_as_string) {
            $urlParams['query'] = $sql;
        }

        $url = $this->getUrl($urlParams);

        $request = $this->newRequest([
            'sql' => $sql,
            'query' => $query,
            'format' => $query->getFormat(),
        ]);
        $request->url($url);

        if (!$query_as_string) {
            $request->parameters_json($sql);
        }

        // newRequest() already applies this option; the original repeats it here, so it is kept.
        if ($this->settings()->isEnableHttpCompression()) {
            $request->httpCompression(true);
        }

        return $request;
    }
234
235
    /**
     * Build (without executing) a request whose SQL travels in the URL with readonly=0.
     *
     * @param string $sql
     * @return CurlerRequest
     */
    public function writeStreamData($sql)
    {
        $query = new Query($sql);

        $url = $this->getUrl([
            'readonly' => 0,
            'query' => $query->toSql(),
        ]);

        // Extended info keeps the raw SQL string alongside the Query object and its format.
        $request = $this->newRequest([
            'sql' => $sql,
            'query' => $query,
            'format' => $query->getFormat(),
        ]);
        $request->url($url);

        return $request;
    }
258
259
260
    /**
     * Queue an asynchronous upload of a file using the given SQL.
     *
     * The SQL is sent in the URL (readonly=0), the file is attached through
     * setInfile(), and the request is added to the runner's queue; nothing is
     * executed here.
     *
     * @param string $sql
     * @param string $file_name path handed to the request's setInfile()
     * @return Statement statement wrapping the queued request
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function writeAsyncCSV($sql, $file_name)
    {
        $query = new Query($sql);

        $url = $this->getUrl([
            'readonly' => 0,
            'query' => $query->toSql(),
        ]);

        $extendinfo = [
            'sql' => $sql,
            'query' => $query,
            'format' => $query->getFormat(),
        ];

        $request = $this->newRequest($extendinfo);
        $request->url($url);

        // Close the input file once the request's callback fires.
        $request->setCallbackFunction(function (CurlerRequest $request) {
            $handle = $request->getInfileHandle();
            // Static analysis reports getInfileHandle() may yield a boolean; only close real, open resources.
            if (is_resource($handle)) {
                fclose($handle);
            }
        });

        $request->setInfile($file_name);
        $this->_curler->addQueLoop($request);

        return new Statement($request);
    }
293
294
    /**
     * Number of pending requests as reported by the runner's countPending().
     *
     * @return int
     */
    public function getCountPendingQueue()
    {
        $pending = $this->_curler->countPending();

        return $pending;
    }
303
304
    /**
     * Set the connect timeout in seconds [CURLOPT_CONNECTTIMEOUT] (int).
     *
     * The value is applied to requests created afterwards by newRequest().
     *
     * @param int $connectTimeOut
     */
    public function setConnectTimeOut($connectTimeOut)
    {
        $seconds = $connectTimeOut;
        $this->_connectTimeOut = $seconds;
    }
313
314
    /**
     * Current connect timeout in seconds.
     *
     * @return int
     */
    public function getConnectTimeOut()
    {
        $seconds = $this->_connectTimeOut;

        return $seconds;
    }
323
324
325 1
    /**
     * Progress hook: find the most recent X-ClickHouse-Progress header received
     * on a transfer and pass its decoded JSON payload to the user callback.
     *
     * Registered on read requests by getRequestRead() when a progress callback is set.
     *
     * @param resource $handle cURL handle of the transfer being observed
     * @return false|null false when no progress header could be read; null otherwise
     *                    (status code other than 200, or after the lookup completed)
     */
    public function __findXClickHouseProgress($handle)
    {
        $marker = 'X-ClickHouse-Progress:';

        // Progress headers are only looked up on successful responses.
        if (curl_getinfo($handle, CURLINFO_HTTP_CODE) != 200) {
            return;
        }

        $response = curl_multi_getcontent($handle);
        $header_size = curl_getinfo($handle, CURLINFO_HEADER_SIZE);
        if (!$header_size || !is_string($response)) {
            return false;
        }

        $header = substr($response, 0, $header_size);
        if ($header === false || $header === '') {
            return false;
        }

        // Case-insensitive search for the LAST occurrence: header names may arrive
        // lowercased, and the newest progress header is the one of interest.
        $pos = strripos($header, $marker);
        if ($pos === false) {
            return false;
        }

        // Keep only the value of that single header line; anything after the line
        // break belongs to later headers and would make the JSON invalid.
        $value = substr($header, $pos + strlen($marker));
        $value = trim(substr($value, 0, strcspn($value, "\r\n")));

        $data = json_decode($value, true);

        if ($data && is_callable($this->xClickHouseProgress)) {
            call_user_func($this->xClickHouseProgress, $data);
        }
    }
364
365
    /**
     * Build a read (readonly=1) request for a query, optionally attaching
     * "where in" files and/or redirecting the result into a file.
     *
     * @param Query            $query
     * @param null|WhereInFile $whereInFile files to attach; used only when size() is non-zero
     * @param null|WriteToFile $writeToFile result file target; used only when it reports a format
     * @return CurlerRequest
     * @throws \RuntimeException when the result file cannot be opened for writing
     * @throws \Exception
     */
    public function getRequestRead(Query $query, $whereInFile = null, $writeToFile = null)
    {
        $urlParams = ['readonly' => 1];
        $query_as_string = false;

        $hasWhereIn = $whereInFile instanceof WhereInFile && $whereInFile->size();
        $toFile = $writeToFile instanceof WriteToFile && $writeToFile->fetchFormat();

        // With attached files the SQL goes into the URL and the file structure
        // is described through additional URL parameters.
        if ($hasWhereIn) {
            $urlParams = array_merge($urlParams, $whereInFile->fetchUrlParams());
            $query_as_string = true;
        }

        // Result to file: the file's format replaces the query format.
        if ($toFile) {
            $query->setFormat($writeToFile->fetchFormat());
            unset($urlParams['extremes']);
        }

        $request = $this->makeRequest($query, $urlParams, $query_as_string);

        if ($hasWhereIn) {
            $request->attachFiles($whereInFile->fetchFiles());
        }

        if ($toFile) {
            $path = $writeToFile->fetchFile();
            $fout = fopen($path, 'w');
            // fopen() returns false on failure; fail loudly instead of passing false on.
            if ($fout === false) {
                throw new \RuntimeException('Unable to open result file for writing: ' . $path);
            }

            $isGz = $writeToFile->getGzip();
            if ($isGz) {
                // Fixed 8-byte gzip header prefix written before the response data.
                fwrite($fout, "\x1f\x8b\x08\x00\x00\x00\x00\x00");
            }

            // Close the result file once the request's callback fires.
            $request->setResultFileHandle($fout, $isGz)->setCallbackFunction(function (CurlerRequest $request) {
                $handle = $request->getResultFileHandle();
                if (is_resource($handle)) {
                    fclose($handle);
                }
            });
        }

        // Hook progress-header parsing only when a user callback has been registered.
        if ($this->xClickHouseProgress) {
            $request->setFunctionProgress([$this, '__findXClickHouseProgress']);
        }

        return $request;
    }
430
431 1
    /**
     * Remove every registered query degeneration.
     *
     * @return bool always true
     */
    public function cleanQueryDegeneration()
    {
        $emptied = [];
        $this->_query_degenerations = $emptied;

        return true;
    }
436
437 44
    /**
     * Append a degeneration to the list handed to every Query built by prepareQuery().
     *
     * @param Degeneration $degeneration
     * @return bool always true
     */
    public function addQueryDegeneration(Degeneration $degeneration)
    {
        array_push($this->_query_degenerations, $degeneration);

        return true;
    }
442
443
    /**
     * Build a write (readonly=0) request for a query.
     *
     * @param Query $query
     * @return CurlerRequest
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function getRequestWrite(Query $query)
    {
        return $this->makeRequest($query, ['readonly' => 0]);
    }
453
454
    /**
     * Bind parameters on every registered degeneration and wrap the SQL in a Query.
     *
     * @param string $sql
     * @param array  $bindings
     * @return Query
     */
    private function prepareQuery($sql, $bindings)
    {
        $degenerations = $this->_query_degenerations;

        // Each degeneration receives the same bindings before the Query is created.
        foreach ($degenerations as $item) {
            $item->bindParams($bindings);
        }

        return new Query($sql, $degenerations);
    }
469
470
471
    /**
     * Build the request for a select.
     *
     * A plain SQL string is turned into a Query with format 'JSON' and sent as a read request.
     *
     * @param Query|string     $sql
     * @param array            $bindings
     * @param null|WhereInFile $whereInFile
     * @param null|WriteToFile $writeToFile
     * @return CurlerRequest
     * @throws \Exception
     */
    private function prepareSelect($sql, $bindings, $whereInFile, $writeToFile = null)
    {
        if (!($sql instanceof Query)) {
            $query = $this->prepareQuery($sql, $bindings);
            $query->setFormat('JSON');

            return $this->getRequestRead($query, $whereInFile, $writeToFile);
        }

        // NOTE(review): a ready-made Query goes through getRequestWrite() (readonly=0) and
        // ignores $whereInFile / $writeToFile — confirm this is intended.
        return $this->getRequestWrite($sql);
    }
491
492
    /**
     * Build the request for a write.
     *
     * A Query instance is used as-is; a SQL string is first converted by prepareQuery().
     *
     * @param Query|string $sql
     * @param array        $bindings
     * @return CurlerRequest
     * @throws \ClickHouseDB\Exception\TransportException
     */
    private function prepareWrite($sql, $bindings = [])
    {
        $query = $sql instanceof Query
            ? $sql
            : $this->prepareQuery($sql, $bindings);

        return $this->getRequestWrite($query);
    }
507
508
    /**
     * Run the queued requests by delegating to the runner's execLoopWait().
     *
     * @return bool
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function executeAsync()
    {
        $result = $this->_curler->execLoopWait();

        return $result;
    }
516
517
    /**
     * Execute a select immediately and wrap the request in a Statement.
     *
     * @param Query|string     $sql
     * @param array            $bindings
     * @param null|WhereInFile $whereInFile
     * @param null|WriteToFile $writeToFile
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     * @throws \Exception
     */
    public function select($sql, array $bindings = [], $whereInFile = null, $writeToFile = null)
    {
        $prepared = $this->prepareSelect($sql, $bindings, $whereInFile, $writeToFile);

        // Executed right away, in contrast to selectAsync() which only queues the request.
        $this->_curler->execOne($prepared);

        return new Statement($prepared);
    }
532
533
    /**
     * Queue a select for later execution and wrap the request in a Statement.
     *
     * The request is only added to the runner's queue here; it runs when the
     * queue is processed (see executeAsync()).
     *
     * @param Query|string     $sql
     * @param array            $bindings
     * @param null|WhereInFile $whereInFile
     * @param null|WriteToFile $writeToFile
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     * @throws \Exception
     */
    public function selectAsync($sql, array $bindings = [], $whereInFile = null, $writeToFile = null)
    {
        $request = $this->prepareSelect($sql, $bindings, $whereInFile, $writeToFile);
        $this->_curler->addQueLoop($request);

        return new Statement($request);
    }
548
549
    /**
     * Register the callback that receives decoded X-ClickHouse-Progress data.
     *
     * Read requests built afterwards get the progress hook attached.
     *
     * @param callable $callback
     */
    public function setProgressFunction(callable $callback)
    {
        $handler = $callback;
        $this->xClickHouseProgress = $handler;
    }
556
557
    /**
     * Execute a write immediately and return its Statement.
     *
     * @param string $sql       SQL text (prepareWrite() also accepts a Query instance)
     * @param array  $bindings
     * @param bool   $exception when true and the statement reports an error, error() is
     *                          invoked on it (presumably raising the documented exception)
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function write($sql, array $bindings = [], $exception = true)
    {
        $request = $this->prepareWrite($sql, $bindings);
        $this->_curler->execOne($request);

        $statement = new Statement($request);

        if ($exception && $statement->isError()) {
            $statement->error();
        }

        return $statement;
    }
576
}
577