| 1 | <?php |
||||||
| 2 | |||||||
| 3 | declare(strict_types=1); |
||||||
| 4 | |||||||
| 5 | namespace ClickHouseDB; |
||||||
| 6 | |||||||
| 7 | use ClickHouseDB\Exception\QueryException; |
||||||
| 8 | use ClickHouseDB\Query\Degeneration; |
||||||
| 9 | use ClickHouseDB\Query\Degeneration\Bindings; |
||||||
| 10 | use ClickHouseDB\Query\Degeneration\Conditions; |
||||||
| 11 | use ClickHouseDB\Query\WhereInFile; |
||||||
| 12 | use ClickHouseDB\Query\WriteToFile; |
||||||
| 13 | use ClickHouseDB\Quote\FormatLine; |
||||||
| 14 | use ClickHouseDB\Transport\Http; |
||||||
| 15 | use ClickHouseDB\Transport\Stream; |
||||||
| 16 | use function array_flip; |
||||||
| 17 | use function array_keys; |
||||||
| 18 | use function array_rand; |
||||||
| 19 | use function array_values; |
||||||
| 20 | use function count; |
||||||
| 21 | use function date; |
||||||
| 22 | use function implode; |
||||||
| 23 | use function in_array; |
||||||
| 24 | use function is_array; |
||||||
| 25 | use function is_callable; |
||||||
| 26 | use function is_file; |
||||||
| 27 | use function is_readable; |
||||||
| 28 | use function is_string; |
||||||
| 29 | use function sprintf; |
||||||
| 30 | use function stripos; |
||||||
| 31 | use function strtotime; |
||||||
| 32 | use function trim; |
||||||
| 33 | |||||||
| 34 | class Client |
||||||
| 35 | { |
||||||
| 36 | private const DEFAULT_USERNAME = 'default'; |
||||||
| 37 | private const DEFAULT_PASSWORD = ''; |
||||||
| 38 | private const DEFAULT_PORT = 8123; |
||||||
| 39 | private const DEFAULT_HOST = '127.0.0.1'; |
||||||
| 40 | private const DEFAULT_DATABASE = 'default'; |
||||||
| 41 | private const SUPPORTED_FORMATS = ['TabSeparated', 'TabSeparatedWithNames', 'CSV', 'CSVWithNames', 'JSONEachRow']; |
||||||
| 42 | |||||||
| 43 | /** @var Http */ |
||||||
| 44 | private $transport; |
||||||
| 45 | |||||||
| 46 | /** @var string */ |
||||||
| 47 | private $username; |
||||||
| 48 | |||||||
| 49 | /** @var string */ |
||||||
| 50 | private $password; |
||||||
| 51 | |||||||
| 52 | /** @var string */ |
||||||
| 53 | private $host; |
||||||
| 54 | |||||||
| 55 | /** @var string */ |
||||||
| 56 | private $port; |
||||||
| 57 | |||||||
| 58 | /** @var string */ |
||||||
| 59 | private $database; |
||||||
| 60 | |||||||
| 61 | /** |
||||||
| 62 | * @param mixed[] $connectParams |
||||||
| 63 | * @param mixed[] $settings |
||||||
| 64 | */ |
||||||
| 65 | 63 | public function __construct(array $connectParams, array $settings = []) |
|||||
| 66 | { |
||||||
| 67 | 63 | $this->username = $connectParams['username'] ?? self::DEFAULT_USERNAME; |
|||||
| 68 | 63 | $this->password = $connectParams['password'] ?? self::DEFAULT_PASSWORD; |
|||||
| 69 | 63 | $this->port = $connectParams['port'] ?? self::DEFAULT_PORT; |
|||||
| 70 | 63 | $this->host = $connectParams['host'] ?? self::DEFAULT_HOST; |
|||||
| 71 | 63 | $this->database = $connectParams['database'] ?? self::DEFAULT_DATABASE; |
|||||
| 72 | |||||||
| 73 | 63 | $this->transport = new Http( |
|||||
| 74 | 63 | $this->host, |
|||||
| 75 | 63 | $this->port, |
|||||
| 76 | 63 | $this->username, |
|||||
| 77 | 63 | $this->password, |
|||||
| 78 | 63 | $this->database |
|||||
| 79 | ); |
||||||
| 80 | |||||||
| 81 | 63 | $this->transport->addQueryDegeneration(new Bindings()); |
|||||
| 82 | |||||||
| 83 | // apply settings to transport class |
||||||
| 84 | 63 | if (! empty($settings)) { |
|||||
| 85 | 1 | $this->getSettings()->apply($settings); |
|||||
| 86 | } |
||||||
| 87 | |||||||
| 88 | 63 | if (isset($connectParams['https'])) { |
|||||
| 89 | $this->setHttps($connectParams['https']); |
||||||
| 90 | } |
||||||
| 91 | |||||||
| 92 | 63 | $this->setHttpCompression(true); |
|||||
| 93 | 63 | } |
|||||
| 94 | |||||||
| 95 | /** |
||||||
| 96 | * Clear Degeneration processing request [template ] |
||||||
| 97 | */ |
||||||
| 98 | 1 | public function cleanQueryDegeneration() : void |
|||||
| 99 | { |
||||||
| 100 | 1 | $this->transport->cleanQueryDegeneration(); |
|||||
| 101 | 1 | } |
|||||
| 102 | |||||||
| 103 | /** |
||||||
| 104 | * Degeneration processing |
||||||
| 105 | */ |
||||||
| 106 | public function addQueryDegeneration(Degeneration $degeneration) : void |
||||||
| 107 | { |
||||||
| 108 | $this->transport->addQueryDegeneration($degeneration); |
||||||
| 109 | } |
||||||
| 110 | |||||||
| 111 | /** |
||||||
| 112 | * Add Conditions Degeneration to query processing |
||||||
| 113 | */ |
||||||
| 114 | 1 | public function enableQueryConditions() : void |
|||||
| 115 | { |
||||||
| 116 | 1 | $this->transport->addQueryDegeneration(new Conditions()); |
|||||
| 117 | 1 | } |
|||||
| 118 | |||||||
| 119 | 2 | public function setTimeout(float $seconds) : void |
|||||
| 120 | { |
||||||
| 121 | 2 | $this->transport->setTimeout($seconds); |
|||||
| 122 | 2 | } |
|||||
| 123 | |||||||
| 124 | 2 | public function setConnectTimeout(float $seconds) : void |
|||||
| 125 | { |
||||||
| 126 | 2 | $this->transport->setConnectTimeout($seconds); |
|||||
| 127 | 2 | } |
|||||
| 128 | |||||||
| 129 | /** |
||||||
| 130 | * @return string |
||||||
| 131 | */ |
||||||
| 132 | public function getHost() |
||||||
| 133 | { |
||||||
| 134 | return $this->host; |
||||||
| 135 | } |
||||||
| 136 | |||||||
| 137 | /** |
||||||
| 138 | * Set connection host |
||||||
| 139 | * |
||||||
| 140 | * @param string|string[] $host |
||||||
| 141 | */ |
||||||
| 142 | public function setHost($host) |
||||||
| 143 | { |
||||||
| 144 | if (is_array($host)) { |
||||||
| 145 | $host = array_rand(array_flip($host)); |
||||||
| 146 | } |
||||||
| 147 | |||||||
| 148 | $this->host = $host; |
||||||
| 149 | $this->transport->setHost($host); |
||||||
| 150 | } |
||||||
| 151 | |||||||
| 152 | public function getPassword() : string |
||||||
| 153 | { |
||||||
| 154 | return $this->password; |
||||||
| 155 | } |
||||||
| 156 | |||||||
| 157 | public function getPort() : int |
||||||
| 158 | { |
||||||
| 159 | return $this->port; |
||||||
|
0 ignored issues
–
show
Bug
Best Practice
introduced
by
Loading history...
|
|||||||
| 160 | } |
||||||
| 161 | |||||||
| 162 | public function getUsername() : string |
||||||
| 163 | { |
||||||
| 164 | return $this->username; |
||||||
| 165 | } |
||||||
| 166 | |||||||
| 167 | 63 | public function setDatabase(string $database) : self |
|||||
| 168 | { |
||||||
| 169 | 63 | $this->database = $database; |
|||||
| 170 | 63 | $this->transport->setDatabase($database); |
|||||
| 171 | |||||||
| 172 | 63 | return $this; |
|||||
| 173 | } |
||||||
| 174 | |||||||
| 175 | 2 | public function getTransport() : Http |
|||||
| 176 | { |
||||||
| 177 | 2 | return $this->transport; |
|||||
| 178 | } |
||||||
| 179 | |||||||
| 180 | /** |
||||||
| 181 | * @return mixed |
||||||
| 182 | */ |
||||||
| 183 | public function verbose() |
||||||
| 184 | { |
||||||
| 185 | return $this->transport->setVerbose(); |
||||||
| 186 | } |
||||||
| 187 | |||||||
| 188 | 63 | public function getSettings() : Settings |
|||||
| 189 | { |
||||||
| 190 | 63 | return $this->transport->getSettings(); |
|||||
| 191 | } |
||||||
| 192 | |||||||
| 193 | /** |
||||||
| 194 | * @return static |
||||||
| 195 | */ |
||||||
| 196 | 2 | public function useSession(bool $useSessionId = false) |
|||||
| 197 | { |
||||||
| 198 | 2 | if (! $this->getSettings()->getSessionId()) { |
|||||
| 199 | 2 | if (! $useSessionId) { |
|||||
| 200 | 2 | $this->getSettings()->makeSessionId(); |
|||||
| 201 | } else { |
||||||
| 202 | $this->getSettings()->session_id($useSessionId); |
||||||
| 203 | } |
||||||
| 204 | } |
||||||
| 205 | |||||||
| 206 | 2 | return $this; |
|||||
| 207 | } |
||||||
| 208 | |||||||
| 209 | /** |
||||||
| 210 | * @return mixed |
||||||
| 211 | */ |
||||||
| 212 | 2 | public function getSession() |
|||||
| 213 | { |
||||||
| 214 | 2 | return $this->getSettings()->getSessionId(); |
|||||
| 215 | } |
||||||
| 216 | |||||||
| 217 | /** |
||||||
| 218 | * Query CREATE/DROP |
||||||
| 219 | * |
||||||
| 220 | * @param mixed[] $bindings |
||||||
| 221 | * @return Statement |
||||||
| 222 | */ |
||||||
| 223 | 63 | public function write(string $sql, array $bindings = [], bool $exception = true) |
|||||
| 224 | { |
||||||
| 225 | 63 | return $this->transport->write($sql, $bindings, $exception); |
|||||
| 226 | } |
||||||
| 227 | |||||||
| 228 | /** |
||||||
| 229 | * Write to system.query_log |
||||||
| 230 | * |
||||||
| 231 | * @return static |
||||||
| 232 | */ |
||||||
| 233 | public function enableLogQueries(bool $flag = true) |
||||||
| 234 | { |
||||||
| 235 | $this->getSettings()->set('log_queries', (int) $flag); |
||||||
| 236 | |||||||
| 237 | return $this; |
||||||
| 238 | } |
||||||
| 239 | |||||||
| 240 | /** |
||||||
| 241 | * Compress the result if the HTTP client said that it understands data compressed with gzip or deflate |
||||||
| 242 | */ |
||||||
| 243 | 63 | public function setHttpCompression(bool $enable) : self |
|||||
| 244 | { |
||||||
| 245 | 63 | $this->getSettings()->setHttpCompression($enable); |
|||||
| 246 | |||||||
| 247 | 63 | return $this; |
|||||
| 248 | } |
||||||
| 249 | |||||||
| 250 | /** |
||||||
| 251 | * Enable / Disable HTTPS |
||||||
| 252 | * |
||||||
| 253 | * @return static |
||||||
| 254 | */ |
||||||
| 255 | 1 | public function setHttps(bool $flag) |
|||||
| 256 | { |
||||||
| 257 | 1 | $this->transport->setHttps($flag); |
|||||
| 258 | |||||||
| 259 | 1 | return $this; |
|||||
| 260 | } |
||||||
| 261 | |||||||
| 262 | /** |
||||||
| 263 | * Read extremes of the result columns. They can be output in JSON-formats. |
||||||
| 264 | * |
||||||
| 265 | * @return static |
||||||
| 266 | */ |
||||||
| 267 | 2 | public function enableExtremes(bool $flag = true) |
|||||
| 268 | { |
||||||
| 269 | 2 | $this->getSettings()->set('extremes', (int) $flag); |
|||||
| 270 | |||||||
| 271 | 2 | return $this; |
|||||
| 272 | } |
||||||
| 273 | |||||||
| 274 | /** |
||||||
| 275 | * Ping server |
||||||
| 276 | * |
||||||
| 277 | * @return bool |
||||||
| 278 | */ |
||||||
| 279 | 39 | public function ping() |
|||||
| 280 | { |
||||||
| 281 | 39 | return $this->transport->ping(); |
|||||
| 282 | } |
||||||
| 283 | |||||||
| 284 | /** |
||||||
| 285 | * @param mixed[] $bindings |
||||||
| 286 | * @return Statement |
||||||
| 287 | */ |
||||||
| 288 | 29 | public function select( |
|||||
| 289 | string $sql, |
||||||
| 290 | array $bindings = [], |
||||||
| 291 | WhereInFile $whereInFile = null, |
||||||
| 292 | WriteToFile $writeToFile = null |
||||||
| 293 | ) { |
||||||
| 294 | 29 | return $this->transport->select($sql, $bindings, $whereInFile, $writeToFile); |
|||||
| 295 | } |
||||||
| 296 | |||||||
| 297 | /** |
||||||
| 298 | * prepare select |
||||||
| 299 | * |
||||||
| 300 | * @param mixed[] $bindings |
||||||
| 301 | * @return Statement |
||||||
| 302 | */ |
||||||
| 303 | 5 | public function selectAsync( |
|||||
| 304 | string $sql, |
||||||
| 305 | array $bindings = [], |
||||||
| 306 | WhereInFile $whereInFile = null, |
||||||
| 307 | WriteToFile $writeToFile = null |
||||||
| 308 | ) { |
||||||
| 309 | 5 | return $this->transport->selectAsync($sql, $bindings, $whereInFile, $writeToFile); |
|||||
| 310 | } |
||||||
| 311 | |||||||
| 312 | /** |
||||||
| 313 | * @return bool |
||||||
| 314 | */ |
||||||
| 315 | 10 | public function executeAsync() |
|||||
| 316 | { |
||||||
| 317 | 10 | return $this->transport->executeAsync(); |
|||||
| 318 | } |
||||||
| 319 | |||||||
| 320 | /** |
||||||
| 321 | * set progressFunction |
||||||
| 322 | */ |
||||||
| 323 | 1 | public function progressFunction(callable $callback) |
|||||
| 324 | { |
||||||
| 325 | 1 | if (! is_callable($callback)) { |
|||||
| 326 | throw new \InvalidArgumentException('Not is_callable progressFunction'); |
||||||
| 327 | } |
||||||
| 328 | |||||||
| 329 | 1 | if (! $this->getSettings()->isSet('send_progress_in_http_headers')) { |
|||||
| 330 | 1 | $this->getSettings()->set('send_progress_in_http_headers', 1); |
|||||
| 331 | } |
||||||
| 332 | 1 | if (! $this->getSettings()->isSet('http_headers_progress_interval_ms')) { |
|||||
| 333 | 1 | $this->getSettings()->set('http_headers_progress_interval_ms', 100); |
|||||
| 334 | } |
||||||
| 335 | |||||||
| 336 | 1 | $this->transport->setProgressFunction($callback); |
|||||
| 337 | 1 | } |
|||||
| 338 | |||||||
| 339 | /** |
||||||
| 340 | * SHOW PROCESSLIST |
||||||
| 341 | * |
||||||
| 342 | * @return array |
||||||
| 343 | */ |
||||||
| 344 | public function showProcesslist() |
||||||
| 345 | { |
||||||
| 346 | return $this->select('SHOW PROCESSLIST')->rows(); |
||||||
| 347 | } |
||||||
| 348 | |||||||
| 349 | /** |
||||||
| 350 | * Get the number of simultaneous/Pending requests |
||||||
| 351 | * |
||||||
| 352 | * @return int |
||||||
| 353 | */ |
||||||
| 354 | 12 | public function getCountPendingQueue() |
|||||
| 355 | { |
||||||
| 356 | 12 | return $this->transport->getCountPendingQueue(); |
|||||
| 357 | } |
||||||
| 358 | |||||||
| 359 | /** |
||||||
| 360 | * @param mixed[][] $values |
||||||
| 361 | * @param string[] $columns |
||||||
| 362 | */ |
||||||
| 363 | 9 | public function insert(string $table, array $values, array $columns = []) : Statement |
|||||
| 364 | { |
||||||
| 365 | 9 | if (empty($values)) { |
|||||
| 366 | 1 | throw QueryException::cannotInsertEmptyValues(); |
|||||
| 367 | } |
||||||
| 368 | |||||||
| 369 | 8 | if (stripos($table, '`') === false && stripos($table, '.') === false) { |
|||||
| 370 | 5 | $table = '`' . $table . '`'; //quote table name for dot names |
|||||
| 371 | } |
||||||
| 372 | 8 | $sql = 'INSERT INTO ' . $table; |
|||||
| 373 | |||||||
| 374 | 8 | if (count($columns) !== 0) { |
|||||
| 375 | 7 | $sql .= ' (`' . implode('`,`', $columns) . '`) '; |
|||||
| 376 | } |
||||||
| 377 | |||||||
| 378 | 8 | $sql .= ' VALUES '; |
|||||
| 379 | |||||||
| 380 | 8 | foreach ($values as $row) { |
|||||
| 381 | 8 | $sql .= ' (' . FormatLine::Insert($row) . '), '; |
|||||
| 382 | } |
||||||
| 383 | 8 | $sql = trim($sql, ', '); |
|||||
| 384 | |||||||
| 385 | 8 | return $this->transport->write($sql); |
|||||
| 386 | } |
||||||
| 387 | |||||||
| 388 | /** |
||||||
| 389 | * * Prepares the values to insert from the associative array. |
||||||
| 390 | * * There may be one or more lines inserted, but then the keys inside the array list must match (including in the sequence) |
||||||
| 391 | * * |
||||||
| 392 | * * @param mixed[] $values - array column_name => value (if we insert one row) or array list column_name => value if we insert many lines |
||||||
| 393 | * * @return mixed[][] - list of arrays - 0 => fields, 1 => list of value arrays for insertion |
||||||
| 394 | * */ |
||||||
| 395 | 3 | public function prepareInsertAssocBulk(array $values) |
|||||
| 396 | { |
||||||
| 397 | 3 | if (isset($values[0]) && is_array($values[0])) { //случай, когда много строк вставляется |
|||||
| 398 | 2 | $preparedFields = array_keys($values[0]); |
|||||
| 399 | 2 | $preparedValues = []; |
|||||
| 400 | 2 | foreach ($values as $idx => $row) { |
|||||
| 401 | 2 | $_fields = array_keys($row); |
|||||
| 402 | 2 | if ($_fields !== $preparedFields) { |
|||||
| 403 | 1 | throw new QueryException( |
|||||
| 404 | 1 | sprintf( |
|||||
| 405 | 1 | 'Fields not match: %s and %s on element %s', |
|||||
| 406 | 1 | implode(',', $_fields), |
|||||
| 407 | 1 | implode(',', $preparedFields), |
|||||
| 408 | 1 | $idx |
|||||
| 409 | ) |
||||||
| 410 | ); |
||||||
| 411 | } |
||||||
| 412 | 2 | $preparedValues[] = array_values($row); |
|||||
| 413 | } |
||||||
| 414 | } else { |
||||||
| 415 | 1 | $preparedFields = array_keys($values); |
|||||
| 416 | 1 | $preparedValues = [array_values($values)]; |
|||||
| 417 | } |
||||||
| 418 | |||||||
| 419 | 2 | return [$preparedFields, $preparedValues]; |
|||||
| 420 | } |
||||||
| 421 | |||||||
| 422 | /** |
||||||
| 423 | * Inserts one or more rows from an associative array. |
||||||
| 424 | * If there is a discrepancy between the keys of the value arrays (or their order) - throws an exception. |
||||||
| 425 | * |
||||||
| 426 | * @param mixed[] $values - array column_name => value (if we insert one row) or array list column_name => value if we insert many lines |
||||||
| 427 | * @return Statement |
||||||
| 428 | */ |
||||||
| 429 | public function insertAssocBulk(string $tableName, array $values) |
||||||
| 430 | { |
||||||
| 431 | list($columns, $vals) = $this->prepareInsertAssocBulk($values); |
||||||
| 432 | |||||||
| 433 | return $this->insert($tableName, $vals, $columns); |
||||||
| 434 | } |
||||||
| 435 | |||||||
| 436 | /** |
||||||
| 437 | * insert TabSeparated files |
||||||
| 438 | * |
||||||
| 439 | * @param string|string[] $fileNames |
||||||
| 440 | * @param string[] $columns |
||||||
| 441 | * @return mixed |
||||||
| 442 | */ |
||||||
| 443 | 1 | public function insertBatchTSVFiles(string $tableName, $fileNames, array $columns = []) |
|||||
| 444 | { |
||||||
| 445 | 1 | return $this->insertBatchFiles($tableName, $fileNames, $columns, 'TabSeparated'); |
|||||
| 446 | } |
||||||
| 447 | |||||||
| 448 | /** |
||||||
| 449 | * insert Batch Files |
||||||
| 450 | * |
||||||
| 451 | * @param string|string[] $fileNames |
||||||
| 452 | * @param string[] $columns |
||||||
| 453 | * @param string $format ['TabSeparated','TabSeparatedWithNames','CSV','CSVWithNames'] |
||||||
| 454 | * @return Statement[] |
||||||
| 455 | * @throws Exception\TransportException |
||||||
| 456 | */ |
||||||
| 457 | 8 | public function insertBatchFiles(string $tableName, $fileNames, array $columns = [], string $format = 'CSV') |
|||||
| 458 | { |
||||||
| 459 | 8 | if (is_string($fileNames)) { |
|||||
| 460 | $fileNames = [$fileNames]; |
||||||
| 461 | } |
||||||
| 462 | 8 | if ($this->getCountPendingQueue() > 0) { |
|||||
| 463 | throw new QueryException('Queue must be empty, before insertBatch, need executeAsync'); |
||||||
| 464 | } |
||||||
| 465 | |||||||
| 466 | 8 | if (! in_array($format, self::SUPPORTED_FORMATS, true)) { |
|||||
| 467 | throw new QueryException('Format not support in insertBatchFiles'); |
||||||
| 468 | } |
||||||
| 469 | |||||||
| 470 | 8 | $result = []; |
|||||
| 471 | |||||||
| 472 | 8 | foreach ($fileNames as $fileName) { |
|||||
| 473 | 8 | if (! is_file($fileName) || ! is_readable($fileName)) { |
|||||
| 474 | throw new QueryException('Cant read file: ' . $fileName . ' ' . (is_file($fileName) ? '' : ' is not file')); |
||||||
| 475 | } |
||||||
| 476 | |||||||
| 477 | 8 | if (empty($columns)) { |
|||||
| 478 | $sql = 'INSERT INTO ' . $tableName . ' FORMAT ' . $format; |
||||||
| 479 | } else { |
||||||
| 480 | 8 | $sql = 'INSERT INTO ' . $tableName . ' ( ' . implode(',', $columns) . ' ) FORMAT ' . $format; |
|||||
| 481 | } |
||||||
| 482 | 8 | $result[$fileName] = $this->transport->writeAsyncCSV($sql, $fileName); |
|||||
| 483 | } |
||||||
| 484 | |||||||
| 485 | // exec |
||||||
| 486 | 8 | $this->executeAsync(); |
|||||
| 487 | |||||||
| 488 | // fetch resutl |
||||||
| 489 | 8 | foreach ($fileNames as $fileName) { |
|||||
| 490 | 8 | if (! $result[$fileName]->isError()) { |
|||||
| 491 | 6 | continue; |
|||||
| 492 | } |
||||||
| 493 | |||||||
| 494 | 2 | $result[$fileName]->error(); |
|||||
| 495 | } |
||||||
| 496 | |||||||
| 497 | 6 | return $result; |
|||||
| 498 | } |
||||||
| 499 | |||||||
| 500 | /** |
||||||
| 501 | * insert Batch Stream |
||||||
| 502 | * |
||||||
| 503 | * @param string[] $columns |
||||||
| 504 | * @param string $format ['TabSeparated','TabSeparatedWithNames','CSV','CSVWithNames'] |
||||||
| 505 | * @return Transport\CurlerRequest |
||||||
| 506 | */ |
||||||
| 507 | 2 | public function insertBatchStream(string $tableName, array $columns = [], string $format = 'CSV') |
|||||
| 508 | { |
||||||
| 509 | 2 | if ($this->getCountPendingQueue() > 0) { |
|||||
| 510 | throw new QueryException('Queue must be empty, before insertBatch, need executeAsync'); |
||||||
| 511 | } |
||||||
| 512 | |||||||
| 513 | 2 | if (! in_array($format, self::SUPPORTED_FORMATS, true)) { |
|||||
| 514 | throw new QueryException('Format not support in insertBatchFiles'); |
||||||
| 515 | } |
||||||
| 516 | |||||||
| 517 | 2 | if (empty($columns)) { |
|||||
| 518 | $sql = 'INSERT INTO ' . $tableName . ' FORMAT ' . $format; |
||||||
| 519 | } else { |
||||||
| 520 | 2 | $sql = 'INSERT INTO ' . $tableName . ' ( ' . implode(',', $columns) . ' ) FORMAT ' . $format; |
|||||
| 521 | } |
||||||
| 522 | |||||||
| 523 | 2 | return $this->transport->writeStreamData($sql); |
|||||
| 524 | } |
||||||
| 525 | |||||||
| 526 | /** |
||||||
| 527 | * stream Write |
||||||
| 528 | * |
||||||
| 529 | * @param string[] $bind |
||||||
| 530 | * @return Statement |
||||||
| 531 | * @throws Exception\TransportException |
||||||
| 532 | */ |
||||||
| 533 | 1 | public function streamWrite(Stream $stream, string $sql, array $bind = []) |
|||||
| 534 | { |
||||||
| 535 | 1 | if ($this->getCountPendingQueue() > 0) { |
|||||
| 536 | throw new QueryException('Queue must be empty, before streamWrite'); |
||||||
| 537 | } |
||||||
| 538 | |||||||
| 539 | 1 | return $this->transport->streamWrite($stream, $sql, $bind); |
|||||
| 540 | } |
||||||
| 541 | |||||||
| 542 | /** |
||||||
| 543 | * stream Read |
||||||
| 544 | * |
||||||
| 545 | * @param string[] $bind |
||||||
| 546 | * @return Statement |
||||||
| 547 | */ |
||||||
| 548 | 1 | public function streamRead(Stream $streamRead, string $sql, array $bind = []) |
|||||
| 549 | { |
||||||
| 550 | 1 | if ($this->getCountPendingQueue() > 0) { |
|||||
| 551 | throw new QueryException('Queue must be empty, before streamWrite'); |
||||||
| 552 | } |
||||||
| 553 | |||||||
| 554 | 1 | return $this->transport->streamRead($streamRead, $sql, $bind); |
|||||
| 555 | } |
||||||
| 556 | |||||||
| 557 | /** |
||||||
| 558 | * show databases |
||||||
| 559 | * |
||||||
| 560 | * @return array |
||||||
| 561 | */ |
||||||
| 562 | public function showDatabases() |
||||||
| 563 | { |
||||||
| 564 | return $this->select('show databases')->rows(); |
||||||
| 565 | } |
||||||
| 566 | |||||||
| 567 | /** |
||||||
| 568 | * Size of database |
||||||
| 569 | * |
||||||
| 570 | * @return mixed|null |
||||||
| 571 | */ |
||||||
| 572 | public function databaseSize() |
||||||
| 573 | { |
||||||
| 574 | $database = $this->transport->getDatabase(); |
||||||
|
0 ignored issues
–
show
The method
getDatabase() does not exist on ClickHouseDB\Transport\Http.
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces. This is most likely a typographical error or the method has been renamed. Loading history...
|
|||||||
| 575 | |||||||
| 576 | return $this->select( |
||||||
| 577 | ' |
||||||
| 578 | SELECT database,formatReadableSize(sum(bytes)) as size |
||||||
| 579 | FROM system.parts |
||||||
| 580 | WHERE active AND database=:database |
||||||
| 581 | GROUP BY database |
||||||
| 582 | ', |
||||||
| 583 | ['database' => $database] |
||||||
| 584 | )->fetchOne(); |
||||||
| 585 | } |
||||||
| 586 | |||||||
| 587 | /** |
||||||
| 588 | * SHOW TABLES |
||||||
| 589 | * |
||||||
| 590 | * @return mixed[] |
||||||
| 591 | */ |
||||||
| 592 | 1 | public function showTables() |
|||||
| 593 | { |
||||||
| 594 | 1 | return $this->select('SHOW TABLES')->rowsAsTree('name'); |
|||||
| 595 | } |
||||||
| 596 | |||||||
| 597 | /** |
||||||
| 598 | * statement = SHOW CREATE TABLE |
||||||
| 599 | * |
||||||
| 600 | * @return mixed |
||||||
| 601 | */ |
||||||
| 602 | public function showCreateTable(string $table) |
||||||
| 603 | { |
||||||
| 604 | return $this->select('SHOW CREATE TABLE ' . $table)->fetchOne('statement'); |
||||||
| 605 | } |
||||||
| 606 | |||||||
| 607 | /** |
||||||
| 608 | * Size of tables |
||||||
| 609 | * |
||||||
| 610 | * @return mixed |
||||||
| 611 | */ |
||||||
| 612 | 1 | public function tableSize(string $tableName) |
|||||
| 613 | { |
||||||
| 614 | 1 | $tables = $this->tablesSize(); |
|||||
| 615 | |||||||
| 616 | 1 | if (isset($tables[$tableName])) { |
|||||
| 617 | 1 | return $tables[$tableName]; |
|||||
| 618 | } |
||||||
| 619 | |||||||
| 620 | return null; |
||||||
| 621 | } |
||||||
| 622 | |||||||
| 623 | /** |
||||||
| 624 | * Tables sizes |
||||||
| 625 | * |
||||||
| 626 | * @param bool $flatList |
||||||
| 627 | * @return mixed[][] |
||||||
| 628 | */ |
||||||
| 629 | 1 | public function tablesSize($flatList = false) |
|||||
| 630 | { |
||||||
| 631 | 1 | $result = $this->select(' |
|||||
| 632 | SELECT name as table,database, |
||||||
| 633 | max(sizebytes) as sizebytes, |
||||||
| 634 | max(size) as size, |
||||||
| 635 | min(min_date) as min_date, |
||||||
| 636 | max(max_date) as max_date |
||||||
| 637 | FROM system.tables |
||||||
| 638 | ANY LEFT JOIN |
||||||
| 639 | ( |
||||||
| 640 | SELECT table,database, |
||||||
| 641 | formatReadableSize(sum(bytes)) as size, |
||||||
| 642 | sum(bytes) as sizebytes, |
||||||
| 643 | min(min_date) as min_date, |
||||||
| 644 | max(max_date) as max_date |
||||||
| 645 | FROM system.parts |
||||||
| 646 | WHERE active AND database=:database |
||||||
| 647 | GROUP BY table,database |
||||||
| 648 | ) USING ( table,database ) |
||||||
| 649 | WHERE database=:database |
||||||
| 650 | GROUP BY table,database |
||||||
| 651 | ', |
||||||
| 652 | 1 | ['database' => $this->database]); |
|||||
| 653 | |||||||
| 654 | 1 | if ($flatList) { |
|||||
| 655 | return $result->rows(); |
||||||
| 656 | } |
||||||
| 657 | |||||||
| 658 | 1 | return $result->rowsAsTree('table'); |
|||||
| 659 | } |
||||||
| 660 | |||||||
| 661 | /** |
||||||
| 662 | * isExists |
||||||
| 663 | * |
||||||
| 664 | * @return array |
||||||
| 665 | */ |
||||||
| 666 | public function tableExists(string $database, string $table) |
||||||
| 667 | { |
||||||
| 668 | return $this->select( |
||||||
| 669 | ' |
||||||
| 670 | SELECT * |
||||||
| 671 | FROM system.tables |
||||||
| 672 | WHERE name=\'' . $table . '\' AND database=\'' . $database . '\'' |
||||||
| 673 | )->rowsAsTree('name'); |
||||||
| 674 | } |
||||||
| 675 | |||||||
| 676 | /** |
||||||
| 677 | * List of partitions |
||||||
| 678 | * |
||||||
| 679 | * @return mixed[][] |
||||||
| 680 | */ |
||||||
| 681 | public function partitions(string $table, int $limit = null, bool $active = null) |
||||||
| 682 | { |
||||||
| 683 | $database = $this->database; |
||||||
| 684 | $whereActiveClause = $active === null ? '' : sprintf(' AND active = %s', (int) $active); |
||||||
| 685 | $limitClause = $limit !== null ? ' LIMIT ' . $limit : ''; |
||||||
| 686 | |||||||
| 687 | return $this->select(<<<CLICKHOUSE |
||||||
| 688 | SELECT * |
||||||
| 689 | FROM system.parts |
||||||
| 690 | WHERE like(table,'%$table%') AND database='$database'$whereActiveClause |
||||||
| 691 | ORDER BY max_date $limitClause |
||||||
| 692 | CLICKHOUSE |
||||||
| 693 | )->rowsAsTree('name'); |
||||||
| 694 | } |
||||||
| 695 | |||||||
| 696 | /** |
||||||
| 697 | * dropPartition |
||||||
| 698 | * @deprecated |
||||||
| 699 | * @return Statement |
||||||
| 700 | */ |
||||||
| 701 | public function dropPartition(string $dataBaseTableName, string $partition_id) |
||||||
| 702 | { |
||||||
| 703 | |||||||
| 704 | $partition_id = trim($partition_id, '\''); |
||||||
| 705 | $this->getSettings()->set('replication_alter_partitions_sync', 2); |
||||||
| 706 | $state = $this->write('ALTER TABLE {dataBaseTableName} DROP PARTITION :partion_id', |
||||||
| 707 | [ |
||||||
| 708 | 'dataBaseTableName' => $dataBaseTableName, |
||||||
| 709 | 'partion_id' => $partition_id, |
||||||
| 710 | ]); |
||||||
| 711 | |||||||
| 712 | return $state; |
||||||
| 713 | } |
||||||
| 714 | |||||||
| 715 | /** |
||||||
| 716 | * dropOldPartitions by day_ago |
||||||
| 717 | * @deprecated |
||||||
| 718 | * |
||||||
| 719 | * @return array |
||||||
| 720 | * @throws Exception\TransportException |
||||||
| 721 | * @throws \Exception |
||||||
| 722 | */ |
||||||
| 723 | public function dropOldPartitions(string $table_name, int $days_ago, int $count_partitons_per_one = 100) |
||||||
| 724 | { |
||||||
| 725 | $days_ago = strtotime(date('Y-m-d 00:00:00', strtotime('-' . $days_ago . ' day'))); |
||||||
| 726 | |||||||
| 727 | $drop = []; |
||||||
| 728 | $list_patitions = $this->partitions($table_name, $count_partitons_per_one); |
||||||
| 729 | |||||||
| 730 | foreach ($list_patitions as $partion_id => $partition) { |
||||||
| 731 | if (stripos($partition['engine'], 'mergetree') === false) { |
||||||
| 732 | continue; |
||||||
| 733 | } |
||||||
| 734 | |||||||
| 735 | // $min_date = strtotime($partition['min_date']); |
||||||
| 736 | $max_date = strtotime($partition['max_date']); |
||||||
| 737 | |||||||
| 738 | if ($max_date < $days_ago) { |
||||||
| 739 | $drop[] = $partition['partition']; |
||||||
| 740 | } |
||||||
| 741 | } |
||||||
| 742 | |||||||
| 743 | $result = []; |
||||||
| 744 | foreach ($drop as $partition_id) { |
||||||
| 745 | $result[$partition_id] = $this->dropPartition($table_name, $partition_id); |
||||||
| 746 | } |
||||||
| 747 | |||||||
| 748 | return $result; |
||||||
| 749 | } |
||||||
| 750 | |||||||
| 751 | /** |
||||||
| 752 | * Truncate ( drop all partitions ) |
||||||
| 753 | * @deprecated |
||||||
| 754 | * @return array |
||||||
| 755 | */ |
||||||
| 756 | public function truncateTable(string $tableName) |
||||||
| 757 | { |
||||||
| 758 | $partions = $this->partitions($tableName); |
||||||
| 759 | $out = []; |
||||||
| 760 | foreach ($partions as $part_key => $part) { |
||||||
| 761 | $part_id = $part['partition']; |
||||||
| 762 | $out[$part_id] = $this->dropPartition($tableName, $part_id); |
||||||
| 763 | } |
||||||
| 764 | |||||||
| 765 | return $out; |
||||||
| 766 | } |
||||||
| 767 | |||||||
| 768 | /** |
||||||
| 769 | * Returns the server's uptime in seconds. |
||||||
| 770 | * |
||||||
| 771 | * @return int |
||||||
| 772 | * @throws Exception\TransportException |
||||||
| 773 | */ |
||||||
| 774 | 1 | public function getServerUptime() |
|||||
| 775 | { |
||||||
| 776 | 1 | return $this->select('SELECT uptime() as uptime')->fetchOne('uptime'); |
|||||
| 777 | } |
||||||
| 778 | |||||||
| 779 | /** |
||||||
| 780 | * Returns string with the server version. |
||||||
| 781 | */ |
||||||
| 782 | 1 | public function getServerVersion() : string |
|||||
| 783 | { |
||||||
| 784 | 1 | return (string) $this->select('SELECT version() as version')->fetchOne('version'); |
|||||
| 785 | } |
||||||
| 786 | |||||||
| 787 | /** |
||||||
| 788 | * Read system.settings table |
||||||
| 789 | * |
||||||
| 790 | * @return mixed[][] |
||||||
| 791 | */ |
||||||
| 792 | 1 | public function getServerSystemSettings(string $like = '') |
|||||
| 793 | { |
||||||
| 794 | 1 | $l = []; |
|||||
| 795 | 1 | $list = $this->select('SELECT * FROM system.settings' . ($like ? ' WHERE name LIKE :like' : ''), |
|||||
| 796 | 1 | ['like' => '%' . $like . '%'])->rows(); |
|||||
| 797 | 1 | foreach ($list as $row) { |
|||||
| 798 | 1 | if (isset($row['name'])) { |
|||||
| 799 | 1 | $n = $row['name']; |
|||||
| 800 | 1 | unset($row['name']); |
|||||
| 801 | 1 | $l[$n] = $row; |
|||||
| 802 | } |
||||||
| 803 | } |
||||||
| 804 | |||||||
| 805 | 1 | return $l; |
|||||
| 806 | } |
||||||
| 807 | } |
||||||
| 808 |