| 1 | <?php |
||||||
| 2 | |||||||
| 3 | declare(strict_types=1); |
||||||
| 4 | |||||||
| 5 | namespace ClickHouseDB; |
||||||
| 6 | |||||||
| 7 | use ClickHouseDB\Exception\QueryException; |
||||||
| 8 | use ClickHouseDB\Query\Degeneration; |
||||||
| 9 | use ClickHouseDB\Query\Degeneration\Bindings; |
||||||
| 10 | use ClickHouseDB\Query\Degeneration\Conditions; |
||||||
| 11 | use ClickHouseDB\Query\WhereInFile; |
||||||
| 12 | use ClickHouseDB\Query\WriteToFile; |
||||||
| 13 | use ClickHouseDB\Quote\FormatLine; |
||||||
| 14 | use ClickHouseDB\Transport\Http; |
||||||
| 15 | use ClickHouseDB\Transport\Stream; |
||||||
| 16 | use function array_flip; |
||||||
| 17 | use function array_keys; |
||||||
| 18 | use function array_rand; |
||||||
| 19 | use function array_values; |
||||||
| 20 | use function count; |
||||||
| 21 | use function date; |
||||||
| 22 | use function implode; |
||||||
| 23 | use function in_array; |
||||||
| 24 | use function is_array; |
||||||
| 25 | use function is_callable; |
||||||
| 26 | use function is_file; |
||||||
| 27 | use function is_readable; |
||||||
| 28 | use function is_string; |
||||||
| 29 | use function sprintf; |
||||||
| 30 | use function stripos; |
||||||
| 31 | use function strtotime; |
||||||
| 32 | use function trim; |
||||||
| 33 | |||||||
| 34 | class Client |
||||||
| 35 | { |
||||||
| 36 | private const DEFAULT_USERNAME = 'default'; |
||||||
| 37 | private const DEFAULT_PASSWORD = ''; |
||||||
| 38 | private const DEFAULT_PORT = 8123; |
||||||
| 39 | private const DEFAULT_HOST = '127.0.0.1'; |
||||||
| 40 | private const DEFAULT_DATABASE = 'default'; |
||||||
| 41 | private const SUPPORTED_FORMATS = ['TabSeparated', 'TabSeparatedWithNames', 'CSV', 'CSVWithNames', 'JSONEachRow']; |
||||||
| 42 | |||||||
| 43 | /** @var Http */ |
||||||
| 44 | private $transport; |
||||||
| 45 | |||||||
| 46 | /** @var string */ |
||||||
| 47 | private $username; |
||||||
| 48 | |||||||
| 49 | /** @var string */ |
||||||
| 50 | private $password; |
||||||
| 51 | |||||||
| 52 | /** @var string */ |
||||||
| 53 | private $host; |
||||||
| 54 | |||||||
| 55 | /** @var string */ |
||||||
| 56 | private $port; |
||||||
| 57 | |||||||
| 58 | /** @var string */ |
||||||
| 59 | private $database; |
||||||
| 60 | |||||||
| 61 | /** |
||||||
| 62 | * @param mixed[] $connectParams |
||||||
| 63 | * @param mixed[] $settings |
||||||
| 64 | */ |
||||||
| 65 | 63 | public function __construct(array $connectParams, array $settings = []) |
|||||
| 66 | { |
||||||
| 67 | 63 | $this->username = $connectParams['username'] ?? self::DEFAULT_USERNAME; |
|||||
| 68 | 63 | $this->password = $connectParams['password'] ?? self::DEFAULT_PASSWORD; |
|||||
| 69 | 63 | $this->port = $connectParams['port'] ?? self::DEFAULT_PORT; |
|||||
| 70 | 63 | $this->host = $connectParams['host'] ?? self::DEFAULT_HOST; |
|||||
| 71 | 63 | $this->database = $connectParams['database'] ?? self::DEFAULT_DATABASE; |
|||||
| 72 | |||||||
| 73 | 63 | $this->transport = new Http( |
|||||
| 74 | 63 | $this->host, |
|||||
| 75 | 63 | $this->port, |
|||||
| 76 | 63 | $this->username, |
|||||
| 77 | 63 | $this->password, |
|||||
| 78 | 63 | $this->database |
|||||
| 79 | ); |
||||||
| 80 | |||||||
| 81 | 63 | $this->transport->addQueryDegeneration(new Bindings()); |
|||||
| 82 | |||||||
| 83 | // apply settings to transport class |
||||||
| 84 | 63 | if (! empty($settings)) { |
|||||
| 85 | 1 | $this->getSettings()->apply($settings); |
|||||
| 86 | } |
||||||
| 87 | |||||||
| 88 | 63 | if (isset($connectParams['https'])) { |
|||||
| 89 | $this->setHttps($connectParams['https']); |
||||||
| 90 | } |
||||||
| 91 | |||||||
| 92 | 63 | $this->setHttpCompression(true); |
|||||
| 93 | 63 | } |
|||||
| 94 | |||||||
| 95 | /** |
||||||
| 96 | * Clear Degeneration processing request [template ] |
||||||
| 97 | */ |
||||||
| 98 | 1 | public function cleanQueryDegeneration() : void |
|||||
| 99 | { |
||||||
| 100 | 1 | $this->transport->cleanQueryDegeneration(); |
|||||
| 101 | 1 | } |
|||||
| 102 | |||||||
| 103 | /** |
||||||
| 104 | * Degeneration processing |
||||||
| 105 | */ |
||||||
| 106 | public function addQueryDegeneration(Degeneration $degeneration) : void |
||||||
| 107 | { |
||||||
| 108 | $this->transport->addQueryDegeneration($degeneration); |
||||||
| 109 | } |
||||||
| 110 | |||||||
| 111 | /** |
||||||
| 112 | * Add Conditions Degeneration to query processing |
||||||
| 113 | */ |
||||||
| 114 | 1 | public function enableQueryConditions() : void |
|||||
| 115 | { |
||||||
| 116 | 1 | $this->transport->addQueryDegeneration(new Conditions()); |
|||||
| 117 | 1 | } |
|||||
| 118 | |||||||
| 119 | 2 | public function setTimeout(float $seconds) : void |
|||||
| 120 | { |
||||||
| 121 | 2 | $this->transport->setTimeout($seconds); |
|||||
| 122 | 2 | } |
|||||
| 123 | |||||||
| 124 | 2 | public function setConnectTimeout(float $seconds) : void |
|||||
| 125 | { |
||||||
| 126 | 2 | $this->transport->setConnectTimeout($seconds); |
|||||
| 127 | 2 | } |
|||||
| 128 | |||||||
| 129 | /** |
||||||
| 130 | * @return string |
||||||
| 131 | */ |
||||||
| 132 | public function getHost() |
||||||
| 133 | { |
||||||
| 134 | return $this->host; |
||||||
| 135 | } |
||||||
| 136 | |||||||
| 137 | /** |
||||||
| 138 | * Set connection host |
||||||
| 139 | * |
||||||
| 140 | * @param string $host |
||||||
| 141 | */ |
||||||
| 142 | public function setHost($host) |
||||||
| 143 | { |
||||||
| 144 | $this->host = $host; |
||||||
| 145 | $this->transport->setHost($host); |
||||||
| 146 | } |
||||||
| 147 | |||||||
| 148 | public function getPassword() : string |
||||||
| 149 | { |
||||||
| 150 | return $this->password; |
||||||
| 151 | } |
||||||
| 152 | |||||||
| 153 | public function getPort() : int |
||||||
| 154 | { |
||||||
| 155 | return $this->port; |
||||||
|
0 ignored issues
–
show
Bug
Best Practice
introduced
by
Loading history...
|
|||||||
| 156 | } |
||||||
| 157 | |||||||
| 158 | public function getUsername() : string |
||||||
| 159 | { |
||||||
| 160 | return $this->username; |
||||||
| 161 | } |
||||||
| 162 | |||||||
| 163 | 63 | public function setDatabase(string $database) : self |
|||||
| 164 | { |
||||||
| 165 | 63 | $this->database = $database; |
|||||
| 166 | 63 | $this->transport->setDatabase($database); |
|||||
| 167 | |||||||
| 168 | 63 | return $this; |
|||||
| 169 | } |
||||||
| 170 | |||||||
| 171 | 2 | public function getTransport() : Http |
|||||
| 172 | { |
||||||
| 173 | 2 | return $this->transport; |
|||||
| 174 | } |
||||||
| 175 | |||||||
| 176 | /** |
||||||
| 177 | * @return mixed |
||||||
| 178 | */ |
||||||
| 179 | public function verbose() |
||||||
| 180 | { |
||||||
| 181 | return $this->transport->setVerbose(); |
||||||
| 182 | } |
||||||
| 183 | |||||||
| 184 | 63 | public function getSettings() : Settings |
|||||
| 185 | { |
||||||
| 186 | 63 | return $this->transport->getSettings(); |
|||||
| 187 | } |
||||||
| 188 | |||||||
| 189 | /** |
||||||
| 190 | * @return static |
||||||
| 191 | */ |
||||||
| 192 | 2 | public function useSession(bool $useSessionId = false) |
|||||
| 193 | { |
||||||
| 194 | 2 | if (! $this->getSettings()->getSessionId()) { |
|||||
| 195 | 2 | if (! $useSessionId) { |
|||||
| 196 | 2 | $this->getSettings()->makeSessionId(); |
|||||
| 197 | } else { |
||||||
| 198 | $this->getSettings()->session_id($useSessionId); |
||||||
| 199 | } |
||||||
| 200 | } |
||||||
| 201 | |||||||
| 202 | 2 | return $this; |
|||||
| 203 | } |
||||||
| 204 | |||||||
| 205 | /** |
||||||
| 206 | * @return mixed |
||||||
| 207 | */ |
||||||
| 208 | 2 | public function getSession() |
|||||
| 209 | { |
||||||
| 210 | 2 | return $this->getSettings()->getSessionId(); |
|||||
| 211 | } |
||||||
| 212 | |||||||
| 213 | /** |
||||||
| 214 | * Query CREATE/DROP |
||||||
| 215 | * |
||||||
| 216 | * @param mixed[] $bindings |
||||||
| 217 | * @return Statement |
||||||
| 218 | */ |
||||||
| 219 | 63 | public function write(string $sql, array $bindings = [], bool $exception = true) |
|||||
| 220 | { |
||||||
| 221 | 63 | return $this->transport->write($sql, $bindings, $exception); |
|||||
| 222 | } |
||||||
| 223 | |||||||
| 224 | /** |
||||||
| 225 | * Write to system.query_log |
||||||
| 226 | * |
||||||
| 227 | * @return static |
||||||
| 228 | */ |
||||||
| 229 | public function enableLogQueries(bool $flag = true) |
||||||
| 230 | { |
||||||
| 231 | $this->getSettings()->set('log_queries', (int) $flag); |
||||||
| 232 | |||||||
| 233 | return $this; |
||||||
| 234 | } |
||||||
| 235 | |||||||
| 236 | /** |
||||||
| 237 | * Compress the result if the HTTP client said that it understands data compressed with gzip or deflate |
||||||
| 238 | */ |
||||||
| 239 | 63 | public function setHttpCompression(bool $enable) : self |
|||||
| 240 | { |
||||||
| 241 | 63 | $this->getSettings()->setHttpCompression($enable); |
|||||
| 242 | |||||||
| 243 | 63 | return $this; |
|||||
| 244 | } |
||||||
| 245 | |||||||
| 246 | /** |
||||||
| 247 | * Enable / Disable HTTPS |
||||||
| 248 | * |
||||||
| 249 | * @return static |
||||||
| 250 | */ |
||||||
| 251 | 1 | public function setHttps(bool $flag) |
|||||
| 252 | { |
||||||
| 253 | 1 | $this->transport->setHttps($flag); |
|||||
| 254 | |||||||
| 255 | 1 | return $this; |
|||||
| 256 | } |
||||||
| 257 | |||||||
| 258 | /** |
||||||
| 259 | * Read extremes of the result columns. They can be output in JSON-formats. |
||||||
| 260 | * |
||||||
| 261 | * @return static |
||||||
| 262 | */ |
||||||
| 263 | 2 | public function enableExtremes(bool $flag = true) |
|||||
| 264 | { |
||||||
| 265 | 2 | $this->getSettings()->set('extremes', (int) $flag); |
|||||
| 266 | |||||||
| 267 | 2 | return $this; |
|||||
| 268 | } |
||||||
| 269 | |||||||
| 270 | /** |
||||||
| 271 | * Ping server |
||||||
| 272 | * |
||||||
| 273 | * @return bool |
||||||
| 274 | */ |
||||||
| 275 | 39 | public function ping() |
|||||
| 276 | { |
||||||
| 277 | 39 | return $this->transport->ping(); |
|||||
| 278 | } |
||||||
| 279 | |||||||
| 280 | /** |
||||||
| 281 | * @param mixed[] $bindings |
||||||
| 282 | * @return Statement |
||||||
| 283 | */ |
||||||
| 284 | 29 | public function select( |
|||||
| 285 | string $sql, |
||||||
| 286 | array $bindings = [], |
||||||
| 287 | WhereInFile $whereInFile = null, |
||||||
| 288 | WriteToFile $writeToFile = null |
||||||
| 289 | ) { |
||||||
| 290 | 29 | return $this->transport->select($sql, $bindings, $whereInFile, $writeToFile); |
|||||
| 291 | } |
||||||
| 292 | |||||||
| 293 | /** |
||||||
| 294 | * prepare select |
||||||
| 295 | * |
||||||
| 296 | * @param mixed[] $bindings |
||||||
| 297 | * @return Statement |
||||||
| 298 | */ |
||||||
| 299 | 5 | public function selectAsync( |
|||||
| 300 | string $sql, |
||||||
| 301 | array $bindings = [], |
||||||
| 302 | WhereInFile $whereInFile = null, |
||||||
| 303 | WriteToFile $writeToFile = null |
||||||
| 304 | ) { |
||||||
| 305 | 5 | return $this->transport->selectAsync($sql, $bindings, $whereInFile, $writeToFile); |
|||||
| 306 | } |
||||||
| 307 | |||||||
| 308 | /** |
||||||
| 309 | * @return bool |
||||||
| 310 | */ |
||||||
| 311 | 10 | public function executeAsync() |
|||||
| 312 | { |
||||||
| 313 | 10 | return $this->transport->executeAsync(); |
|||||
| 314 | } |
||||||
| 315 | |||||||
| 316 | /** |
||||||
| 317 | * set progressFunction |
||||||
| 318 | */ |
||||||
| 319 | 1 | public function progressFunction(callable $callback) |
|||||
| 320 | { |
||||||
| 321 | 1 | if (! is_callable($callback)) { |
|||||
| 322 | throw new \InvalidArgumentException('Not is_callable progressFunction'); |
||||||
| 323 | } |
||||||
| 324 | |||||||
| 325 | 1 | if (! $this->getSettings()->isSet('send_progress_in_http_headers')) { |
|||||
| 326 | 1 | $this->getSettings()->set('send_progress_in_http_headers', 1); |
|||||
| 327 | } |
||||||
| 328 | 1 | if (! $this->getSettings()->isSet('http_headers_progress_interval_ms')) { |
|||||
| 329 | 1 | $this->getSettings()->set('http_headers_progress_interval_ms', 100); |
|||||
| 330 | } |
||||||
| 331 | |||||||
| 332 | 1 | $this->transport->setProgressFunction($callback); |
|||||
| 333 | 1 | } |
|||||
| 334 | |||||||
| 335 | /** |
||||||
| 336 | * SHOW PROCESSLIST |
||||||
| 337 | * |
||||||
| 338 | * @return array |
||||||
| 339 | */ |
||||||
| 340 | public function showProcesslist() |
||||||
| 341 | { |
||||||
| 342 | return $this->select('SHOW PROCESSLIST')->rows(); |
||||||
| 343 | } |
||||||
| 344 | |||||||
| 345 | /** |
||||||
| 346 | * Get the number of simultaneous/Pending requests |
||||||
| 347 | * |
||||||
| 348 | * @return int |
||||||
| 349 | */ |
||||||
| 350 | 12 | public function getCountPendingQueue() |
|||||
| 351 | { |
||||||
| 352 | 12 | return $this->transport->getCountPendingQueue(); |
|||||
| 353 | } |
||||||
| 354 | |||||||
| 355 | /** |
||||||
| 356 | * @param mixed[][] $values |
||||||
| 357 | * @param string[] $columns |
||||||
| 358 | * @return Statement |
||||||
| 359 | * @throws Exception\TransportException |
||||||
| 360 | */ |
||||||
| 361 | 9 | public function insert(string $table, array $values, array $columns = []) : Statement |
|||||
| 362 | { |
||||||
| 363 | 9 | if (empty($values)) { |
|||||
| 364 | 1 | throw QueryException::cannotInsertEmptyValues(); |
|||||
| 365 | } |
||||||
| 366 | |||||||
| 367 | 8 | if (stripos($table, '`') === false && stripos($table, '.') === false) { |
|||||
| 368 | 5 | $table = '`' . $table . '`'; //quote table name for dot names |
|||||
| 369 | } |
||||||
| 370 | 8 | $sql = 'INSERT INTO ' . $table; |
|||||
| 371 | |||||||
| 372 | 8 | if (count($columns) !== 0) { |
|||||
| 373 | 7 | $sql .= ' (`' . implode('`,`', $columns) . '`) '; |
|||||
| 374 | } |
||||||
| 375 | |||||||
| 376 | 8 | $sql .= ' VALUES '; |
|||||
| 377 | |||||||
| 378 | 8 | foreach ($values as $row) { |
|||||
| 379 | 8 | $sql .= ' (' . FormatLine::Insert($row) . '), '; |
|||||
| 380 | } |
||||||
| 381 | 8 | $sql = trim($sql, ', '); |
|||||
| 382 | |||||||
| 383 | 8 | return $this->transport->write($sql); |
|||||
| 384 | } |
||||||
| 385 | |||||||
| 386 | /** |
||||||
| 387 | * * Prepares the values to insert from the associative array. |
||||||
| 388 | * * There may be one or more lines inserted, but then the keys inside the array list must match (including in the sequence) |
||||||
| 389 | * * |
||||||
| 390 | * * @param mixed[] $values - array column_name => value (if we insert one row) or array list column_name => value if we insert many lines |
||||||
| 391 | * * @return mixed[][] - list of arrays - 0 => fields, 1 => list of value arrays for insertion |
||||||
| 392 | * */ |
||||||
| 393 | 3 | public function prepareInsertAssocBulk(array $values) |
|||||
| 394 | { |
||||||
| 395 | 3 | if (isset($values[0]) && is_array($values[0])) { //случай, когда много строк вставляется |
|||||
| 396 | 2 | $preparedFields = array_keys($values[0]); |
|||||
| 397 | 2 | $preparedValues = []; |
|||||
| 398 | 2 | foreach ($values as $idx => $row) { |
|||||
| 399 | 2 | $_fields = array_keys($row); |
|||||
| 400 | 2 | if ($_fields !== $preparedFields) { |
|||||
| 401 | 1 | throw new QueryException( |
|||||
| 402 | 1 | sprintf( |
|||||
| 403 | 1 | 'Fields not match: %s and %s on element %s', |
|||||
| 404 | 1 | implode(',', $_fields), |
|||||
| 405 | 1 | implode(',', $preparedFields), |
|||||
| 406 | 1 | $idx |
|||||
| 407 | ) |
||||||
| 408 | ); |
||||||
| 409 | } |
||||||
| 410 | 2 | $preparedValues[] = array_values($row); |
|||||
| 411 | } |
||||||
| 412 | } else { |
||||||
| 413 | 1 | $preparedFields = array_keys($values); |
|||||
| 414 | 1 | $preparedValues = [array_values($values)]; |
|||||
| 415 | } |
||||||
| 416 | |||||||
| 417 | 2 | return [$preparedFields, $preparedValues]; |
|||||
| 418 | } |
||||||
| 419 | |||||||
| 420 | /** |
||||||
| 421 | * Inserts one or more rows from an associative array. |
||||||
| 422 | * If there is a discrepancy between the keys of the value arrays (or their order) - throws an exception. |
||||||
| 423 | * |
||||||
| 424 | * @param mixed[] $values - array column_name => value (if we insert one row) or array list column_name => value if we insert many lines |
||||||
| 425 | * @return Statement |
||||||
| 426 | */ |
||||||
| 427 | public function insertAssocBulk(string $tableName, array $values) |
||||||
| 428 | { |
||||||
| 429 | list($columns, $vals) = $this->prepareInsertAssocBulk($values); |
||||||
| 430 | |||||||
| 431 | return $this->insert($tableName, $vals, $columns); |
||||||
| 432 | } |
||||||
| 433 | |||||||
| 434 | /** |
||||||
| 435 | * insert TabSeparated files |
||||||
| 436 | * |
||||||
| 437 | * @param string|string[] $fileNames |
||||||
| 438 | * @param string[] $columns |
||||||
| 439 | * @return mixed |
||||||
| 440 | */ |
||||||
| 441 | 1 | public function insertBatchTSVFiles(string $tableName, $fileNames, array $columns = []) |
|||||
| 442 | { |
||||||
| 443 | 1 | return $this->insertBatchFiles($tableName, $fileNames, $columns, 'TabSeparated'); |
|||||
| 444 | } |
||||||
| 445 | |||||||
| 446 | /** |
||||||
| 447 | * insert Batch Files |
||||||
| 448 | * |
||||||
| 449 | * @param string|string[] $fileNames |
||||||
| 450 | * @param string[] $columns |
||||||
| 451 | * @param string $format ['TabSeparated','TabSeparatedWithNames','CSV','CSVWithNames'] |
||||||
| 452 | * @return Statement[] |
||||||
| 453 | * @throws Exception\TransportException |
||||||
| 454 | */ |
||||||
| 455 | 8 | public function insertBatchFiles(string $tableName, $fileNames, array $columns = [], string $format = 'CSV') |
|||||
| 456 | { |
||||||
| 457 | 8 | if (is_string($fileNames)) { |
|||||
| 458 | $fileNames = [$fileNames]; |
||||||
| 459 | } |
||||||
| 460 | 8 | if ($this->getCountPendingQueue() > 0) { |
|||||
| 461 | throw new QueryException('Queue must be empty, before insertBatch, need executeAsync'); |
||||||
| 462 | } |
||||||
| 463 | |||||||
| 464 | 8 | if (! in_array($format, self::SUPPORTED_FORMATS, true)) { |
|||||
| 465 | throw new QueryException('Format not support in insertBatchFiles'); |
||||||
| 466 | } |
||||||
| 467 | |||||||
| 468 | 8 | $result = []; |
|||||
| 469 | |||||||
| 470 | 8 | foreach ($fileNames as $fileName) { |
|||||
| 471 | 8 | if (! is_file($fileName) || ! is_readable($fileName)) { |
|||||
| 472 | throw new QueryException('Cant read file: ' . $fileName . ' ' . (is_file($fileName) ? '' : ' is not file')); |
||||||
| 473 | } |
||||||
| 474 | |||||||
| 475 | 8 | if (empty($columns)) { |
|||||
| 476 | $sql = 'INSERT INTO ' . $tableName . ' FORMAT ' . $format; |
||||||
| 477 | } else { |
||||||
| 478 | 8 | $sql = 'INSERT INTO ' . $tableName . ' ( ' . implode(',', $columns) . ' ) FORMAT ' . $format; |
|||||
| 479 | } |
||||||
| 480 | 8 | $result[$fileName] = $this->transport->writeAsyncCSV($sql, $fileName); |
|||||
| 481 | } |
||||||
| 482 | |||||||
| 483 | // exec |
||||||
| 484 | 8 | $this->executeAsync(); |
|||||
| 485 | |||||||
| 486 | // fetch resutl |
||||||
| 487 | 8 | foreach ($fileNames as $fileName) { |
|||||
| 488 | 8 | if (! $result[$fileName]->isError()) { |
|||||
| 489 | 6 | continue; |
|||||
| 490 | } |
||||||
| 491 | |||||||
| 492 | 2 | $result[$fileName]->error(); |
|||||
| 493 | } |
||||||
| 494 | |||||||
| 495 | 6 | return $result; |
|||||
| 496 | } |
||||||
| 497 | |||||||
| 498 | /** |
||||||
| 499 | * insert Batch Stream |
||||||
| 500 | * |
||||||
| 501 | * @param string[] $columns |
||||||
| 502 | * @param string $format ['TabSeparated','TabSeparatedWithNames','CSV','CSVWithNames'] |
||||||
| 503 | * @return Transport\CurlerRequest |
||||||
| 504 | */ |
||||||
| 505 | 2 | public function insertBatchStream(string $tableName, array $columns = [], string $format = 'CSV') |
|||||
| 506 | { |
||||||
| 507 | 2 | if ($this->getCountPendingQueue() > 0) { |
|||||
| 508 | throw new QueryException('Queue must be empty, before insertBatch, need executeAsync'); |
||||||
| 509 | } |
||||||
| 510 | |||||||
| 511 | 2 | if (! in_array($format, self::SUPPORTED_FORMATS, true)) { |
|||||
| 512 | throw new QueryException('Format not support in insertBatchFiles'); |
||||||
| 513 | } |
||||||
| 514 | |||||||
| 515 | 2 | if (empty($columns)) { |
|||||
| 516 | $sql = 'INSERT INTO ' . $tableName . ' FORMAT ' . $format; |
||||||
| 517 | } else { |
||||||
| 518 | 2 | $sql = 'INSERT INTO ' . $tableName . ' ( ' . implode(',', $columns) . ' ) FORMAT ' . $format; |
|||||
| 519 | } |
||||||
| 520 | |||||||
| 521 | 2 | return $this->transport->writeStreamData($sql); |
|||||
| 522 | } |
||||||
| 523 | |||||||
| 524 | /** |
||||||
| 525 | * stream Write |
||||||
| 526 | * |
||||||
| 527 | * @param string[] $bind |
||||||
| 528 | * @return Statement |
||||||
| 529 | * @throws Exception\TransportException |
||||||
| 530 | */ |
||||||
| 531 | 1 | public function streamWrite(Stream $stream, string $sql, array $bind = []) |
|||||
| 532 | { |
||||||
| 533 | 1 | if ($this->getCountPendingQueue() > 0) { |
|||||
| 534 | throw new QueryException('Queue must be empty, before streamWrite'); |
||||||
| 535 | } |
||||||
| 536 | |||||||
| 537 | 1 | return $this->transport->streamWrite($stream, $sql, $bind); |
|||||
| 538 | } |
||||||
| 539 | |||||||
| 540 | /** |
||||||
| 541 | * stream Read |
||||||
| 542 | * |
||||||
| 543 | * @param string[] $bind |
||||||
| 544 | * @return Statement |
||||||
| 545 | */ |
||||||
| 546 | 1 | public function streamRead(Stream $streamRead, string $sql, array $bind = []) |
|||||
| 547 | { |
||||||
| 548 | 1 | if ($this->getCountPendingQueue() > 0) { |
|||||
| 549 | throw new QueryException('Queue must be empty, before streamWrite'); |
||||||
| 550 | } |
||||||
| 551 | |||||||
| 552 | 1 | return $this->transport->streamRead($streamRead, $sql, $bind); |
|||||
| 553 | } |
||||||
| 554 | |||||||
| 555 | /** |
||||||
| 556 | * show databases |
||||||
| 557 | * |
||||||
| 558 | * @return array |
||||||
| 559 | */ |
||||||
| 560 | public function showDatabases() |
||||||
| 561 | { |
||||||
| 562 | return $this->select('show databases')->rows(); |
||||||
| 563 | } |
||||||
| 564 | |||||||
| 565 | /** |
||||||
| 566 | * Size of database |
||||||
| 567 | * |
||||||
| 568 | * @return mixed|null |
||||||
| 569 | */ |
||||||
| 570 | public function databaseSize() |
||||||
| 571 | { |
||||||
| 572 | $database = $this->transport->getDatabase(); |
||||||
|
0 ignored issues
–
show
The method
getDatabase() does not exist on ClickHouseDB\Transport\Http.
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces. This is most likely a typographical error or the method has been renamed. Loading history...
|
|||||||
| 573 | |||||||
| 574 | return $this->select( |
||||||
| 575 | ' |
||||||
| 576 | SELECT database,formatReadableSize(sum(bytes)) as size |
||||||
| 577 | FROM system.parts |
||||||
| 578 | WHERE active AND database=:database |
||||||
| 579 | GROUP BY database |
||||||
| 580 | ', |
||||||
| 581 | ['database' => $database] |
||||||
| 582 | )->fetchOne(); |
||||||
| 583 | } |
||||||
| 584 | |||||||
| 585 | /** |
||||||
| 586 | * SHOW TABLES |
||||||
| 587 | * |
||||||
| 588 | * @return mixed[] |
||||||
| 589 | */ |
||||||
| 590 | 1 | public function showTables() |
|||||
| 591 | { |
||||||
| 592 | 1 | return $this->select('SHOW TABLES')->rowsAsTree('name'); |
|||||
| 593 | } |
||||||
| 594 | |||||||
| 595 | /** |
||||||
| 596 | * statement = SHOW CREATE TABLE |
||||||
| 597 | * |
||||||
| 598 | * @return mixed |
||||||
| 599 | */ |
||||||
| 600 | public function showCreateTable(string $table) |
||||||
| 601 | { |
||||||
| 602 | return $this->select('SHOW CREATE TABLE ' . $table)->fetchOne('statement'); |
||||||
| 603 | } |
||||||
| 604 | |||||||
| 605 | /** |
||||||
| 606 | * Size of tables |
||||||
| 607 | * |
||||||
| 608 | * @return mixed |
||||||
| 609 | */ |
||||||
| 610 | 1 | public function tableSize(string $tableName) |
|||||
| 611 | { |
||||||
| 612 | 1 | $tables = $this->tablesSize(); |
|||||
| 613 | |||||||
| 614 | 1 | if (isset($tables[$tableName])) { |
|||||
| 615 | 1 | return $tables[$tableName]; |
|||||
| 616 | } |
||||||
| 617 | |||||||
| 618 | return null; |
||||||
| 619 | } |
||||||
| 620 | |||||||
| 621 | /** |
||||||
| 622 | * Tables sizes |
||||||
| 623 | * |
||||||
| 624 | * @param bool $flatList |
||||||
| 625 | * @return mixed[][] |
||||||
| 626 | */ |
||||||
| 627 | 1 | public function tablesSize($flatList = false) |
|||||
| 628 | { |
||||||
| 629 | 1 | $result = $this->select(' |
|||||
| 630 | SELECT name as table,database, |
||||||
| 631 | max(sizebytes) as sizebytes, |
||||||
| 632 | max(size) as size, |
||||||
| 633 | min(min_date) as min_date, |
||||||
| 634 | max(max_date) as max_date |
||||||
| 635 | FROM system.tables |
||||||
| 636 | ANY LEFT JOIN |
||||||
| 637 | ( |
||||||
| 638 | SELECT table,database, |
||||||
| 639 | formatReadableSize(sum(bytes)) as size, |
||||||
| 640 | sum(bytes) as sizebytes, |
||||||
| 641 | min(min_date) as min_date, |
||||||
| 642 | max(max_date) as max_date |
||||||
| 643 | FROM system.parts |
||||||
| 644 | WHERE active AND database=:database |
||||||
| 645 | GROUP BY table,database |
||||||
| 646 | ) USING ( table,database ) |
||||||
| 647 | WHERE database=:database |
||||||
| 648 | GROUP BY table,database |
||||||
| 649 | ', |
||||||
| 650 | 1 | ['database' => $this->database]); |
|||||
| 651 | |||||||
| 652 | 1 | if ($flatList) { |
|||||
| 653 | return $result->rows(); |
||||||
| 654 | } |
||||||
| 655 | |||||||
| 656 | 1 | return $result->rowsAsTree('table'); |
|||||
| 657 | } |
||||||
| 658 | |||||||
| 659 | /** |
||||||
| 660 | * isExists |
||||||
| 661 | * |
||||||
| 662 | * @return array |
||||||
| 663 | */ |
||||||
| 664 | public function tableExists(string $database, string $table) |
||||||
| 665 | { |
||||||
| 666 | return $this->select( |
||||||
| 667 | ' |
||||||
| 668 | SELECT * |
||||||
| 669 | FROM system.tables |
||||||
| 670 | WHERE name=\'' . $table . '\' AND database=\'' . $database . '\'' |
||||||
| 671 | )->rowsAsTree('name'); |
||||||
| 672 | } |
||||||
| 673 | |||||||
| 674 | /** |
||||||
| 675 | * List of partitions |
||||||
| 676 | * |
||||||
| 677 | * @return mixed[][] |
||||||
| 678 | */ |
||||||
| 679 | public function partitions(string $table, int $limit = null, bool $active = null) |
||||||
| 680 | { |
||||||
| 681 | $database = $this->database; |
||||||
| 682 | $whereActiveClause = $active === null ? '' : sprintf(' AND active = %s', (int) $active); |
||||||
| 683 | $limitClause = $limit !== null ? ' LIMIT ' . $limit : ''; |
||||||
| 684 | |||||||
| 685 | return $this->select(<<<CLICKHOUSE |
||||||
| 686 | SELECT * |
||||||
| 687 | FROM system.parts |
||||||
| 688 | WHERE like(table,'%$table%') AND database='$database'$whereActiveClause |
||||||
| 689 | ORDER BY max_date $limitClause |
||||||
| 690 | CLICKHOUSE |
||||||
| 691 | )->rowsAsTree('name'); |
||||||
| 692 | } |
||||||
| 693 | |||||||
| 694 | /** |
||||||
| 695 | * dropPartition |
||||||
| 696 | * @deprecated |
||||||
| 697 | * @return Statement |
||||||
| 698 | */ |
||||||
| 699 | public function dropPartition(string $dataBaseTableName, string $partition_id) |
||||||
| 700 | { |
||||||
| 701 | |||||||
| 702 | $partition_id = trim($partition_id, '\''); |
||||||
| 703 | $this->getSettings()->set('replication_alter_partitions_sync', 2); |
||||||
| 704 | $state = $this->write('ALTER TABLE {dataBaseTableName} DROP PARTITION :partion_id', |
||||||
| 705 | [ |
||||||
| 706 | 'dataBaseTableName' => $dataBaseTableName, |
||||||
| 707 | 'partion_id' => $partition_id, |
||||||
| 708 | ]); |
||||||
| 709 | |||||||
| 710 | return $state; |
||||||
| 711 | } |
||||||
| 712 | |||||||
| 713 | /** |
||||||
| 714 | * dropOldPartitions by day_ago |
||||||
| 715 | * @deprecated |
||||||
| 716 | * |
||||||
| 717 | * @return array |
||||||
| 718 | * @throws Exception\TransportException |
||||||
| 719 | * @throws \Exception |
||||||
| 720 | */ |
||||||
| 721 | public function dropOldPartitions(string $table_name, int $days_ago, int $count_partitons_per_one = 100) |
||||||
| 722 | { |
||||||
| 723 | $days_ago = strtotime(date('Y-m-d 00:00:00', strtotime('-' . $days_ago . ' day'))); |
||||||
| 724 | |||||||
| 725 | $drop = []; |
||||||
| 726 | $list_patitions = $this->partitions($table_name, $count_partitons_per_one); |
||||||
| 727 | |||||||
| 728 | foreach ($list_patitions as $partion_id => $partition) { |
||||||
| 729 | if (stripos($partition['engine'], 'mergetree') === false) { |
||||||
| 730 | continue; |
||||||
| 731 | } |
||||||
| 732 | |||||||
| 733 | // $min_date = strtotime($partition['min_date']); |
||||||
| 734 | $max_date = strtotime($partition['max_date']); |
||||||
| 735 | |||||||
| 736 | if ($max_date < $days_ago) { |
||||||
| 737 | $drop[] = $partition['partition']; |
||||||
| 738 | } |
||||||
| 739 | } |
||||||
| 740 | |||||||
| 741 | $result = []; |
||||||
| 742 | foreach ($drop as $partition_id) { |
||||||
| 743 | $result[$partition_id] = $this->dropPartition($table_name, $partition_id); |
||||||
| 744 | } |
||||||
| 745 | |||||||
| 746 | return $result; |
||||||
| 747 | } |
||||||
| 748 | |||||||
| 749 | /** |
||||||
| 750 | * Truncate ( drop all partitions ) |
||||||
| 751 | * @deprecated |
||||||
| 752 | * @return array |
||||||
| 753 | */ |
||||||
| 754 | public function truncateTable(string $tableName) |
||||||
| 755 | { |
||||||
| 756 | $partions = $this->partitions($tableName); |
||||||
| 757 | $out = []; |
||||||
| 758 | foreach ($partions as $part_key => $part) { |
||||||
| 759 | $part_id = $part['partition']; |
||||||
| 760 | $out[$part_id] = $this->dropPartition($tableName, $part_id); |
||||||
| 761 | } |
||||||
| 762 | |||||||
| 763 | return $out; |
||||||
| 764 | } |
||||||
| 765 | |||||||
| 766 | /** |
||||||
| 767 | * Returns the server's uptime in seconds. |
||||||
| 768 | * |
||||||
| 769 | * @return int |
||||||
| 770 | * @throws Exception\TransportException |
||||||
| 771 | */ |
||||||
| 772 | 1 | public function getServerUptime() |
|||||
| 773 | { |
||||||
| 774 | 1 | return $this->select('SELECT uptime() as uptime')->fetchOne('uptime'); |
|||||
| 775 | } |
||||||
| 776 | |||||||
| 777 | /** |
||||||
| 778 | * Returns string with the server version. |
||||||
| 779 | */ |
||||||
| 780 | 1 | public function getServerVersion() : string |
|||||
| 781 | { |
||||||
| 782 | 1 | return (string) $this->select('SELECT version() as version')->fetchOne('version'); |
|||||
| 783 | } |
||||||
| 784 | |||||||
| 785 | /** |
||||||
| 786 | * Read system.settings table |
||||||
| 787 | * |
||||||
| 788 | * @return mixed[][] |
||||||
| 789 | */ |
||||||
| 790 | 1 | public function getServerSystemSettings(string $like = '') |
|||||
| 791 | { |
||||||
| 792 | 1 | $l = []; |
|||||
| 793 | 1 | $list = $this->select('SELECT * FROM system.settings' . ($like ? ' WHERE name LIKE :like' : ''), |
|||||
| 794 | 1 | ['like' => '%' . $like . '%'])->rows(); |
|||||
| 795 | 1 | foreach ($list as $row) { |
|||||
| 796 | 1 | if (isset($row['name'])) { |
|||||
| 797 | 1 | $n = $row['name']; |
|||||
| 798 | 1 | unset($row['name']); |
|||||
| 799 | 1 | $l[$n] = $row; |
|||||
| 800 | } |
||||||
| 801 | } |
||||||
| 802 | |||||||
| 803 | 1 | return $l; |
|||||
| 804 | } |
||||||
| 805 | } |
||||||
| 806 |