Passed
Push — master ( 22bce8...5a27f9 )
by Igor
03:05
created

Http::prepareQuery()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 9
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 9
rs 9.6666
ccs 4
cts 4
cp 1
cc 2
eloc 3
nc 2
nop 2
crap 2
1
<?php
2
3
namespace ClickHouseDB\Transport;
4
5
use ClickHouseDB\Query\Degeneration;
6
use ClickHouseDB\Query\Query;
7
use ClickHouseDB\Query\WhereInFile;
8
use ClickHouseDB\Query\WriteToFile;
9
use ClickHouseDB\Settings;
10
use ClickHouseDB\Statement;
11
12
class Http
13
{
14
    /**
     * User name passed to CurlerRequest::auth() for every request.
     *
     * @var string|null
     */
    private $_username = null;

    /**
     * Password passed to CurlerRequest::auth() for every request.
     *
     * @var string|null
     */
    private $_password = null;

    /**
     * Server host used when building the base URI.
     *
     * @var string
     */
    private $_host = '';

    /**
     * Server port used when building the base URI.
     *
     * @var int
     */
    private $_port = 0;

    /**
     * Verbosity flag; converted to bool before it is handed to a request.
     *
     * @var bool|int
     */
    private $_verbose = false;

    /**
     * Curl runner; null only until the constructor has called setCurler().
     *
     * @var CurlerRolling|null
     */
    private $_curler = null;

    /**
     * Connection settings; null only until the constructor has created them.
     *
     * @var Settings|null
     */
    private $_settings = null;

    /**
     * Degenerations registered through addQueryDegeneration().
     *
     * @var Degeneration[]
     */
    private $_query_degenerations = [];

    /**
     * Connect timeout in seconds.
     *
     * @var int
     */
    private $_connectTimeOut = 5;

    /**
     * Progress callback set by setProgressFunction(); null when none is registered.
     *
     * @var callable|null
     */
    private $xClickHouseProgress = null;
65
66
    /**
     * Stores the endpoint and credentials, then creates the Settings object
     * and the curl runner.
     *
     * @param string $host
     * @param int $port
     * @param string $username
     * @param string $password
     */
    public function __construct($host, $port, $username, $password)
    {
        $this->_username = $username;
        $this->_password = $password;
        $this->setHost($host, $port);

        // Settings is given this transport instance, so it is built after the fields above.
        $this->_settings = new Settings($this);
        $this->setCurler();
    }
83
84
85 44
    /**
     * Replaces the curl runner with a fresh CurlerRolling instance.
     */
    public function setCurler()
    {
        $curler = new CurlerRolling();
        $this->_curler = $curler;
    }
89
90
    /**
     * Gives access to the curl runner currently held by this transport.
     *
     * @return CurlerRolling
     */
    public function getCurler()
    {
        $curler = $this->_curler;

        return $curler;
    }
97
98
    /**
     * Stores the server host and, when a positive port is given, the port.
     *
     * Any $port that is not greater than zero (including the default -1)
     * leaves the currently stored port untouched.
     *
     * @param string $host
     * @param int $port
     */
    public function setHost($host, $port = -1)
    {
        $this->_host = $host;

        if (!($port > 0)) {
            return;
        }

        $this->_port = $port;
    }
110
111
    /**
     * Builds the base URI "<scheme>://<host>:<port>".
     *
     * The scheme is "https" when the settings report HTTPS, "http" otherwise.
     *
     * @return string
     */
    public function getUri()
    {
        $scheme = $this->settings()->isHttps() ? 'https' : 'http';

        return $scheme . '://' . $this->_host . ':' . $this->_port;
    }
121
122
    /**
     * Accessor for the Settings object created in the constructor.
     *
     * @return Settings
     */
    public function settings()
    {
        $settings = $this->_settings;

        return $settings;
    }
129
130
    /**
     * Stores the verbosity flag and hands the same value back.
     *
     * @param bool|int $flag
     * @return mixed the value that was passed in
     */
    public function verbose($flag)
    {
        return $this->_verbose = $flag;
    }
139
140
    /**
     * Builds the request URL: base URI plus a query string made of the
     * current settings overlaid with $params.
     *
     * For a read-only user the 'extremes', 'readonly',
     * 'enable_http_compression' and 'max_execution_time' entries are removed;
     * the 'https' entry is always removed before the query string is built.
     *
     * @param array $params extra parameters; they win over settings with the same key
     * @return string
     */
    private function getUrl($params = [])
    {
        $queryParams = $this->settings()->getSettings();

        // Skip the merge for an empty list so the settings array is passed through untouched.
        if (is_array($params) && count($params) > 0) {
            $queryParams = array_merge($queryParams, $params);
        }

        if ($this->settings()->isReadOnlyUser()) {
            unset(
                $queryParams['extremes'],
                $queryParams['readonly'],
                $queryParams['enable_http_compression'],
                $queryParams['max_execution_time']
            );
        }

        unset($queryParams['https']);

        return $this->getUri() . '?' . http_build_query($queryParams);
    }
167
168
    /**
     * Creates a POST request carrying the credentials and the given extended
     * info, then applies the transport-wide options: HTTP compression,
     * persistence when a session id is set, timeouts, keep-alive and verbosity.
     *
     * @param array $extendinfo data attached via setRequestExtendedInfo()
     * @return CurlerRequest
     */
    private function newRequest($extendinfo)
    {
        $request = new CurlerRequest();
        $request->auth($this->_username, $this->_password)
            ->POST()
            ->setRequestExtendedInfo($extendinfo);

        $settings = $this->settings();

        if ($settings->isEnableHttpCompression()) {
            $request->httpCompression(true);
        }

        if ($settings->getSessionId()) {
            $request->persistent();
        }

        $request->timeOut($settings->getTimeOut());
        $request->connectTimeOut($this->_connectTimeOut)->keepAlive();
        $request->verbose((bool) $this->_verbose);

        return $request;
    }
193
194
    /**
     * Turns a Query into a ready-to-send request.
     *
     * The SQL travels either in the URL (as the 'query' parameter) when
     * $query_as_string is true, or via parameters_json() otherwise.
     *
     * @param Query $query
     * @param array $urlParams extra URL parameters
     * @param bool $query_as_string send the SQL in the URL instead of through parameters_json()
     * @return CurlerRequest
     * @throws \ClickHouseDB\Exception\TransportException
     */
    private function makeRequest(Query $query, $urlParams = [], $query_as_string = false)
    {
        $sql = $query->toSql();

        if ($query_as_string) {
            $urlParams['query'] = $sql;
        }

        $url = $this->getUrl($urlParams);

        $request = $this->newRequest([
            'sql' => $sql,
            'query' => $query,
            'format' => $query->getFormat(),
        ]);
        $request->url($url);

        if (!$query_as_string) {
            $request->parameters_json($sql);
        }

        // newRequest() already applies this setting; the second call is kept to preserve existing behaviour.
        if ($this->settings()->isEnableHttpCompression()) {
            $request->httpCompression(true);
        }

        return $request;
    }
232
233
    /**
     * Builds (but does not send) a write request whose SQL is carried in the
     * URL with readonly=0.
     *
     * @param string $sql
     * @return CurlerRequest
     */
    public function writeStreamData($sql)
    {
        $query = new Query($sql);

        $urlParams = [
            'readonly' => 0,
            'query' => $query->toSql(),
        ];
        $url = $this->getUrl($urlParams);

        // 'sql' keeps the raw statement as given; 'query' holds the Query wrapper.
        $request = $this->newRequest([
            'sql' => $sql,
            'query' => $query,
            'format' => $query->getFormat(),
        ]);
        $request->url($url);

        return $request;
    }
256
257
258
    /**
     * Queues an asynchronous write that uploads the contents of a file.
     *
     * The SQL is carried in the URL with readonly=0, the file is attached via
     * setInfile(), and the request is added to the curl runner's queue. The
     * request's callback closes the input file handle.
     *
     * @param string $sql
     * @param string $file_name path of the file to upload
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function writeAsyncCSV($sql, $file_name)
    {
        $query = new Query($sql);

        $url = $this->getUrl([
            'readonly' => 0,
            'query' => $query->toSql()
        ]);

        $extendinfo = [
            'sql' => $sql,
            'query' => $query,
            'format' => $query->getFormat()
        ];

        $request = $this->newRequest($extendinfo);
        $request->url($url);

        $request->setCallbackFunction(function (CurlerRequest $request) {
            $handle = $request->getInfileHandle();

            // Static analysis reports that getInfileHandle() can yield a boolean;
            // only close the handle when it really is an open resource.
            if (is_resource($handle)) {
                fclose($handle);
            }
        });

        $request->setInfile($file_name);
        $this->_curler->addQueLoop($request);

        return new Statement($request);
    }
291
292
    /**
     * Number of pending requests, as reported by the curl runner's countPending().
     *
     * @return int
     */
    public function getCountPendingQueue()
    {
        $pending = $this->_curler->countPending();

        return $pending;
    }
301
302
    /**
     * Sets the connect timeout, in seconds, applied to every new request
     * (passed to CurlerRequest::connectTimeOut()).
     *
     * @param int $connectTimeOut seconds
     */
    public function setConnectTimeOut($connectTimeOut)
    {
        $seconds = $connectTimeOut;
        $this->_connectTimeOut = $seconds;
    }
311
312
    /**
     * Connect timeout, in seconds, currently configured on this transport.
     *
     * @return int
     */
    public function getConnectTimeOut()
    {
        $seconds = $this->_connectTimeOut;

        return $seconds;
    }
321
322
323 1
    /**
     * Progress hook: finds the last X-ClickHouse-Progress header in a curl
     * handle's response, decodes its JSON value and passes the decoded array
     * to the callback registered with setProgressFunction().
     *
     * getRequestRead() registers this method through setFunctionProgress(),
     * which is why it has to be public.
     *
     * Nothing is parsed unless the HTTP status is 200. Only the header's own
     * line is decoded, so headers that follow it do not break JSON parsing.
     *
     * @param mixed $handle curl handle, read with curl_getinfo() and curl_multi_getcontent()
     * @return false|null false when there is no header block or no progress header; null otherwise
     */
    public function __findXClickHouseProgress($handle)
    {
        $headerName = 'X-ClickHouse-Progress';

        if (curl_getinfo($handle, CURLINFO_HTTP_CODE) != 200) {
            return null;
        }

        $response = curl_multi_getcontent($handle);
        $headerSize = curl_getinfo($handle, CURLINFO_HEADER_SIZE);
        if (!$headerSize) {
            return false;
        }

        $header = substr((string) $response, 0, $headerSize);
        if ($header === false || $header === '') {
            return false;
        }

        // Header names are case-insensitive; take the last occurrence, i.e. the latest progress report.
        $pos = strripos($header, $headerName);
        if ($pos === false) {
            return false;
        }

        // Cut the header line at its line break, then drop the name and the ':' separator.
        $line = substr($header, $pos);
        $line = substr($line, 0, strcspn($line, "\r\n"));
        $json = ltrim(substr($line, strlen($headerName)), ": \t");

        $data = json_decode($json, true);

        if ($data && is_callable($this->xClickHouseProgress)) {
            // call_user_func() handles closures, function names and [object, method] pairs alike.
            call_user_func($this->xClickHouseProgress, $data);
        }

        return null;
    }
356
357
    /**
     * Builds a read request (readonly=1) for the given query.
     *
     * - With a non-empty WhereInFile, its URL parameters are merged in, the
     *   SQL is sent in the URL and the files are attached to the request.
     * - With a WriteToFile that has a format, the query format is switched to
     *   it and the response is directed into the target file; the file is
     *   closed by the request's callback.
     * - When a progress callback is registered, the request is wired to
     *   __findXClickHouseProgress().
     *
     * @param Query $query
     * @param null|WhereInFile $whereInFile
     * @param null|WriteToFile $writeToFile
     * @return CurlerRequest
     * @throws \RuntimeException when the result file cannot be opened for writing
     * @throws \Exception
     */
    public function getRequestRead(Query $query, $whereInFile = null, $writeToFile = null)
    {
        $urlParams = ['readonly' => 1];
        $query_as_string = false;

        // External data for WHERE ... IN: its structure goes into the URL together with the SQL.
        if ($whereInFile instanceof WhereInFile && $whereInFile->size()) {
            $structure = $whereInFile->fetchUrlParams();
            $urlParams = array_merge($urlParams, $structure);
            $query_as_string = true;
        }

        // Result goes to a file: use the format requested by the WriteToFile object.
        if ($writeToFile instanceof WriteToFile && $writeToFile->fetchFormat()) {
            $query->setFormat($writeToFile->fetchFormat());
            // NOTE(review): at this point $urlParams holds only 'readonly' plus the WhereInFile
            // parameters, so this appears to matter only if fetchUrlParams() supplies 'extremes' - confirm.
            unset($urlParams['extremes']);
        }

        $request = $this->makeRequest($query, $urlParams, $query_as_string);

        if ($whereInFile instanceof WhereInFile && $whereInFile->size()) {
            $request->attachFiles($whereInFile->fetchFiles());
        }

        if ($writeToFile instanceof WriteToFile && $writeToFile->fetchFormat()) {
            $targetFile = $writeToFile->fetchFile();
            $fout = fopen($targetFile, 'w');

            // fopen() returns false on failure; stop here instead of passing false to fwrite()
            // and setResultFileHandle().
            if ($fout === false) {
                throw new \RuntimeException('Unable to open result file for writing: ' . $targetFile);
            }

            $isGz = $writeToFile->getGzip();

            if ($isGz) {
                // Leading gzip header bytes: magic 1f 8b, compression method 08, remaining bytes zero.
                fwrite($fout, "\x1f\x8b\x08\x00\x00\x00\x00\x00");
            }

            $request->setResultFileHandle($fout, $isGz)->setCallbackFunction(function (CurlerRequest $request) {
                fclose($request->getResultFileHandle());
            });
        }

        if ($this->xClickHouseProgress) {
            $request->setFunctionProgress([$this, '__findXClickHouseProgress']);
        }

        return $request;
    }
422
423 1
    /**
     * Drops every registered query degeneration.
     *
     * @return bool always true
     */
    public function cleanQueryDegeneration()
    {
        $empty = [];
        $this->_query_degenerations = $empty;

        return true;
    }
428
429 44
    /**
     * Appends a degeneration to the list handed to every Query built by prepareQuery().
     *
     * @param Degeneration $degeneration
     * @return bool always true
     */
    public function addQueryDegeneration(Degeneration $degeneration)
    {
        array_push($this->_query_degenerations, $degeneration);

        return true;
    }
434
435
    /**
     * Builds a write request (readonly=0) for the given query.
     *
     * @param Query $query
     * @return CurlerRequest
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function getRequestWrite(Query $query)
    {
        return $this->makeRequest($query, ['readonly' => 0]);
    }
445
446
    /**
     * Wraps raw SQL in a Query after passing the bindings to every registered degeneration.
     *
     * @param string $sql
     * @param array $bindings values handed to each degeneration's bindParams()
     * @return Query
     */
    private function prepareQuery($sql, $bindings)
    {
        $degenerations = $this->_query_degenerations;

        foreach ($degenerations as $item) {
            $item->bindParams($bindings);
        }

        return new Query($sql, $degenerations);
    }
461
462
463
    /**
     * Builds the request for a select.
     *
     * A string is turned into a Query with format 'JSON' and routed through
     * getRequestRead(); a ready-made Query object is passed to getRequestWrite().
     *
     * @param Query|string $sql
     * @param array $bindings
     * @param null|WhereInFile $whereInFile
     * @param null|WriteToFile $writeToFile
     * @return CurlerRequest
     * @throws \Exception
     */
    private function prepareSelect($sql, $bindings, $whereInFile, $writeToFile = null)
    {
        if (!($sql instanceof Query)) {
            $query = $this->prepareQuery($sql, $bindings);
            $query->setFormat('JSON');

            return $this->getRequestRead($query, $whereInFile, $writeToFile);
        }

        // NOTE(review): a Query object goes through the write path (readonly=0) and
        // $whereInFile / $writeToFile are not used on this branch - confirm this is intended.
        return $this->getRequestWrite($sql);
    }
483
484
    /**
     * Builds the request for a write; a string is first converted to a Query via prepareQuery().
     *
     * @param Query|string $sql
     * @param array $bindings
     * @return CurlerRequest
     * @throws \ClickHouseDB\Exception\TransportException
     */
    private function prepareWrite($sql, $bindings = [])
    {
        $query = $sql instanceof Query ? $sql : $this->prepareQuery($sql, $bindings);

        return $this->getRequestWrite($query);
    }
499
500
    /**
     * Runs the queued requests by delegating to the curl runner's execLoopWait().
     *
     * @return bool result of execLoopWait()
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function executeAsync()
    {
        $result = $this->_curler->execLoopWait();

        return $result;
    }
508
509
    /**
     * Executes a select immediately and wraps the request in a Statement.
     *
     * @param Query|string $sql
     * @param array $bindings
     * @param null|WhereInFile $whereInFile
     * @param null|WriteToFile $writeToFile
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     * @throws \Exception
     */
    public function select($sql, array $bindings = [], $whereInFile = null, $writeToFile = null)
    {
        $selectRequest = $this->prepareSelect($sql, $bindings, $whereInFile, $writeToFile);

        // Send the request right away through the curl runner.
        $this->_curler->execOne($selectRequest);

        return new Statement($selectRequest);
    }
524
525
    /**
     * Queues a select for asynchronous execution and wraps the request in a Statement.
     *
     * The request is only added to the curl runner's queue here; it is sent
     * when the queue is run (see executeAsync()).
     *
     * @param Query|string $sql
     * @param array $bindings
     * @param null|WhereInFile $whereInFile
     * @param null|WriteToFile $writeToFile
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     * @throws \Exception
     */
    public function selectAsync($sql, array $bindings = [], $whereInFile = null, $writeToFile = null)
    {
        $request = $this->prepareSelect($sql, $bindings, $whereInFile, $writeToFile);
        $this->_curler->addQueLoop($request);

        return new Statement($request);
    }
540
541
    /**
     * Registers the callback that receives decoded X-ClickHouse-Progress data
     * (invoked from __findXClickHouseProgress()).
     *
     * @param callable $callback
     */
    public function setProgressFunction(callable $callback)
    {
        $progressCallback = $callback;
        $this->xClickHouseProgress = $progressCallback;
    }
548
549
    /**
     * Executes a write immediately and returns its Statement.
     *
     * When $exception is true and the statement reports an error, the
     * statement's error() method is invoked before returning.
     *
     * @param Query|string $sql
     * @param array $bindings
     * @param bool $exception whether to call error() on a failed statement
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function write($sql, array $bindings = [], $exception = true)
    {
        $writeRequest = $this->prepareWrite($sql, $bindings);
        $this->_curler->execOne($writeRequest);

        $statement = new Statement($writeRequest);

        // isError() is only consulted when the caller asked for error reporting.
        if ($exception && $statement->isError()) {
            $statement->error();
        }

        return $statement;
    }
568
}
569