Passed
Branch master (82e8a0)
by Igor
17:57 queued 15:24
created

Http::prepareWrite()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2.032

Importance

Changes 0
Metric Value
dl 0
loc 8
ccs 4
cts 5
cp 0.8
rs 10
c 0
b 0
f 0
cc 2
nc 2
nop 2
crap 2.032
1
<?php
2
3
namespace ClickHouseDB\Transport;
4
5
use ClickHouseDB\Query\Degeneration;
6
use ClickHouseDB\Query\Query;
7
use ClickHouseDB\Query\WhereInFile;
8
use ClickHouseDB\Query\WriteToFile;
9
use ClickHouseDB\Settings;
10
use ClickHouseDB\Statement;
11
12
class Http
13
{
14
    /**
15
     * @var string
16
     */
17
    private $_username = null;
18
19
    /**
20
     * @var string
21
     */
22
    private $_password = null;
23
24
    /**
25
     * @var string
26
     */
27
    private $_host = '';
28
29
    /**
30
     * @var int
31
     */
32
    private $_port = 0;
33
34
    /**
35
     * @var bool|int
36
     */
37
    private $_verbose = false;
38
39
    /**
40
     * @var CurlerRolling
41
     */
42
    private $_curler = null;
43
44
    /**
45
     * @var Settings
46
     */
47
    private $_settings = null;
48
49
    /**
50
     * @var array
51
     */
52
    private $_query_degenerations = [];
53
54
    /**
55
     * Connection timeout, in whole seconds.
56
     *
57
     * @var int
58
     */
59
    private $_connectTimeOut = 5;
60
61
    /**
62
     * @var callable|null
63
     */
64
    private $xClickHouseProgress = null;
65
66
    /**
     * Builds the transport: stores the credentials and endpoint, then creates
     * the settings holder and the curl driver.
     *
     * @param string $host     server host
     * @param int    $port     server port (only stored when greater than zero)
     * @param string $username user name passed to every request's auth()
     * @param string $password password passed to every request's auth()
     */
    public function __construct($host, $port, $username, $password)
    {
        $this->_username = $username;
        $this->_password = $password;
        $this->setHost($host, $port);

        // Settings receives this transport, so it is created once the fields above are set.
        $this->_settings = new Settings($this);

        $this->setCurler();
    }
83
84
85 44
    /**
     * Installs a fresh CurlerRolling instance as the curl driver.
     *
     * @return void
     */
    public function setCurler()
    {
        $driver = new CurlerRolling();

        $this->_curler = $driver;
    }
89
90
    /**
     * Returns the curl driver currently held by this transport.
     *
     * @return CurlerRolling
     */
    public function getCurler()
    {
        $driver = $this->_curler;

        return $driver;
    }
97
98
    /**
     * Sets the server host and, optionally, the port.
     *
     * @param string $host server host
     * @param int    $port new port; any value that is not greater than zero
     *                     (including the -1 default) leaves the stored port as is
     */
    public function setHost($host, $port = -1)
    {
        $this->_host = $host;

        if (!($port > 0)) {
            return;
        }

        $this->_port = $port;
    }
110
111
    /**
     * Builds the base URI "<scheme>://<host>:<port>"; the scheme is "https"
     * when the settings report HTTPS and "http" otherwise.
     *
     * @return string
     */
    public function getUri()
    {
        $scheme = $this->settings()->isHttps() ? 'https' : 'http';

        return $scheme . '://' . $this->_host . ':' . $this->_port;
    }
123
124
    /**
     * Returns the settings object created for this transport in the constructor.
     *
     * @return Settings
     */
    public function settings()
    {
        $settings = $this->_settings;

        return $settings;
    }
131
132
    /**
     * Stores the verbosity flag (later applied to each new request as a
     * boolean) and hands the given value back unchanged.
     *
     * @param bool|int $flag
     * @return mixed the value that was passed in
     */
    public function verbose($flag)
    {
        return $this->_verbose = $flag;
    }
141
142
    /**
     * Builds the full request URL: the base URI followed by the current
     * settings, merged with $params, encoded as a query string.
     *
     * @param array $params extra query parameters; a non-empty array is merged
     *                      over the settings with array_merge()
     * @return string
     */
    private function getUrl($params = [])
    {
        $queryParams = $this->settings()->getSettings();

        if (is_array($params) && count($params)) {
            $queryParams = array_merge($queryParams, $params);
        }

        // For a read-only user these keys are dropped from the URL.
        if ($this->settings()->isReadOnlyUser()) {
            unset(
                $queryParams['extremes'],
                $queryParams['readonly'],
                $queryParams['enable_http_compression'],
                $queryParams['max_execution_time']
            );
        }

        // 'https' only selects the URI scheme (see getUri()); it is never sent.
        unset($queryParams['https']);

        return $this->getUri() . '?' . http_build_query($queryParams);
    }
169
170
    /**
     * Creates a POST request carrying the credentials and the transport-wide
     * options (compression, persistence, timeouts, verbosity).
     *
     * @param array $extendinfo extended info attached to the request
     * @return CurlerRequest
     */
    private function newRequest($extendinfo)
    {
        $request = new CurlerRequest();
        $request
            ->auth($this->_username, $this->_password)
            ->POST()
            ->setRequestExtendedInfo($extendinfo);

        $settings = $this->settings();

        if ($settings->isEnableHttpCompression()) {
            $request->httpCompression(true);
        }

        // A configured session id switches the request to persistent mode.
        if ($settings->getSessionId()) {
            $request->persistent();
        }

        $request->timeOut($settings->getTimeOut());
        $request->connectTimeOut($this->_connectTimeOut)->keepAlive();
        $request->verbose((bool) $this->_verbose);

        return $request;
    }
195
196
    /**
     * Turns a Query into a ready-to-send request.
     *
     * The SQL travels either in the URL ("query" parameter) when
     * $query_as_string is true, or is handed to parameters_json() otherwise.
     *
     * @param Query $query
     * @param array $urlParams       extra URL parameters
     * @param bool  $query_as_string send the SQL in the URL, not via parameters_json()
     * @return CurlerRequest
     * @throws \ClickHouseDB\Exception\TransportException
     */
    private function makeRequest(Query $query, $urlParams = [], $query_as_string = false)
    {
        $sql = $query->toSql();

        if ($query_as_string) {
            $urlParams['query'] = $sql;
        }

        $url = $this->getUrl($urlParams);

        $request = $this->newRequest([
            'sql' => $sql,
            'query' => $query,
            'format'=> $query->getFormat(),
        ]);
        $request->url($url);

        if (!$query_as_string) {
            $request->parameters_json($sql);
        }

        // newRequest() already applies this flag; the call is repeated here as in the original flow.
        if ($this->settings()->isEnableHttpCompression()) {
            $request->httpCompression(true);
        }

        return $request;
    }
234
235
    /**
     * Prepares (but does not execute or queue) a write request whose SQL is
     * carried in the URL with readonly=0.
     *
     * @param string $sql
     * @return CurlerRequest
     */
    public function writeStreamData($sql)
    {
        $query = new Query($sql);

        $urlParams = [
            'readonly' => 0,
            'query' => $query->toSql(),
        ];
        $url = $this->getUrl($urlParams);

        $request = $this->newRequest([
            'sql' => $sql,
            'query' => $query,
            'format'=> $query->getFormat(),
        ]);
        $request->url($url);

        return $request;
    }
258
259
260
    /**
     * Queues an asynchronous write that uses the given file as the request's
     * input file.
     *
     * The SQL is sent in the URL with readonly=0. The request is only added to
     * the curl queue here; it runs when the queue is executed
     * (see executeAsync()).
     *
     * @param string $sql
     * @param string $file_name file handed to the request via setInfile()
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function writeAsyncCSV($sql, $file_name)
    {
        $query = new Query($sql);

        $url = $this->getUrl([
            'readonly' => 0,
            'query' => $query->toSql(),
        ]);

        $extendinfo = [
            'sql' => $sql,
            'query' => $query,
            'format'=> $query->getFormat(),
        ];

        $request = $this->newRequest($extendinfo);
        $request->url($url);

        $request->setCallbackFunction(function (CurlerRequest $request) {
            // getInfileHandle() may not yield a resource (static analysis reports a
            // boolean); only close a handle that is really open.
            $handle = $request->getInfileHandle();
            if (is_resource($handle)) {
                fclose($handle);
            }
        });

        $request->setInfile($file_name);
        $this->_curler->addQueLoop($request);

        return new Statement($request);
    }
293
294
    /**
     * Reports how many requests the curl driver counts as pending.
     *
     * @return int
     */
    public function getCountPendingQueue()
    {
        $pending = $this->_curler->countPending();

        return $pending;
    }
303
304
    /**
     * Sets the connect timeout, in seconds, applied to requests created
     * afterwards.
     *
     * @param int $connectTimeOut
     */
    public function setConnectTimeOut($connectTimeOut)
    {
        $seconds = $connectTimeOut;

        $this->_connectTimeOut = $seconds;
    }
313
314
    /**
     * Returns the connect timeout, in seconds, currently configured.
     *
     * @return int
     */
    public function getConnectTimeOut()
    {
        $seconds = $this->_connectTimeOut;

        return $seconds;
    }
323
324
325 1
    /**
     * Progress hook: looks for the last X-ClickHouse-Progress header in the
     * response collected so far and passes its decoded JSON payload to the
     * callback registered with setProgressFunction().
     *
     * Nothing is done unless the HTTP status is 200.
     *
     * @param resource $handle curl handle of the request being processed
     * @return bool|null false when the header block or the progress header is
     *                   missing; null in every other case
     */
    public function __findXClickHouseProgress($handle)
    {
        $code = curl_getinfo($handle, CURLINFO_HTTP_CODE);
        if ($code != 200) {
            return;
        }

        $response = curl_multi_getcontent($handle);
        $header_size = curl_getinfo($handle, CURLINFO_HEADER_SIZE);
        if (!$header_size) {
            return false;
        }

        // Check the extracted header text itself; substr() yields false or ''
        // (depending on the PHP version) when there is nothing to cut.
        $header = substr((string) $response, 0, $header_size);
        if ($header === false || $header === '') {
            return false;
        }

        // Header names are case-insensitive; compare strictly against false so
        // that offset 0 is not mistaken for "not found".
        $name = 'X-ClickHouse-Progress:';
        $pos = strripos($header, $name);
        if ($pos === false) {
            return false;
        }

        // Keep only the value of that header line, so headers that follow the
        // progress header do not break JSON decoding.
        $value = substr($header, $pos + strlen($name));
        $eol = strpos($value, "\n");
        if ($eol !== false) {
            $value = substr($value, 0, $eol);
        }

        $data = json_decode(trim($value), true);

        if ($data && is_callable($this->xClickHouseProgress)) {
            call_user_func($this->xClickHouseProgress, $data);
        }
    }
364
365
    /**
     * Builds a read request (readonly=1) for the query, optionally attaching
     * files described by $whereInFile and redirecting the result into the file
     * described by $writeToFile.
     *
     * @param Query            $query
     * @param null|WhereInFile $whereInFile files to attach; ignored when its size() is empty
     * @param null|WriteToFile $writeToFile result target; ignored when it has no format
     * @return CurlerRequest
     * @throws \Exception
     */
    public function getRequestRead(Query $query, $whereInFile = null, $writeToFile = null)
    {
        $query_as_string = false;
        $urlParams = ['readonly' => 1];

        // With attached files their parameters are added to the URL and the SQL
        // is sent in the URL as well.
        if ($whereInFile instanceof WhereInFile && $whereInFile->size()) {
            $urlParams = array_merge($urlParams, $whereInFile->fetchUrlParams());
            $query_as_string = true;
        }

        // Writing to a file: the target dictates the query format.
        if ($writeToFile instanceof WriteToFile && $writeToFile->fetchFormat()) {
            $query->setFormat($writeToFile->fetchFormat());
            unset($urlParams['extremes']);
        }

        $request = $this->makeRequest($query, $urlParams, $query_as_string);

        if ($whereInFile instanceof WhereInFile && $whereInFile->size()) {
            $request->attachFiles($whereInFile->fetchFiles());
        }

        if ($writeToFile instanceof WriteToFile && $writeToFile->fetchFormat()) {
            $handle = fopen($writeToFile->fetchFile(), 'w');

            // If the file cannot be opened the request is returned without a
            // result file handle.
            if (is_resource($handle)) {
                $gzip = $writeToFile->getGzip();

                if ($gzip) {
                    // 8-byte prefix beginning with the gzip magic bytes (1f 8b 08).
                    // NOTE(review): presumably the response body supplies the rest
                    // of the gzip stream - confirm against CurlerRequest.
                    fwrite($handle, "\x1f\x8b\x08\x00\x00\x00\x00\x00");
                }

                $request
                    ->setResultFileHandle($handle, $gzip)
                    ->setCallbackFunction(function (CurlerRequest $request) {
                        fclose($request->getResultFileHandle());
                    });
            }
        }

        // Hook the progress parser in only when a progress callback is registered.
        if ($this->xClickHouseProgress) {
            $request->setFunctionProgress([$this, '__findXClickHouseProgress']);
        }

        return $request;
    }
433
434 1
    /**
     * Removes every registered query degeneration.
     *
     * @return bool always true
     */
    public function cleanQueryDegeneration()
    {
        $empty = [];
        $this->_query_degenerations = $empty;

        return true;
    }
439
440 44
    /**
     * Appends a degeneration to the list handed to every Query built by
     * prepareQuery().
     *
     * @param Degeneration $degeneration
     * @return bool always true
     */
    public function addQueryDegeneration(Degeneration $degeneration)
    {
        array_push($this->_query_degenerations, $degeneration);

        return true;
    }
445
446
    /**
     * Builds a write request (readonly=0) for the given query.
     *
     * @param Query $query
     * @return CurlerRequest
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function getRequestWrite(Query $query)
    {
        return $this->makeRequest($query, ['readonly' => 0]);
    }
456
457
    /**
458
     * @param string $sql
459
     * @param array $bindings
460
     * @return Query
461
     */
462 36
    private function prepareQuery($sql, $bindings)
463
    {
464
465
        // add Degeneration query
466 36
        foreach ($this->_query_degenerations as $degeneration) {
467 36
            $degeneration->bindParams($bindings);
468
        }
469
470 36
        return new Query($sql, $this->_query_degenerations);
471
    }
472
473
474
    /**
     * Builds the request for a select.
     *
     * A plain SQL string is bound, forced to the JSON format and turned into a
     * read request. A ready-made Query object is passed to getRequestWrite()
     * as is.
     *
     * NOTE(review): for a Query instance the bindings and both file arguments
     * are not used and the request is built with readonly=0 - confirm this is
     * intended.
     *
     * @param Query|string     $sql
     * @param array            $bindings
     * @param null|WhereInFile $whereInFile
     * @param null|WriteToFile $writeToFile
     * @return CurlerRequest
     * @throws \Exception
     */
    private function prepareSelect($sql, $bindings, $whereInFile, $writeToFile = null)
    {
        if (!($sql instanceof Query)) {
            $query = $this->prepareQuery($sql, $bindings);
            $query->setFormat('JSON');

            return $this->getRequestRead($query, $whereInFile, $writeToFile);
        }

        return $this->getRequestWrite($sql);
    }
494
495
    /**
     * Builds the request for a write. A Query object is used directly; a SQL
     * string is first bound through prepareQuery().
     *
     * @param Query|string $sql
     * @param array        $bindings used only when $sql is a string
     * @return CurlerRequest
     * @throws \ClickHouseDB\Exception\TransportException
     */
    private function prepareWrite($sql, $bindings = [])
    {
        $query = ($sql instanceof Query)
            ? $sql
            : $this->prepareQuery($sql, $bindings);

        return $this->getRequestWrite($query);
    }
510
511
    /**
     * Runs the curl driver's queue loop (execLoopWait()) and returns its result.
     *
     * @return bool
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function executeAsync()
    {
        $result = $this->_curler->execLoopWait();

        return $result;
    }
519
520
    /**
     * Executes a select immediately and wraps the finished request in a
     * Statement.
     *
     * @param Query|string     $sql
     * @param array            $bindings
     * @param null|WhereInFile $whereInFile
     * @param null|WriteToFile $writeToFile
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     * @throws \Exception
     */
    public function select($sql, array $bindings = [], $whereInFile = null, $writeToFile = null)
    {
        $prepared = $this->prepareSelect($sql, $bindings, $whereInFile, $writeToFile);

        $this->_curler->execOne($prepared);

        return new Statement($prepared);
    }
535
536
    /**
     * Queues a select on the curl driver without executing it and returns a
     * Statement bound to the queued request (run the queue with executeAsync()).
     *
     * @param Query|string     $sql
     * @param array            $bindings
     * @param null|WhereInFile $whereInFile
     * @param null|WriteToFile $writeToFile
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     * @throws \Exception
     */
    public function selectAsync($sql, array $bindings = [], $whereInFile = null, $writeToFile = null)
    {
        $prepared = $this->prepareSelect($sql, $bindings, $whereInFile, $writeToFile);

        $this->_curler->addQueLoop($prepared);

        return new Statement($prepared);
    }
551
552
    /**
     * Registers the callback that receives decoded X-ClickHouse-Progress data
     * for read requests created after this call.
     *
     * @param callable $callback
     */
    public function setProgressFunction(callable $callback)
    {
        $progress = $callback;

        $this->xClickHouseProgress = $progress;
    }
559
560
    /**
     * Executes a write immediately and returns its Statement.
     *
     * @param Query|string $sql
     * @param array        $bindings
     * @param bool         $exception when true, error() is invoked on a failed
     *                                statement (presumably raising the exception)
     * @return Statement
     * @throws \ClickHouseDB\Exception\TransportException
     */
    public function write($sql, array $bindings = [], $exception = true)
    {
        $prepared = $this->prepareWrite($sql, $bindings);
        $this->_curler->execOne($prepared);

        $statement = new Statement($prepared);

        if ($exception && $statement->isError()) {
            $statement->error();
        }

        return $statement;
    }
579
}
580