1 | <?php |
||||||
2 | |||||||
3 | declare(strict_types=1); |
||||||
4 | |||||||
5 | namespace ClickHouseDB; |
||||||
6 | |||||||
7 | use ClickHouseDB\Exception\QueryException; |
||||||
8 | use ClickHouseDB\Query\Degeneration; |
||||||
9 | use ClickHouseDB\Query\Degeneration\Bindings; |
||||||
10 | use ClickHouseDB\Query\Degeneration\Conditions; |
||||||
11 | use ClickHouseDB\Query\WhereInFile; |
||||||
12 | use ClickHouseDB\Query\WriteToFile; |
||||||
13 | use ClickHouseDB\Quote\FormatLine; |
||||||
14 | use ClickHouseDB\Transport\Http; |
||||||
15 | use ClickHouseDB\Transport\Stream; |
||||||
16 | use function array_flip; |
||||||
17 | use function array_keys; |
||||||
18 | use function array_rand; |
||||||
19 | use function array_values; |
||||||
20 | use function count; |
||||||
21 | use function date; |
||||||
22 | use function implode; |
||||||
23 | use function in_array; |
||||||
24 | use function is_array; |
||||||
25 | use function is_callable; |
||||||
26 | use function is_file; |
||||||
27 | use function is_readable; |
||||||
28 | use function is_string; |
||||||
29 | use function sprintf; |
||||||
30 | use function stripos; |
||||||
31 | use function strtotime; |
||||||
32 | use function trim; |
||||||
33 | |||||||
34 | class Client |
||||||
35 | { |
||||||
36 | private const DEFAULT_USERNAME = 'default'; |
||||||
37 | private const DEFAULT_PASSWORD = ''; |
||||||
38 | private const DEFAULT_PORT = 8123; |
||||||
39 | private const DEFAULT_HOST = '127.0.0.1'; |
||||||
40 | private const DEFAULT_DATABASE = 'default'; |
||||||
41 | private const SUPPORTED_FORMATS = ['TabSeparated', 'TabSeparatedWithNames', 'CSV', 'CSVWithNames', 'JSONEachRow']; |
||||||
42 | |||||||
43 | /** @var Http */ |
||||||
44 | private $transport; |
||||||
45 | |||||||
46 | /** @var string */ |
||||||
47 | private $username; |
||||||
48 | |||||||
49 | /** @var string */ |
||||||
50 | private $password; |
||||||
51 | |||||||
52 | /** @var string */ |
||||||
53 | private $host; |
||||||
54 | |||||||
55 | /** @var string */ |
||||||
56 | private $port; |
||||||
57 | |||||||
58 | /** @var string */ |
||||||
59 | private $database; |
||||||
60 | |||||||
61 | /** |
||||||
62 | * @param mixed[] $connectParams |
||||||
63 | * @param mixed[] $settings |
||||||
64 | */ |
||||||
65 | 63 | public function __construct(array $connectParams, array $settings = []) |
|||||
66 | { |
||||||
67 | 63 | $this->username = $connectParams['username'] ?? self::DEFAULT_USERNAME; |
|||||
68 | 63 | $this->password = $connectParams['password'] ?? self::DEFAULT_PASSWORD; |
|||||
69 | 63 | $this->port = $connectParams['port'] ?? self::DEFAULT_PORT; |
|||||
70 | 63 | $this->host = $connectParams['host'] ?? self::DEFAULT_HOST; |
|||||
71 | 63 | $this->database = $connectParams['database'] ?? self::DEFAULT_DATABASE; |
|||||
72 | |||||||
73 | 63 | $this->transport = new Http( |
|||||
74 | 63 | $this->host, |
|||||
75 | 63 | $this->port, |
|||||
76 | 63 | $this->username, |
|||||
77 | 63 | $this->password, |
|||||
78 | 63 | $this->database |
|||||
79 | ); |
||||||
80 | |||||||
81 | 63 | $this->transport->addQueryDegeneration(new Bindings()); |
|||||
82 | |||||||
83 | // apply settings to transport class |
||||||
84 | 63 | if (! empty($settings)) { |
|||||
85 | 1 | $this->getSettings()->apply($settings); |
|||||
86 | } |
||||||
87 | |||||||
88 | 63 | if (isset($connectParams['https'])) { |
|||||
89 | $this->setHttps($connectParams['https']); |
||||||
90 | } |
||||||
91 | |||||||
92 | 63 | $this->setHttpCompression(true); |
|||||
93 | 63 | } |
|||||
94 | |||||||
95 | /** |
||||||
96 | * Clear Degeneration processing request [template ] |
||||||
97 | */ |
||||||
98 | 1 | public function cleanQueryDegeneration() : void |
|||||
99 | { |
||||||
100 | 1 | $this->transport->cleanQueryDegeneration(); |
|||||
101 | 1 | } |
|||||
102 | |||||||
103 | /** |
||||||
104 | * Degeneration processing |
||||||
105 | */ |
||||||
106 | public function addQueryDegeneration(Degeneration $degeneration) : void |
||||||
107 | { |
||||||
108 | $this->transport->addQueryDegeneration($degeneration); |
||||||
109 | } |
||||||
110 | |||||||
111 | /** |
||||||
112 | * Add Conditions Degeneration to query processing |
||||||
113 | */ |
||||||
114 | 1 | public function enableQueryConditions() : void |
|||||
115 | { |
||||||
116 | 1 | $this->transport->addQueryDegeneration(new Conditions()); |
|||||
117 | 1 | } |
|||||
118 | |||||||
119 | 2 | public function setTimeout(float $seconds) : void |
|||||
120 | { |
||||||
121 | 2 | $this->transport->setTimeout($seconds); |
|||||
122 | 2 | } |
|||||
123 | |||||||
124 | 2 | public function setConnectTimeout(float $seconds) : void |
|||||
125 | { |
||||||
126 | 2 | $this->transport->setConnectTimeout($seconds); |
|||||
127 | 2 | } |
|||||
128 | |||||||
129 | /** |
||||||
130 | * @return string |
||||||
131 | */ |
||||||
132 | public function getHost() |
||||||
133 | { |
||||||
134 | return $this->host; |
||||||
135 | } |
||||||
136 | |||||||
137 | /** |
||||||
138 | * Set connection host |
||||||
139 | * |
||||||
140 | * @param string|string[] $host |
||||||
141 | */ |
||||||
142 | public function setHost($host) |
||||||
143 | { |
||||||
144 | if (is_array($host)) { |
||||||
145 | $host = array_rand(array_flip($host)); |
||||||
146 | } |
||||||
147 | |||||||
148 | $this->host = $host; |
||||||
149 | $this->transport->setHost($host); |
||||||
150 | } |
||||||
151 | |||||||
152 | public function getPassword() : string |
||||||
153 | { |
||||||
154 | return $this->password; |
||||||
155 | } |
||||||
156 | |||||||
157 | public function getPort() : int |
||||||
158 | { |
||||||
159 | return $this->port; |
||||||
0 ignored issues
–
show
Bug
Best Practice
introduced
by
![]() |
|||||||
160 | } |
||||||
161 | |||||||
162 | public function getUsername() : string |
||||||
163 | { |
||||||
164 | return $this->username; |
||||||
165 | } |
||||||
166 | |||||||
167 | 63 | public function setDatabase(string $database) : self |
|||||
168 | { |
||||||
169 | 63 | $this->database = $database; |
|||||
170 | 63 | $this->transport->setDatabase($database); |
|||||
171 | |||||||
172 | 63 | return $this; |
|||||
173 | } |
||||||
174 | |||||||
175 | 2 | public function getTransport() : Http |
|||||
176 | { |
||||||
177 | 2 | return $this->transport; |
|||||
178 | } |
||||||
179 | |||||||
180 | /** |
||||||
181 | * @return mixed |
||||||
182 | */ |
||||||
183 | public function verbose() |
||||||
184 | { |
||||||
185 | return $this->transport->setVerbose(); |
||||||
186 | } |
||||||
187 | |||||||
188 | 63 | public function getSettings() : Settings |
|||||
189 | { |
||||||
190 | 63 | return $this->transport->getSettings(); |
|||||
191 | } |
||||||
192 | |||||||
193 | /** |
||||||
194 | * @return static |
||||||
195 | */ |
||||||
196 | 2 | public function useSession(bool $useSessionId = false) |
|||||
197 | { |
||||||
198 | 2 | if (! $this->getSettings()->getSessionId()) { |
|||||
199 | 2 | if (! $useSessionId) { |
|||||
200 | 2 | $this->getSettings()->makeSessionId(); |
|||||
201 | } else { |
||||||
202 | $this->getSettings()->session_id($useSessionId); |
||||||
203 | } |
||||||
204 | } |
||||||
205 | |||||||
206 | 2 | return $this; |
|||||
207 | } |
||||||
208 | |||||||
209 | /** |
||||||
210 | * @return mixed |
||||||
211 | */ |
||||||
212 | 2 | public function getSession() |
|||||
213 | { |
||||||
214 | 2 | return $this->getSettings()->getSessionId(); |
|||||
215 | } |
||||||
216 | |||||||
217 | /** |
||||||
218 | * Query CREATE/DROP |
||||||
219 | * |
||||||
220 | * @param mixed[] $bindings |
||||||
221 | * @return Statement |
||||||
222 | */ |
||||||
223 | 63 | public function write(string $sql, array $bindings = [], bool $exception = true) |
|||||
224 | { |
||||||
225 | 63 | return $this->transport->write($sql, $bindings, $exception); |
|||||
226 | } |
||||||
227 | |||||||
228 | /** |
||||||
229 | * Write to system.query_log |
||||||
230 | * |
||||||
231 | * @return static |
||||||
232 | */ |
||||||
233 | public function enableLogQueries(bool $flag = true) |
||||||
234 | { |
||||||
235 | $this->getSettings()->set('log_queries', (int) $flag); |
||||||
236 | |||||||
237 | return $this; |
||||||
238 | } |
||||||
239 | |||||||
240 | /** |
||||||
241 | * Compress the result if the HTTP client said that it understands data compressed with gzip or deflate |
||||||
242 | */ |
||||||
243 | 63 | public function setHttpCompression(bool $enable) : self |
|||||
244 | { |
||||||
245 | 63 | $this->getSettings()->setHttpCompression($enable); |
|||||
246 | |||||||
247 | 63 | return $this; |
|||||
248 | } |
||||||
249 | |||||||
250 | /** |
||||||
251 | * Enable / Disable HTTPS |
||||||
252 | * |
||||||
253 | * @return static |
||||||
254 | */ |
||||||
255 | 1 | public function setHttps(bool $flag) |
|||||
256 | { |
||||||
257 | 1 | $this->transport->setHttps($flag); |
|||||
258 | |||||||
259 | 1 | return $this; |
|||||
260 | } |
||||||
261 | |||||||
262 | /** |
||||||
263 | * Read extremes of the result columns. They can be output in JSON-formats. |
||||||
264 | * |
||||||
265 | * @return static |
||||||
266 | */ |
||||||
267 | 2 | public function enableExtremes(bool $flag = true) |
|||||
268 | { |
||||||
269 | 2 | $this->getSettings()->set('extremes', (int) $flag); |
|||||
270 | |||||||
271 | 2 | return $this; |
|||||
272 | } |
||||||
273 | |||||||
274 | /** |
||||||
275 | * Ping server |
||||||
276 | * |
||||||
277 | * @return bool |
||||||
278 | */ |
||||||
279 | 39 | public function ping() |
|||||
280 | { |
||||||
281 | 39 | return $this->transport->ping(); |
|||||
282 | } |
||||||
283 | |||||||
284 | /** |
||||||
285 | * @param mixed[] $bindings |
||||||
286 | * @return Statement |
||||||
287 | */ |
||||||
288 | 29 | public function select( |
|||||
289 | string $sql, |
||||||
290 | array $bindings = [], |
||||||
291 | WhereInFile $whereInFile = null, |
||||||
292 | WriteToFile $writeToFile = null |
||||||
293 | ) { |
||||||
294 | 29 | return $this->transport->select($sql, $bindings, $whereInFile, $writeToFile); |
|||||
295 | } |
||||||
296 | |||||||
297 | /** |
||||||
298 | * prepare select |
||||||
299 | * |
||||||
300 | * @param mixed[] $bindings |
||||||
301 | * @return Statement |
||||||
302 | */ |
||||||
303 | 5 | public function selectAsync( |
|||||
304 | string $sql, |
||||||
305 | array $bindings = [], |
||||||
306 | WhereInFile $whereInFile = null, |
||||||
307 | WriteToFile $writeToFile = null |
||||||
308 | ) { |
||||||
309 | 5 | return $this->transport->selectAsync($sql, $bindings, $whereInFile, $writeToFile); |
|||||
310 | } |
||||||
311 | |||||||
312 | /** |
||||||
313 | * @return bool |
||||||
314 | */ |
||||||
315 | 10 | public function executeAsync() |
|||||
316 | { |
||||||
317 | 10 | return $this->transport->executeAsync(); |
|||||
318 | } |
||||||
319 | |||||||
320 | /** |
||||||
321 | * set progressFunction |
||||||
322 | */ |
||||||
323 | 1 | public function progressFunction(callable $callback) |
|||||
324 | { |
||||||
325 | 1 | if (! is_callable($callback)) { |
|||||
326 | throw new \InvalidArgumentException('Not is_callable progressFunction'); |
||||||
327 | } |
||||||
328 | |||||||
329 | 1 | if (! $this->getSettings()->isSet('send_progress_in_http_headers')) { |
|||||
330 | 1 | $this->getSettings()->set('send_progress_in_http_headers', 1); |
|||||
331 | } |
||||||
332 | 1 | if (! $this->getSettings()->isSet('http_headers_progress_interval_ms')) { |
|||||
333 | 1 | $this->getSettings()->set('http_headers_progress_interval_ms', 100); |
|||||
334 | } |
||||||
335 | |||||||
336 | 1 | $this->transport->setProgressFunction($callback); |
|||||
337 | 1 | } |
|||||
338 | |||||||
339 | /** |
||||||
340 | * SHOW PROCESSLIST |
||||||
341 | * |
||||||
342 | * @return array |
||||||
343 | */ |
||||||
344 | public function showProcesslist() |
||||||
345 | { |
||||||
346 | return $this->select('SHOW PROCESSLIST')->rows(); |
||||||
347 | } |
||||||
348 | |||||||
349 | /** |
||||||
350 | * Get the number of simultaneous/Pending requests |
||||||
351 | * |
||||||
352 | * @return int |
||||||
353 | */ |
||||||
354 | 12 | public function getCountPendingQueue() |
|||||
355 | { |
||||||
356 | 12 | return $this->transport->getCountPendingQueue(); |
|||||
357 | } |
||||||
358 | |||||||
359 | /** |
||||||
360 | * @param mixed[][] $values |
||||||
361 | * @param string[] $columns |
||||||
362 | */ |
||||||
363 | 9 | public function insert(string $table, array $values, array $columns = []) : Statement |
|||||
364 | { |
||||||
365 | 9 | if (empty($values)) { |
|||||
366 | 1 | throw QueryException::cannotInsertEmptyValues(); |
|||||
367 | } |
||||||
368 | |||||||
369 | 8 | if (stripos($table, '`') === false && stripos($table, '.') === false) { |
|||||
370 | 5 | $table = '`' . $table . '`'; //quote table name for dot names |
|||||
371 | } |
||||||
372 | 8 | $sql = 'INSERT INTO ' . $table; |
|||||
373 | |||||||
374 | 8 | if (count($columns) !== 0) { |
|||||
375 | 7 | $sql .= ' (`' . implode('`,`', $columns) . '`) '; |
|||||
376 | } |
||||||
377 | |||||||
378 | 8 | $sql .= ' VALUES '; |
|||||
379 | |||||||
380 | 8 | foreach ($values as $row) { |
|||||
381 | 8 | $sql .= ' (' . FormatLine::Insert($row) . '), '; |
|||||
382 | } |
||||||
383 | 8 | $sql = trim($sql, ', '); |
|||||
384 | |||||||
385 | 8 | return $this->transport->write($sql); |
|||||
386 | } |
||||||
387 | |||||||
388 | /** |
||||||
389 | * * Prepares the values to insert from the associative array. |
||||||
390 | * * There may be one or more lines inserted, but then the keys inside the array list must match (including in the sequence) |
||||||
391 | * * |
||||||
392 | * * @param mixed[] $values - array column_name => value (if we insert one row) or array list column_name => value if we insert many lines |
||||||
393 | * * @return mixed[][] - list of arrays - 0 => fields, 1 => list of value arrays for insertion |
||||||
394 | * */ |
||||||
395 | 3 | public function prepareInsertAssocBulk(array $values) |
|||||
396 | { |
||||||
397 | 3 | if (isset($values[0]) && is_array($values[0])) { //случай, когда много строк вставляется |
|||||
398 | 2 | $preparedFields = array_keys($values[0]); |
|||||
399 | 2 | $preparedValues = []; |
|||||
400 | 2 | foreach ($values as $idx => $row) { |
|||||
401 | 2 | $_fields = array_keys($row); |
|||||
402 | 2 | if ($_fields !== $preparedFields) { |
|||||
403 | 1 | throw new QueryException( |
|||||
404 | 1 | sprintf( |
|||||
405 | 1 | 'Fields not match: %s and %s on element %s', |
|||||
406 | 1 | implode(',', $_fields), |
|||||
407 | 1 | implode(',', $preparedFields), |
|||||
408 | 1 | $idx |
|||||
409 | ) |
||||||
410 | ); |
||||||
411 | } |
||||||
412 | 2 | $preparedValues[] = array_values($row); |
|||||
413 | } |
||||||
414 | } else { |
||||||
415 | 1 | $preparedFields = array_keys($values); |
|||||
416 | 1 | $preparedValues = [array_values($values)]; |
|||||
417 | } |
||||||
418 | |||||||
419 | 2 | return [$preparedFields, $preparedValues]; |
|||||
420 | } |
||||||
421 | |||||||
422 | /** |
||||||
423 | * Inserts one or more rows from an associative array. |
||||||
424 | * If there is a discrepancy between the keys of the value arrays (or their order) - throws an exception. |
||||||
425 | * |
||||||
426 | * @param mixed[] $values - array column_name => value (if we insert one row) or array list column_name => value if we insert many lines |
||||||
427 | * @return Statement |
||||||
428 | */ |
||||||
429 | public function insertAssocBulk(string $tableName, array $values) |
||||||
430 | { |
||||||
431 | list($columns, $vals) = $this->prepareInsertAssocBulk($values); |
||||||
432 | |||||||
433 | return $this->insert($tableName, $vals, $columns); |
||||||
434 | } |
||||||
435 | |||||||
436 | /** |
||||||
437 | * insert TabSeparated files |
||||||
438 | * |
||||||
439 | * @param string|string[] $fileNames |
||||||
440 | * @param string[] $columns |
||||||
441 | * @return mixed |
||||||
442 | */ |
||||||
443 | 1 | public function insertBatchTSVFiles(string $tableName, $fileNames, array $columns = []) |
|||||
444 | { |
||||||
445 | 1 | return $this->insertBatchFiles($tableName, $fileNames, $columns, 'TabSeparated'); |
|||||
446 | } |
||||||
447 | |||||||
448 | /** |
||||||
449 | * insert Batch Files |
||||||
450 | * |
||||||
451 | * @param string|string[] $fileNames |
||||||
452 | * @param string[] $columns |
||||||
453 | * @param string $format ['TabSeparated','TabSeparatedWithNames','CSV','CSVWithNames'] |
||||||
454 | * @return Statement[] |
||||||
455 | * @throws Exception\TransportException |
||||||
456 | */ |
||||||
457 | 8 | public function insertBatchFiles(string $tableName, $fileNames, array $columns = [], string $format = 'CSV') |
|||||
458 | { |
||||||
459 | 8 | if (is_string($fileNames)) { |
|||||
460 | $fileNames = [$fileNames]; |
||||||
461 | } |
||||||
462 | 8 | if ($this->getCountPendingQueue() > 0) { |
|||||
463 | throw new QueryException('Queue must be empty, before insertBatch, need executeAsync'); |
||||||
464 | } |
||||||
465 | |||||||
466 | 8 | if (! in_array($format, self::SUPPORTED_FORMATS, true)) { |
|||||
467 | throw new QueryException('Format not support in insertBatchFiles'); |
||||||
468 | } |
||||||
469 | |||||||
470 | 8 | $result = []; |
|||||
471 | |||||||
472 | 8 | foreach ($fileNames as $fileName) { |
|||||
473 | 8 | if (! is_file($fileName) || ! is_readable($fileName)) { |
|||||
474 | throw new QueryException('Cant read file: ' . $fileName . ' ' . (is_file($fileName) ? '' : ' is not file')); |
||||||
475 | } |
||||||
476 | |||||||
477 | 8 | if (empty($columns)) { |
|||||
478 | $sql = 'INSERT INTO ' . $tableName . ' FORMAT ' . $format; |
||||||
479 | } else { |
||||||
480 | 8 | $sql = 'INSERT INTO ' . $tableName . ' ( ' . implode(',', $columns) . ' ) FORMAT ' . $format; |
|||||
481 | } |
||||||
482 | 8 | $result[$fileName] = $this->transport->writeAsyncCSV($sql, $fileName); |
|||||
483 | } |
||||||
484 | |||||||
485 | // exec |
||||||
486 | 8 | $this->executeAsync(); |
|||||
487 | |||||||
488 | // fetch resutl |
||||||
489 | 8 | foreach ($fileNames as $fileName) { |
|||||
490 | 8 | if (! $result[$fileName]->isError()) { |
|||||
491 | 6 | continue; |
|||||
492 | } |
||||||
493 | |||||||
494 | 2 | $result[$fileName]->error(); |
|||||
495 | } |
||||||
496 | |||||||
497 | 6 | return $result; |
|||||
498 | } |
||||||
499 | |||||||
500 | /** |
||||||
501 | * insert Batch Stream |
||||||
502 | * |
||||||
503 | * @param string[] $columns |
||||||
504 | * @param string $format ['TabSeparated','TabSeparatedWithNames','CSV','CSVWithNames'] |
||||||
505 | * @return Transport\CurlerRequest |
||||||
506 | */ |
||||||
507 | 2 | public function insertBatchStream(string $tableName, array $columns = [], string $format = 'CSV') |
|||||
508 | { |
||||||
509 | 2 | if ($this->getCountPendingQueue() > 0) { |
|||||
510 | throw new QueryException('Queue must be empty, before insertBatch, need executeAsync'); |
||||||
511 | } |
||||||
512 | |||||||
513 | 2 | if (! in_array($format, self::SUPPORTED_FORMATS, true)) { |
|||||
514 | throw new QueryException('Format not support in insertBatchFiles'); |
||||||
515 | } |
||||||
516 | |||||||
517 | 2 | if (empty($columns)) { |
|||||
518 | $sql = 'INSERT INTO ' . $tableName . ' FORMAT ' . $format; |
||||||
519 | } else { |
||||||
520 | 2 | $sql = 'INSERT INTO ' . $tableName . ' ( ' . implode(',', $columns) . ' ) FORMAT ' . $format; |
|||||
521 | } |
||||||
522 | |||||||
523 | 2 | return $this->transport->writeStreamData($sql); |
|||||
524 | } |
||||||
525 | |||||||
526 | /** |
||||||
527 | * stream Write |
||||||
528 | * |
||||||
529 | * @param string[] $bind |
||||||
530 | * @return Statement |
||||||
531 | * @throws Exception\TransportException |
||||||
532 | */ |
||||||
533 | 1 | public function streamWrite(Stream $stream, string $sql, array $bind = []) |
|||||
534 | { |
||||||
535 | 1 | if ($this->getCountPendingQueue() > 0) { |
|||||
536 | throw new QueryException('Queue must be empty, before streamWrite'); |
||||||
537 | } |
||||||
538 | |||||||
539 | 1 | return $this->transport->streamWrite($stream, $sql, $bind); |
|||||
540 | } |
||||||
541 | |||||||
542 | /** |
||||||
543 | * stream Read |
||||||
544 | * |
||||||
545 | * @param string[] $bind |
||||||
546 | * @return Statement |
||||||
547 | */ |
||||||
548 | 1 | public function streamRead(Stream $streamRead, string $sql, array $bind = []) |
|||||
549 | { |
||||||
550 | 1 | if ($this->getCountPendingQueue() > 0) { |
|||||
551 | throw new QueryException('Queue must be empty, before streamWrite'); |
||||||
552 | } |
||||||
553 | |||||||
554 | 1 | return $this->transport->streamRead($streamRead, $sql, $bind); |
|||||
555 | } |
||||||
556 | |||||||
557 | /** |
||||||
558 | * show databases |
||||||
559 | * |
||||||
560 | * @return array |
||||||
561 | */ |
||||||
562 | public function showDatabases() |
||||||
563 | { |
||||||
564 | return $this->select('show databases')->rows(); |
||||||
565 | } |
||||||
566 | |||||||
567 | /** |
||||||
568 | * Size of database |
||||||
569 | * |
||||||
570 | * @return mixed|null |
||||||
571 | */ |
||||||
572 | public function databaseSize() |
||||||
573 | { |
||||||
574 | $database = $this->transport->getDatabase(); |
||||||
0 ignored issues
–
show
The method
getDatabase() does not exist on ClickHouseDB\Transport\Http .
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces. This is most likely a typographical error or the method has been renamed. ![]() |
|||||||
575 | |||||||
576 | return $this->select( |
||||||
577 | ' |
||||||
578 | SELECT database,formatReadableSize(sum(bytes)) as size |
||||||
579 | FROM system.parts |
||||||
580 | WHERE active AND database=:database |
||||||
581 | GROUP BY database |
||||||
582 | ', |
||||||
583 | ['database' => $database] |
||||||
584 | )->fetchOne(); |
||||||
585 | } |
||||||
586 | |||||||
587 | /** |
||||||
588 | * SHOW TABLES |
||||||
589 | * |
||||||
590 | * @return mixed[] |
||||||
591 | */ |
||||||
592 | 1 | public function showTables() |
|||||
593 | { |
||||||
594 | 1 | return $this->select('SHOW TABLES')->rowsAsTree('name'); |
|||||
595 | } |
||||||
596 | |||||||
597 | /** |
||||||
598 | * statement = SHOW CREATE TABLE |
||||||
599 | * |
||||||
600 | * @return mixed |
||||||
601 | */ |
||||||
602 | public function showCreateTable(string $table) |
||||||
603 | { |
||||||
604 | return $this->select('SHOW CREATE TABLE ' . $table)->fetchOne('statement'); |
||||||
605 | } |
||||||
606 | |||||||
607 | /** |
||||||
608 | * Size of tables |
||||||
609 | * |
||||||
610 | * @return mixed |
||||||
611 | */ |
||||||
612 | 1 | public function tableSize(string $tableName) |
|||||
613 | { |
||||||
614 | 1 | $tables = $this->tablesSize(); |
|||||
615 | |||||||
616 | 1 | if (isset($tables[$tableName])) { |
|||||
617 | 1 | return $tables[$tableName]; |
|||||
618 | } |
||||||
619 | |||||||
620 | return null; |
||||||
621 | } |
||||||
622 | |||||||
623 | /** |
||||||
624 | * Tables sizes |
||||||
625 | * |
||||||
626 | * @param bool $flatList |
||||||
627 | * @return mixed[][] |
||||||
628 | */ |
||||||
629 | 1 | public function tablesSize($flatList = false) |
|||||
630 | { |
||||||
631 | 1 | $result = $this->select(' |
|||||
632 | SELECT name as table,database, |
||||||
633 | max(sizebytes) as sizebytes, |
||||||
634 | max(size) as size, |
||||||
635 | min(min_date) as min_date, |
||||||
636 | max(max_date) as max_date |
||||||
637 | FROM system.tables |
||||||
638 | ANY LEFT JOIN |
||||||
639 | ( |
||||||
640 | SELECT table,database, |
||||||
641 | formatReadableSize(sum(bytes)) as size, |
||||||
642 | sum(bytes) as sizebytes, |
||||||
643 | min(min_date) as min_date, |
||||||
644 | max(max_date) as max_date |
||||||
645 | FROM system.parts |
||||||
646 | WHERE active AND database=:database |
||||||
647 | GROUP BY table,database |
||||||
648 | ) USING ( table,database ) |
||||||
649 | WHERE database=:database |
||||||
650 | GROUP BY table,database |
||||||
651 | ', |
||||||
652 | 1 | ['database' => $this->database]); |
|||||
653 | |||||||
654 | 1 | if ($flatList) { |
|||||
655 | return $result->rows(); |
||||||
656 | } |
||||||
657 | |||||||
658 | 1 | return $result->rowsAsTree('table'); |
|||||
659 | } |
||||||
660 | |||||||
661 | /** |
||||||
662 | * isExists |
||||||
663 | * |
||||||
664 | * @return array |
||||||
665 | */ |
||||||
666 | public function tableExists(string $database, string $table) |
||||||
667 | { |
||||||
668 | return $this->select( |
||||||
669 | ' |
||||||
670 | SELECT * |
||||||
671 | FROM system.tables |
||||||
672 | WHERE name=\'' . $table . '\' AND database=\'' . $database . '\'' |
||||||
673 | )->rowsAsTree('name'); |
||||||
674 | } |
||||||
675 | |||||||
676 | /** |
||||||
677 | * List of partitions |
||||||
678 | * |
||||||
679 | * @return mixed[][] |
||||||
680 | */ |
||||||
681 | public function partitions(string $table, int $limit = null, bool $active = null) |
||||||
682 | { |
||||||
683 | $database = $this->database; |
||||||
684 | $whereActiveClause = $active === null ? '' : sprintf(' AND active = %s', (int) $active); |
||||||
685 | $limitClause = $limit !== null ? ' LIMIT ' . $limit : ''; |
||||||
686 | |||||||
687 | return $this->select(<<<CLICKHOUSE |
||||||
688 | SELECT * |
||||||
689 | FROM system.parts |
||||||
690 | WHERE like(table,'%$table%') AND database='$database'$whereActiveClause |
||||||
691 | ORDER BY max_date $limitClause |
||||||
692 | CLICKHOUSE |
||||||
693 | )->rowsAsTree('name'); |
||||||
694 | } |
||||||
695 | |||||||
696 | /** |
||||||
697 | * dropPartition |
||||||
698 | * @deprecated |
||||||
699 | * @return Statement |
||||||
700 | */ |
||||||
701 | public function dropPartition(string $dataBaseTableName, string $partition_id) |
||||||
702 | { |
||||||
703 | |||||||
704 | $partition_id = trim($partition_id, '\''); |
||||||
705 | $this->getSettings()->set('replication_alter_partitions_sync', 2); |
||||||
706 | $state = $this->write('ALTER TABLE {dataBaseTableName} DROP PARTITION :partion_id', |
||||||
707 | [ |
||||||
708 | 'dataBaseTableName' => $dataBaseTableName, |
||||||
709 | 'partion_id' => $partition_id, |
||||||
710 | ]); |
||||||
711 | |||||||
712 | return $state; |
||||||
713 | } |
||||||
714 | |||||||
715 | /** |
||||||
716 | * dropOldPartitions by day_ago |
||||||
717 | * @deprecated |
||||||
718 | * |
||||||
719 | * @return array |
||||||
720 | * @throws Exception\TransportException |
||||||
721 | * @throws \Exception |
||||||
722 | */ |
||||||
723 | public function dropOldPartitions(string $table_name, int $days_ago, int $count_partitons_per_one = 100) |
||||||
724 | { |
||||||
725 | $days_ago = strtotime(date('Y-m-d 00:00:00', strtotime('-' . $days_ago . ' day'))); |
||||||
726 | |||||||
727 | $drop = []; |
||||||
728 | $list_patitions = $this->partitions($table_name, $count_partitons_per_one); |
||||||
729 | |||||||
730 | foreach ($list_patitions as $partion_id => $partition) { |
||||||
731 | if (stripos($partition['engine'], 'mergetree') === false) { |
||||||
732 | continue; |
||||||
733 | } |
||||||
734 | |||||||
735 | // $min_date = strtotime($partition['min_date']); |
||||||
736 | $max_date = strtotime($partition['max_date']); |
||||||
737 | |||||||
738 | if ($max_date < $days_ago) { |
||||||
739 | $drop[] = $partition['partition']; |
||||||
740 | } |
||||||
741 | } |
||||||
742 | |||||||
743 | $result = []; |
||||||
744 | foreach ($drop as $partition_id) { |
||||||
745 | $result[$partition_id] = $this->dropPartition($table_name, $partition_id); |
||||||
746 | } |
||||||
747 | |||||||
748 | return $result; |
||||||
749 | } |
||||||
750 | |||||||
751 | /** |
||||||
752 | * Truncate ( drop all partitions ) |
||||||
753 | * @deprecated |
||||||
754 | * @return array |
||||||
755 | */ |
||||||
756 | public function truncateTable(string $tableName) |
||||||
757 | { |
||||||
758 | $partions = $this->partitions($tableName); |
||||||
759 | $out = []; |
||||||
760 | foreach ($partions as $part_key => $part) { |
||||||
761 | $part_id = $part['partition']; |
||||||
762 | $out[$part_id] = $this->dropPartition($tableName, $part_id); |
||||||
763 | } |
||||||
764 | |||||||
765 | return $out; |
||||||
766 | } |
||||||
767 | |||||||
768 | /** |
||||||
769 | * Returns the server's uptime in seconds. |
||||||
770 | * |
||||||
771 | * @return int |
||||||
772 | * @throws Exception\TransportException |
||||||
773 | */ |
||||||
774 | 1 | public function getServerUptime() |
|||||
775 | { |
||||||
776 | 1 | return $this->select('SELECT uptime() as uptime')->fetchOne('uptime'); |
|||||
777 | } |
||||||
778 | |||||||
779 | /** |
||||||
780 | * Returns string with the server version. |
||||||
781 | */ |
||||||
782 | 1 | public function getServerVersion() : string |
|||||
783 | { |
||||||
784 | 1 | return (string) $this->select('SELECT version() as version')->fetchOne('version'); |
|||||
785 | } |
||||||
786 | |||||||
787 | /** |
||||||
788 | * Read system.settings table |
||||||
789 | * |
||||||
790 | * @return mixed[][] |
||||||
791 | */ |
||||||
792 | 1 | public function getServerSystemSettings(string $like = '') |
|||||
793 | { |
||||||
794 | 1 | $l = []; |
|||||
795 | 1 | $list = $this->select('SELECT * FROM system.settings' . ($like ? ' WHERE name LIKE :like' : ''), |
|||||
796 | 1 | ['like' => '%' . $like . '%'])->rows(); |
|||||
797 | 1 | foreach ($list as $row) { |
|||||
798 | 1 | if (isset($row['name'])) { |
|||||
799 | 1 | $n = $row['name']; |
|||||
800 | 1 | unset($row['name']); |
|||||
801 | 1 | $l[$n] = $row; |
|||||
802 | } |
||||||
803 | } |
||||||
804 | |||||||
805 | 1 | return $l; |
|||||
806 | } |
||||||
807 | } |
||||||
808 |