1 | <?php |
||
2 | |||
3 | namespace ClickHouseDB\Transport; |
||
4 | |||
5 | use ClickHouseDB\Exception\TransportException; |
||
6 | use ClickHouseDB\Query\Degeneration; |
||
7 | use ClickHouseDB\Query\Query; |
||
8 | use ClickHouseDB\Query\WhereInFile; |
||
9 | use ClickHouseDB\Query\WriteToFile; |
||
10 | use ClickHouseDB\Settings; |
||
11 | use ClickHouseDB\Statement; |
||
12 | use const PHP_EOL; |
||
13 | |||
14 | class Http |
||
15 | { |
||
/**
 * User name handed to CurlerRequest::authByHeaders(); null until the constructor assigns it.
 *
 * @var string|null
 */
private $_username = null;

/**
 * Password handed to CurlerRequest::authByHeaders(); null until the constructor assigns it.
 *
 * @var string|null
 */
private $_password = null;

/**
 * Host part used by getUri() when building the endpoint address.
 *
 * @var string
 */
private $_host = '';

/**
 * Port appended by getUri(); 0 means "no port configured".
 *
 * @var int
 */
private $_port = 0;

/**
 * Verbosity flag; cast to bool before it is applied to a request.
 *
 * @var bool|int
 */
private $_verbose = false;

/**
 * Runner that executes and queues the prepared requests.
 *
 * @var CurlerRolling|null
 */
private $_curler = null;

/**
 * Settings object created in the constructor.
 *
 * @var Settings|null
 */
private $_settings = null;

/**
 * Degeneration objects registered through addQueryDegeneration().
 *
 * @var array
 */
private $_query_degenerations = [];

/**
 * Connect timeout in seconds (int).
 *
 * @var int
 */
private $_connectTimeOut = 5;

/**
 * Callback that receives decoded X-ClickHouse-Progress data; null when none is registered.
 *
 * @var callable|null
 */
private $xClickHouseProgress = null;

/**
 * Path of the CA certificate passed on to requests; null when none is configured.
 *
 * @var null|string
 */
private $sslCA = null;
||
72 | |||
/**
 * Creates the transport: remembers the credentials and endpoint, then builds
 * the Settings object and a fresh curl runner.
 *
 * @param string $host
 * @param int $port
 * @param string $username
 * @param string $password
 */
public function __construct($host, $port, $username, $password)
{
    $this->_username = $username;
    $this->_password = $password;

    $this->setHost($host, $port);

    // Settings receives the transport itself, so it is created once the plain fields are set.
    $this->_settings = new Settings($this);

    $this->setCurler();
}
|
90 | |||
91 | |||
/**
 * Replaces the current curl runner with a new CurlerRolling instance.
 */
public function setCurler()
{
    $this->_curler = new CurlerRolling();
}
|
96 | |||
/**
 * Injects an externally created curl runner in place of the current one.
 *
 * The parameter type hint already guarantees a CurlerRolling instance, so no
 * additional runtime check is needed before the assignment.
 *
 * @param CurlerRolling $curler
 */
public function setDirtyCurler(CurlerRolling $curler)
{
    $this->_curler = $curler;
}
||
106 | |||
/**
 * Gives access to the curl runner currently used by this transport.
 *
 * @return CurlerRolling
 */
public function getCurler()
{
    return $this->_curler;
}
||
114 | |||
/**
 * Stores the host and, when a positive port is supplied, the port as well.
 *
 * @param string $host
 * @param int $port values of 0 or below leave the stored port untouched
 */
public function setHost($host, $port = -1)
{
    $this->_host = $host;

    if ($port > 0) {
        $this->_port = $port;
    }
}
|
127 | |||
/**
 * Remembers the CA certificate path; newRequest() forwards it to every
 * request it builds while the value is non-empty.
 *
 * @param string $caPath
 */
public function setSslCa($caPath)
{
    $this->sslCA = $caPath;
}
||
137 | |||
/**
 * Builds the base endpoint address: scheme, host and - when applicable - port.
 *
 * The port is appended only when the stored host contains neither '/' nor ':'
 * and the stored port is a positive number.
 *
 * @return string
 */
public function getUri()
{
    $scheme = $this->settings()->isHttps() ? 'https' : 'http';
    $uri = $scheme . '://' . $this->_host;

    // A host that already carries a path or a port is used verbatim.
    $hostIsComplete = stripos($this->_host, '/') !== false
        || stripos($this->_host, ':') !== false;

    if (!$hostIsComplete && intval($this->_port) > 0) {
        return $uri . ':' . $this->_port;
    }

    return $uri;
}
||
156 | |||
/**
 * Accessor for the Settings object created in the constructor.
 *
 * @return Settings
 */
public function settings()
{
    return $this->_settings;
}
||
164 | |||
/**
 * Stores the verbosity flag and hands the same value back to the caller.
 *
 * @param bool|int $flag
 * @return mixed the value that was passed in
 */
public function verbose($flag)
{
    return $this->_verbose = $flag;
}
||
174 | |||
/**
 * Builds the full request URL: base URI plus a query string made of the
 * current settings overlaid with the given parameters.
 *
 * @param array $params extra query-string parameters; they win over settings with the same key
 * @return string
 */
private function getUrl($params = [])
{
    $queryParts = $this->settings()->getSettings();

    if (is_array($params) && count($params)) {
        $queryParts = array_merge($queryParts, $params);
    }

    // For read-only users these keys are dropped from the query string.
    if ($this->settings()->isReadOnlyUser()) {
        unset(
            $queryParts['extremes'],
            $queryParts['readonly'],
            $queryParts['enable_http_compression'],
            $queryParts['max_execution_time']
        );
    }

    // 'https' only selects the scheme in getUri(); it is never sent as a parameter.
    unset($queryParts['https']);

    return $this->getUri() . '?' . http_build_query($queryParts);
}
||
201 | |||
/**
 * Creates a POST request pre-configured with credentials, compression,
 * session persistence, CA certificate, timeouts and verbosity.
 *
 * @param array $extendinfo extended info stored on the request
 * @return CurlerRequest
 */
private function newRequest($extendinfo)
{
    $request = new CurlerRequest();
    $request
        ->authByHeaders($this->_username, $this->_password)
        ->POST()
        ->setRequestExtendedInfo($extendinfo);

    if ($this->settings()->isEnableHttpCompression()) {
        $request->httpCompression(true);
    }

    // A configured session id switches the request to persistent mode.
    if ($this->settings()->getSessionId()) {
        $request->persistent();
    }

    if ($this->sslCA) {
        $request->setSslCa($this->sslCA);
    }

    $request->timeOut($this->settings()->getTimeOut());
    $request->connectTimeOut($this->_connectTimeOut);
    $request->verbose(boolval($this->_verbose));

    return $request;
}
||
229 | |||
/**
 * Turns a Query into a ready-to-send request.
 *
 * The SQL travels either in the URL ('query' parameter) or in the request
 * body, depending on $query_as_string.
 *
 * @param Query $query
 * @param array $urlParams
 * @param bool $query_as_string true to put the SQL into the URL instead of the body
 * @return CurlerRequest
 * @throws \ClickHouseDB\Exception\TransportException
 */
private function makeRequest(Query $query, $urlParams = [], $query_as_string = false)
{
    $sqlText = $query->toSql();

    if ($query_as_string) {
        $urlParams['query'] = $sqlText;
    }

    $targetUrl = $this->getUrl($urlParams);

    $request = $this->newRequest([
        'sql' => $sqlText,
        'query' => $query,
        'format' => $query->getFormat()
    ]);
    $request->url($targetUrl);

    if (!$query_as_string) {
        $request->parameters_json($sqlText);
    }

    if ($this->settings()->isEnableHttpCompression()) {
        $request->httpCompression(true);
    }

    return $request;
}
||
266 | |||
/**
 * Prepares (but does not execute) a write request whose SQL is carried in the
 * URL, leaving the request body free for the data.
 *
 * @param string|Query $sql
 * @return CurlerRequest
 */
public function writeStreamData($sql)
{
    $query = ($sql instanceof Query) ? $sql : new Query($sql);

    $targetUrl = $this->getUrl([
        'readonly' => 0,
        'query' => $query->toSql()
    ]);

    $request = $this->newRequest([
        'sql' => $sql,
        'query' => $query,
        'format' => $query->getFormat()
    ]);
    $request->url($targetUrl);

    return $request;
}
||
294 | |||
295 | |||
/**
 * Queues an asynchronous write whose payload is read from a file.
 *
 * The request is only added to the curl runner's queue here; the returned
 * Statement wraps that queued request.
 *
 * @param string $sql
 * @param string $file_name path of the file passed to the request as its input file
 * @return Statement
 * @throws \ClickHouseDB\Exception\TransportException
 */
public function writeAsyncCSV($sql, $file_name)
{
    $query = new Query($sql);

    $targetUrl = $this->getUrl([
        'readonly' => 0,
        'query' => $query->toSql()
    ]);

    $request = $this->newRequest([
        'sql' => $sql,
        'query' => $query,
        'format' => $query->getFormat()
    ]);
    $request->url($targetUrl);

    // The callback closes the input file handle if it is still open.
    $request->setCallbackFunction(function (CurlerRequest $finished) {
        $infile = $finished->getInfileHandle();
        if (is_resource($infile)) {
            fclose($infile);
        }
    });

    $request->setInfile($file_name);
    $this->_curler->addQueLoop($request);

    return new Statement($request);
}
||
332 | |||
/**
 * Reports how many requests the curl runner counts as pending.
 *
 * @return int
 */
public function getCountPendingQueue()
{
    return $this->_curler->countPending();
}
||
342 | |||
/**
 * Sets the connect timeout in seconds (int); the value is applied to
 * requests created afterwards.
 *
 * @param int $connectTimeOut
 */
public function setConnectTimeOut($connectTimeOut)
{
    $this->_connectTimeOut = $connectTimeOut;
}
|
352 | |||
/**
 * Returns the currently configured connect timeout in seconds.
 *
 * @return int
 */
public function getConnectTimeOut()
{
    return $this->_connectTimeOut;
}
||
362 | |||
363 | |||
/**
 * Looks for the last X-ClickHouse-Progress header in the response buffered on
 * a curl handle and passes its decoded JSON payload to the registered
 * progress callback.
 *
 * Nothing is searched unless the HTTP status is 200.
 *
 * @param resource $handle curl handle of the request
 * @return false|null false when the header block or the progress header is missing, null otherwise
 */
public function __findXClickHouseProgress($handle)
{
    $code = curl_getinfo($handle, CURLINFO_HTTP_CODE);

    if ((int) $code !== 200) {
        return null;
    }

    $response = curl_multi_getcontent($handle);
    $headerSize = curl_getinfo($handle, CURLINFO_HEADER_SIZE);
    if (!$headerSize) {
        return false;
    }

    // The buffered content may be null/empty when nothing has been received yet.
    $header = substr((string) $response, 0, $headerSize);
    if ($header === false || $header === '') {
        return false;
    }

    // Header names are case-insensitive, so the lookup must be as well
    // (the prefix removal below already is).
    $pos = strripos($header, 'X-ClickHouse-Progress');
    if ($pos === false) {
        return false;
    }

    // Keep only the header line itself; anything after the line break belongs
    // to later headers and would make the JSON payload undecodable.
    $tail = substr($header, $pos);
    $line = substr($tail, 0, strcspn($tail, "\r\n"));

    $data = json_decode(str_ireplace('X-ClickHouse-Progress:', '', $line), true);

    if ($data && is_callable($this->xClickHouseProgress)) {
        call_user_func($this->xClickHouseProgress, $data);
    }

    return null;
}
|
403 | |||
/**
 * Builds a read request (readonly=2) for the given query.
 *
 * Optional extras:
 *  - $whereInFile: when it reports a non-zero size, its URL parameters are
 *    merged in, the SQL is sent in the URL and its files are attached;
 *  - $writeToFile: when it reports a format, that format is set on the query
 *    and the response is directed into the target file;
 *  - a registered progress callback is hooked into the request.
 *
 * @param Query $query
 * @param null|WhereInFile $whereInFile
 * @param null|WriteToFile $writeToFile
 * @return CurlerRequest
 * @throws \Exception
 */
public function getRequestRead(Query $query, $whereInFile = null, $writeToFile = null)
{
    $urlParams = ['readonly' => 2];
    $query_as_string = false;

    // External data for the query: the SQL has to travel in the URL.
    if ($whereInFile instanceof WhereInFile && $whereInFile->size()) {
        $structure = $whereInFile->fetchUrlParams();
        $urlParams = array_merge($urlParams, $structure);
        $query_as_string = true;
    }

    // Result goes to a file: use the format requested for that file.
    if ($writeToFile instanceof WriteToFile && $writeToFile->fetchFormat()) {
        $query->setFormat($writeToFile->fetchFormat());
        unset($urlParams['extremes']);
    }

    $request = $this->makeRequest($query, $urlParams, $query_as_string);

    if ($whereInFile instanceof WhereInFile && $whereInFile->size()) {
        $request->attachFiles($whereInFile->fetchFiles());
    }

    if ($writeToFile instanceof WriteToFile && $writeToFile->fetchFormat()) {
        $fout = fopen($writeToFile->fetchFile(), 'w');
        if (is_resource($fout)) {
            $isGz = $writeToFile->getGzip();

            if ($isGz) {
                // First 8 bytes of a gzip header: magic (1f 8b), deflate method (08),
                // no flags, zero mtime.
                // NOTE(review): the rest of the stream is presumably supplied by
                // the response writer - confirm against CurlerRequest.
                fwrite($fout, "\x1f\x8b\x08\x00\x00\x00\x00\x00");
            }

            $request->setResultFileHandle($fout, $isGz)->setCallbackFunction(function (CurlerRequest $request) {
                // Same guard as the infile callback in writeAsyncCSV(): never
                // call fclose() on something that is not an open handle.
                $handle = $request->getResultFileHandle();
                if (is_resource($handle)) {
                    fclose($handle);
                }
            });
        }
    }

    if ($this->xClickHouseProgress) {
        $request->setFunctionProgress([$this, '__findXClickHouseProgress']);
    }

    return $request;
}
||
471 | |||
/**
 * Forgets every registered query degeneration.
 *
 * @return bool always true
 */
public function cleanQueryDegeneration()
{
    $this->_query_degenerations = [];

    return true;
}
||
477 | |||
/**
 * Appends a degeneration to the list handed to every Query built by prepareQuery().
 *
 * @param Degeneration $degeneration
 * @return bool always true
 */
public function addQueryDegeneration(Degeneration $degeneration)
{
    $this->_query_degenerations[] = $degeneration;

    return true;
}
||
483 | |||
/**
 * Builds a write request for the query; it is sent with readonly=0.
 *
 * @param Query $query
 * @return CurlerRequest
 * @throws \ClickHouseDB\Exception\TransportException
 */
public function getRequestWrite(Query $query)
{
    return $this->makeRequest($query, ['readonly' => 0]);
}
||
494 | |||
/**
 * Sends a GET to the base URI and reports whether the server answered with
 * the expected health-check body.
 *
 * The expected body is the fixed string "Ok.\n". It is compared against a
 * literal line feed rather than PHP_EOL, because PHP_EOL depends on the
 * client platform ("\r\n" on Windows) while the reply comes from the server.
 *
 * @throws TransportException
 */
public function ping(): bool
{
    $request = new CurlerRequest();
    $request->url($this->getUri())->verbose(false)->GET()->connectTimeOut($this->getConnectTimeOut());
    $this->_curler->execOne($request);

    return $request->response()->body() === "Ok.\n";
}
||
506 | |||
/**
 * Binds the parameters on every registered degeneration and wraps the SQL
 * together with those degenerations in a Query.
 *
 * @param string $sql
 * @param mixed[] $bindings
 * @return Query
 */
private function prepareQuery($sql, $bindings)
{
    foreach ($this->_query_degenerations as $registered) {
        $registered->bindParams($bindings);
    }

    return new Query($sql, $this->_query_degenerations);
}
||
522 | |||
523 | |||
/**
 * Builds the read request for a SELECT.
 *
 * A ready-made Query is used as is (its format is not overridden); a SQL
 * string is turned into a Query with the bindings applied and the JSON format.
 * Both paths go through getRequestRead(), so $whereInFile, $writeToFile and
 * the progress callback are honoured in either case.
 *
 * @param Query|string $sql
 * @param mixed[] $bindings ignored when $sql is already a Query
 * @param null|WhereInFile $whereInFile
 * @param null|WriteToFile $writeToFile
 * @return CurlerRequest
 * @throws \Exception
 */
private function prepareSelect($sql, $bindings, $whereInFile, $writeToFile = null)
{
    if ($sql instanceof Query) {
        // This is a read: build it as one instead of falling back to the
        // write request, which would drop the file options.
        return $this->getRequestRead($sql, $whereInFile, $writeToFile);
    }

    $query = $this->prepareQuery($sql, $bindings);
    $query->setFormat('JSON');

    return $this->getRequestRead($query, $whereInFile, $writeToFile);
}
||
541 | |||
542 | |||
/**
 * Builds the write request for a statement given either as a Query or as a
 * SQL string with bindings.
 *
 * @param Query|string $sql
 * @param mixed[] $bindings ignored when $sql is already a Query
 * @return CurlerRequest
 * @throws \ClickHouseDB\Exception\TransportException
 */
private function prepareWrite($sql, $bindings = [])
{
    $query = ($sql instanceof Query)
        ? $sql
        : $this->prepareQuery($sql, $bindings);

    return $this->getRequestWrite($query);
}
||
558 | |||
/**
 * Delegates to the curl runner's execLoopWait() and returns its result.
 *
 * @return bool
 * @throws \ClickHouseDB\Exception\TransportException
 */
public function executeAsync()
{
    return $this->_curler->execLoopWait();
}
||
567 | |||
/**
 * Executes a SELECT immediately and wraps the finished request in a Statement.
 *
 * @param Query|string $sql
 * @param mixed[] $bindings
 * @param null|WhereInFile $whereInFile
 * @param null|WriteToFile $writeToFile
 * @return Statement
 * @throws \ClickHouseDB\Exception\TransportException
 * @throws \Exception
 */
public function select($sql, array $bindings = [], $whereInFile = null, $writeToFile = null)
{
    $prepared = $this->prepareSelect($sql, $bindings, $whereInFile, $writeToFile);
    $this->_curler->execOne($prepared);

    return new Statement($prepared);
}
||
583 | |||
/**
 * Queues a SELECT on the curl runner instead of executing it right away; the
 * returned Statement wraps the queued request.
 *
 * @param Query|string $sql
 * @param mixed[] $bindings
 * @param null|WhereInFile $whereInFile
 * @param null|WriteToFile $writeToFile
 * @return Statement
 * @throws \ClickHouseDB\Exception\TransportException
 * @throws \Exception
 */
public function selectAsync($sql, array $bindings = [], $whereInFile = null, $writeToFile = null)
{
    $prepared = $this->prepareSelect($sql, $bindings, $whereInFile, $writeToFile);
    $this->_curler->addQueLoop($prepared);

    return new Statement($prepared);
}
||
599 | |||
/**
 * Registers the callback that receives decoded X-ClickHouse-Progress data;
 * read requests built afterwards get the progress hook attached.
 *
 * @param callable $callback
 */
public function setProgressFunction(callable $callback)
{
    $this->xClickHouseProgress = $callback;
}
|
607 | |||
/**
 * Executes a write statement immediately.
 *
 * @param string $sql
 * @param mixed[] $bindings
 * @param bool $exception when true, Statement::error() is invoked if the statement reports an error
 * @return Statement
 * @throws \ClickHouseDB\Exception\TransportException
 */
public function write($sql, array $bindings = [], $exception = true)
{
    $prepared = $this->prepareWrite($sql, $bindings);
    $this->_curler->execOne($prepared);

    $statement = new Statement($prepared);

    if ($exception && $statement->isError()) {
        $statement->error();
    }

    return $statement;
}
||
627 | |||
/**
 * Executes a request whose body (write mode) or response (read mode) is
 * pumped through the given stream.
 *
 * When the Stream supplies no callable of its own, a default one is used:
 * fread() from the stream in write mode, fwrite() to the stream in read mode.
 * In write mode the stream is closed once the request has finished,
 * whether it succeeded or failed.
 *
 * @param Stream $streamRW
 * @param CurlerRequest $request
 * @return Statement
 * @throws \ClickHouseDB\Exception\TransportException
 */
private function streaming(Stream $streamRW, CurlerRequest $request)
{
    $callable = $streamRW->getClosure();
    $stream = $streamRW->getStream();

    try {
        if (!is_callable($callable)) {
            if ($streamRW->isWrite()) {
                $callable = function ($ch, $fd, $length) use ($stream) {
                    // Compare against false explicitly: a chunk consisting of
                    // the single character "0" is valid data, not end of input.
                    $chunk = fread($stream, $length);

                    return $chunk === false ? '' : $chunk;
                };
            } else {
                $callable = function ($ch, $fd) use ($stream) {
                    return fwrite($stream, $fd);
                };
            }
        }

        if ($streamRW->isGzipHeader()) {
            if ($streamRW->isWrite()) {
                $request->header('Content-Encoding', 'gzip');
                $request->header('Content-Type', 'application/x-www-form-urlencoded');
            } else {
                $request->header('Accept-Encoding', 'gzip');
            }
        }

        $request->header('Transfer-Encoding', 'chunked');

        if ($streamRW->isWrite()) {
            $request->setReadFunction($callable);
        } else {
            $request->setWriteFunction($callable);
        }

        $this->_curler->execOne($request, true);
        $response = new Statement($request);
        if ($response->isError()) {
            $response->error();
        }

        return $response;
    } finally {
        // Guarded so that cleanup on an already closed or invalid stream
        // cannot replace the exception that is currently propagating.
        if ($streamRW->isWrite() && is_resource($stream)) {
            fclose($stream);
        }
    }
}
||
694 | |||
695 | |||
/**
 * Runs a read query and streams the response through $streamRead.
 *
 * @param Stream $streamRead
 * @param string $sql
 * @param mixed[] $bindings
 * @return Statement
 * @throws \ClickHouseDB\Exception\TransportException
 */
public function streamRead(Stream $streamRead, $sql, $bindings = [])
{
    $query = $this->prepareQuery($sql, $bindings);

    return $this->streaming($streamRead, $this->getRequestRead($query));
}
||
710 | |||
/**
 * Runs a write query whose data is fed from $streamWrite.
 *
 * @param Stream $streamWrite
 * @param string $sql
 * @param mixed[] $bindings
 * @return Statement
 * @throws \ClickHouseDB\Exception\TransportException
 */
public function streamWrite(Stream $streamWrite, $sql, $bindings = [])
{
    $query = $this->prepareQuery($sql, $bindings);

    return $this->streaming($streamWrite, $this->writeStreamData($query));
}
||
724 | } |
||
725 |