| 1 | <?php |
||||||
| 2 | |||||||
| 3 | declare(strict_types=1); |
||||||
| 4 | |||||||
| 5 | namespace ClickHouseDB; |
||||||
| 6 | |||||||
| 7 | use ClickHouseDB\Exception\QueryException; |
||||||
| 8 | use ClickHouseDB\Query\Degeneration; |
||||||
| 9 | use ClickHouseDB\Query\Degeneration\Bindings; |
||||||
| 10 | use ClickHouseDB\Query\Degeneration\Conditions; |
||||||
| 11 | use ClickHouseDB\Query\WhereInFile; |
||||||
| 12 | use ClickHouseDB\Query\WriteToFile; |
||||||
| 13 | use ClickHouseDB\Quote\FormatLine; |
||||||
| 14 | use ClickHouseDB\Transport\Http; |
||||||
| 15 | use ClickHouseDB\Transport\Stream; |
||||||
| 16 | use function array_flip; |
||||||
| 17 | use function array_keys; |
||||||
| 18 | use function array_rand; |
||||||
| 19 | use function array_values; |
||||||
| 20 | use function count; |
||||||
| 21 | use function date; |
||||||
| 22 | use function implode; |
||||||
| 23 | use function in_array; |
||||||
| 24 | use function is_array; |
||||||
| 25 | use function is_callable; |
||||||
| 26 | use function is_file; |
||||||
| 27 | use function is_readable; |
||||||
| 28 | use function is_string; |
||||||
| 29 | use function sprintf; |
||||||
| 30 | use function stripos; |
||||||
| 31 | use function strtotime; |
||||||
| 32 | use function trim; |
||||||
| 33 | |||||||
| 34 | class Client |
||||||
| 35 | { |
||||||
| 36 | private const DEFAULT_USERNAME = 'default'; |
||||||
| 37 | private const DEFAULT_PASSWORD = ''; |
||||||
| 38 | private const DEFAULT_PORT = 8123; |
||||||
| 39 | private const DEFAULT_HOST = '127.0.0.1'; |
||||||
| 40 | private const DEFAULT_DATABASE = 'default'; |
||||||
| 41 | private const SUPPORTED_FORMATS = ['TabSeparated', 'TabSeparatedWithNames', 'CSV', 'CSVWithNames', 'JSONEachRow']; |
||||||
| 42 | |||||||
| 43 | /** @var Http */ |
||||||
| 44 | private $transport; |
||||||
| 45 | |||||||
| 46 | /** @var string */ |
||||||
| 47 | private $username; |
||||||
| 48 | |||||||
| 49 | /** @var string */ |
||||||
| 50 | private $password; |
||||||
| 51 | |||||||
| 52 | /** @var string */ |
||||||
| 53 | private $host; |
||||||
| 54 | |||||||
| 55 | /** @var string */ |
||||||
| 56 | private $port; |
||||||
| 57 | |||||||
| 58 | /** @var string */ |
||||||
| 59 | private $database; |
||||||
| 60 | |||||||
| 61 | /** |
||||||
| 62 | * @param mixed[] $connectParams |
||||||
| 63 | * @param mixed[] $settings |
||||||
| 64 | */ |
||||||
| 65 | 63 | public function __construct(array $connectParams, array $settings = []) |
|||||
| 66 | { |
||||||
| 67 | 63 | $this->username = $connectParams['username'] ?? self::DEFAULT_USERNAME; |
|||||
| 68 | 63 | $this->password = $connectParams['password'] ?? self::DEFAULT_PASSWORD; |
|||||
| 69 | 63 | $this->port = $connectParams['port'] ?? self::DEFAULT_PORT; |
|||||
| 70 | 63 | $this->host = $connectParams['host'] ?? self::DEFAULT_HOST; |
|||||
| 71 | 63 | $this->database = $connectParams['database'] ?? self::DEFAULT_DATABASE; |
|||||
| 72 | |||||||
| 73 | 63 | $this->transport = new Http( |
|||||
| 74 | 63 | $this->host, |
|||||
| 75 | 63 | $this->port, |
|||||
| 76 | 63 | $this->username, |
|||||
| 77 | 63 | $this->password, |
|||||
| 78 | 63 | $this->database |
|||||
| 79 | ); |
||||||
| 80 | |||||||
| 81 | 63 | $this->transport->addQueryDegeneration(new Bindings()); |
|||||
| 82 | |||||||
| 83 | // apply settings to transport class |
||||||
| 84 | 63 | if (! empty($settings)) { |
|||||
| 85 | 1 | $this->getSettings()->apply($settings); |
|||||
| 86 | } |
||||||
| 87 | |||||||
| 88 | 63 | if (isset($connectParams['https'])) { |
|||||
| 89 | $this->setHttps($connectParams['https']); |
||||||
| 90 | } |
||||||
| 91 | |||||||
| 92 | 63 | $this->setHttpCompression(true); |
|||||
| 93 | 63 | } |
|||||
| 94 | |||||||
| 95 | /** |
||||||
| 96 | * Clear Degeneration processing request [template ] |
||||||
| 97 | */ |
||||||
| 98 | 1 | public function cleanQueryDegeneration() : void |
|||||
| 99 | { |
||||||
| 100 | 1 | $this->transport->cleanQueryDegeneration(); |
|||||
| 101 | 1 | } |
|||||
| 102 | |||||||
| 103 | /** |
||||||
| 104 | * Degeneration processing |
||||||
| 105 | */ |
||||||
| 106 | public function addQueryDegeneration(Degeneration $degeneration) : void |
||||||
| 107 | { |
||||||
| 108 | $this->transport->addQueryDegeneration($degeneration); |
||||||
| 109 | } |
||||||
| 110 | |||||||
| 111 | /** |
||||||
| 112 | * Add Conditions Degeneration to query processing |
||||||
| 113 | */ |
||||||
| 114 | 1 | public function enableQueryConditions() : void |
|||||
| 115 | { |
||||||
| 116 | 1 | $this->transport->addQueryDegeneration(new Conditions()); |
|||||
| 117 | 1 | } |
|||||
| 118 | |||||||
| 119 | 2 | public function setTimeout(float $seconds) : void |
|||||
| 120 | { |
||||||
| 121 | 2 | $this->transport->setTimeout($seconds); |
|||||
| 122 | 2 | } |
|||||
| 123 | |||||||
| 124 | 2 | public function setConnectTimeout(float $seconds) : void |
|||||
| 125 | { |
||||||
| 126 | 2 | $this->transport->setConnectTimeout($seconds); |
|||||
| 127 | 2 | } |
|||||
| 128 | |||||||
| 129 | /** |
||||||
| 130 | * @return string |
||||||
| 131 | */ |
||||||
| 132 | public function getHost() |
||||||
|
0 ignored issues
–
show
introduced
by
Loading history...
|
|||||||
| 133 | { |
||||||
| 134 | return $this->host; |
||||||
| 135 | } |
||||||
| 136 | |||||||
| 137 | /** |
||||||
| 138 | * Set connection host |
||||||
| 139 | * |
||||||
| 140 | * @param string|string[] $host |
||||||
| 141 | */ |
||||||
| 142 | public function setHost($host) |
||||||
| 143 | { |
||||||
| 144 | if (is_array($host)) { |
||||||
| 145 | $host = array_rand(array_flip($host)); |
||||||
| 146 | } |
||||||
| 147 | |||||||
| 148 | $this->host = $host; |
||||||
| 149 | $this->transport->setHost($host); |
||||||
| 150 | } |
||||||
| 151 | |||||||
| 152 | public function getPassword() : string |
||||||
| 153 | { |
||||||
| 154 | return $this->password; |
||||||
| 155 | } |
||||||
| 156 | |||||||
| 157 | public function getPort() : int |
||||||
| 158 | { |
||||||
| 159 | return $this->port; |
||||||
|
0 ignored issues
–
show
|
|||||||
| 160 | } |
||||||
| 161 | |||||||
| 162 | public function getUsername() : string |
||||||
| 163 | { |
||||||
| 164 | return $this->username; |
||||||
| 165 | } |
||||||
| 166 | |||||||
| 167 | 63 | public function setDatabase(string $database) : self |
|||||
| 168 | { |
||||||
| 169 | 63 | $this->database = $database; |
|||||
| 170 | 63 | $this->transport->setDatabase($database); |
|||||
| 171 | |||||||
| 172 | 63 | return $this; |
|||||
| 173 | } |
||||||
| 174 | |||||||
| 175 | 2 | public function getTransport() : Http |
|||||
| 176 | { |
||||||
| 177 | 2 | return $this->transport; |
|||||
| 178 | } |
||||||
| 179 | |||||||
| 180 | /** |
||||||
| 181 | * @return mixed |
||||||
| 182 | */ |
||||||
| 183 | public function verbose() |
||||||
| 184 | { |
||||||
| 185 | return $this->transport->setVerbose(); |
||||||
|
0 ignored issues
–
show
Are you sure the usage of
$this->transport->setVerbose() targeting ClickHouseDB\Transport\Http::setVerbose() seems to always return null.
This check looks for function or method calls that always return null and whose return value is used. class A
{
function getObject()
{
return null;
}
}
$a = new A();
if ($a->getObject()) {
The method The reason is most likely that a function or method is imcomplete or has been reduced for debug purposes. Loading history...
The call to
ClickHouseDB\Transport\Http::setVerbose() has too few arguments starting with flag.
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
This check compares calls to functions or methods with their respective definitions. If the call has less arguments than are defined, it raises an issue. If a function is defined several times with a different number of parameters, the check may pick up the wrong definition and report false positives. One codebase where this has been known to happen is Wordpress. Please note the @ignore annotation hint above. Loading history...
|
|||||||
| 186 | } |
||||||
| 187 | |||||||
| 188 | 63 | public function getSettings() : Settings |
|||||
| 189 | { |
||||||
| 190 | 63 | return $this->transport->getSettings(); |
|||||
| 191 | } |
||||||
| 192 | |||||||
| 193 | /** |
||||||
| 194 | * @return static |
||||||
| 195 | */ |
||||||
| 196 | 2 | public function useSession(bool $useSessionId = false) |
|||||
| 197 | { |
||||||
| 198 | 2 | if (! $this->getSettings()->getSessionId()) { |
|||||
| 199 | 2 | if (! $useSessionId) { |
|||||
| 200 | 2 | $this->getSettings()->makeSessionId(); |
|||||
| 201 | } else { |
||||||
| 202 | $this->getSettings()->session_id($useSessionId); |
||||||
| 203 | } |
||||||
| 204 | } |
||||||
| 205 | |||||||
| 206 | 2 | return $this; |
|||||
| 207 | } |
||||||
| 208 | |||||||
| 209 | /** |
||||||
| 210 | * @return mixed |
||||||
| 211 | */ |
||||||
| 212 | 2 | public function getSession() |
|||||
| 213 | { |
||||||
| 214 | 2 | return $this->getSettings()->getSessionId(); |
|||||
| 215 | } |
||||||
| 216 | |||||||
| 217 | /** |
||||||
| 218 | * Query CREATE/DROP |
||||||
| 219 | * |
||||||
| 220 | * @param mixed[] $bindings |
||||||
| 221 | * @return Statement |
||||||
| 222 | */ |
||||||
| 223 | 63 | public function write(string $sql, array $bindings = [], bool $exception = true) |
|||||
| 224 | { |
||||||
| 225 | 63 | return $this->transport->write($sql, $bindings, $exception); |
|||||
| 226 | } |
||||||
| 227 | |||||||
| 228 | /** |
||||||
| 229 | * Write to system.query_log |
||||||
| 230 | * |
||||||
| 231 | * @return static |
||||||
| 232 | */ |
||||||
| 233 | public function enableLogQueries(bool $flag = true) |
||||||
| 234 | { |
||||||
| 235 | $this->getSettings()->set('log_queries', (int) $flag); |
||||||
| 236 | |||||||
| 237 | return $this; |
||||||
| 238 | } |
||||||
| 239 | |||||||
| 240 | /** |
||||||
| 241 | * Compress the result if the HTTP client said that it understands data compressed with gzip or deflate |
||||||
| 242 | */ |
||||||
| 243 | 63 | public function setHttpCompression(bool $enable) : self |
|||||
| 244 | { |
||||||
| 245 | 63 | $this->getSettings()->setHttpCompression($enable); |
|||||
| 246 | |||||||
| 247 | 63 | return $this; |
|||||
| 248 | } |
||||||
| 249 | |||||||
| 250 | /** |
||||||
| 251 | * Enable / Disable HTTPS |
||||||
| 252 | * |
||||||
| 253 | * @return static |
||||||
| 254 | */ |
||||||
| 255 | 1 | public function setHttps(bool $flag) |
|||||
| 256 | { |
||||||
| 257 | 1 | $this->transport->setHttps($flag); |
|||||
| 258 | |||||||
| 259 | 1 | return $this; |
|||||
| 260 | } |
||||||
| 261 | |||||||
| 262 | /** |
||||||
| 263 | * Read extremes of the result columns. They can be output in JSON-formats. |
||||||
| 264 | * |
||||||
| 265 | * @return static |
||||||
| 266 | */ |
||||||
| 267 | 2 | public function enableExtremes(bool $flag = true) |
|||||
| 268 | { |
||||||
| 269 | 2 | $this->getSettings()->set('extremes', (int) $flag); |
|||||
| 270 | |||||||
| 271 | 2 | return $this; |
|||||
| 272 | } |
||||||
| 273 | |||||||
| 274 | /** |
||||||
| 275 | * Ping server |
||||||
| 276 | * |
||||||
| 277 | * @return bool |
||||||
| 278 | */ |
||||||
| 279 | 39 | public function ping() |
|||||
| 280 | { |
||||||
| 281 | 39 | return $this->transport->ping(); |
|||||
| 282 | } |
||||||
| 283 | |||||||
| 284 | /** |
||||||
| 285 | * @param mixed[] $bindings |
||||||
| 286 | * @return Statement |
||||||
| 287 | */ |
||||||
| 288 | 29 | public function select( |
|||||
| 289 | string $sql, |
||||||
| 290 | array $bindings = [], |
||||||
| 291 | WhereInFile $whereInFile = null, |
||||||
| 292 | WriteToFile $writeToFile = null |
||||||
| 293 | ) { |
||||||
| 294 | 29 | return $this->transport->select($sql, $bindings, $whereInFile, $writeToFile); |
|||||
| 295 | } |
||||||
| 296 | |||||||
| 297 | /** |
||||||
| 298 | * prepare select |
||||||
| 299 | * |
||||||
| 300 | * @param mixed[] $bindings |
||||||
| 301 | * @return Statement |
||||||
| 302 | */ |
||||||
| 303 | 5 | public function selectAsync( |
|||||
| 304 | string $sql, |
||||||
| 305 | array $bindings = [], |
||||||
| 306 | WhereInFile $whereInFile = null, |
||||||
| 307 | WriteToFile $writeToFile = null |
||||||
| 308 | ) { |
||||||
| 309 | 5 | return $this->transport->selectAsync($sql, $bindings, $whereInFile, $writeToFile); |
|||||
| 310 | } |
||||||
| 311 | |||||||
| 312 | /** |
||||||
| 313 | * @return bool |
||||||
| 314 | */ |
||||||
| 315 | 10 | public function executeAsync() |
|||||
| 316 | { |
||||||
| 317 | 10 | return $this->transport->executeAsync(); |
|||||
| 318 | } |
||||||
| 319 | |||||||
| 320 | /** |
||||||
| 321 | * set progressFunction |
||||||
| 322 | */ |
||||||
| 323 | 1 | public function progressFunction(callable $callback) |
|||||
| 324 | { |
||||||
| 325 | 1 | if (! is_callable($callback)) { |
|||||
| 326 | throw new \InvalidArgumentException('Not is_callable progressFunction'); |
||||||
| 327 | } |
||||||
| 328 | |||||||
| 329 | 1 | if (! $this->getSettings()->isSet('send_progress_in_http_headers')) { |
|||||
| 330 | 1 | $this->getSettings()->set('send_progress_in_http_headers', 1); |
|||||
| 331 | } |
||||||
| 332 | 1 | if (! $this->getSettings()->isSet('http_headers_progress_interval_ms')) { |
|||||
| 333 | 1 | $this->getSettings()->set('http_headers_progress_interval_ms', 100); |
|||||
| 334 | } |
||||||
| 335 | |||||||
| 336 | 1 | $this->transport->setProgressFunction($callback); |
|||||
| 337 | 1 | } |
|||||
| 338 | |||||||
| 339 | /** |
||||||
| 340 | * SHOW PROCESSLIST |
||||||
| 341 | * |
||||||
| 342 | * @return array |
||||||
| 343 | */ |
||||||
| 344 | public function showProcesslist() |
||||||
| 345 | { |
||||||
| 346 | return $this->select('SHOW PROCESSLIST')->rows(); |
||||||
| 347 | } |
||||||
| 348 | |||||||
| 349 | /** |
||||||
| 350 | * Get the number of simultaneous/Pending requests |
||||||
| 351 | * |
||||||
| 352 | * @return int |
||||||
| 353 | */ |
||||||
| 354 | 12 | public function getCountPendingQueue() |
|||||
| 355 | { |
||||||
| 356 | 12 | return $this->transport->getCountPendingQueue(); |
|||||
| 357 | } |
||||||
| 358 | |||||||
| 359 | /** |
||||||
| 360 | * @param mixed[][] $values |
||||||
| 361 | * @param string[] $columns |
||||||
| 362 | */ |
||||||
| 363 | 9 | public function insert(string $table, array $values, array $columns = []) : Statement |
|||||
| 364 | { |
||||||
| 365 | 9 | if (empty($values)) { |
|||||
| 366 | 1 | throw QueryException::cannotInsertEmptyValues(); |
|||||
| 367 | } |
||||||
| 368 | |||||||
| 369 | 8 | if (stripos($table, '`') === false && stripos($table, '.') === false) { |
|||||
| 370 | 5 | $table = '`' . $table . '`'; //quote table name for dot names |
|||||
| 371 | } |
||||||
| 372 | 8 | $sql = 'INSERT INTO ' . $table; |
|||||
| 373 | |||||||
| 374 | 8 | if (count($columns) !== 0) { |
|||||
| 375 | 7 | $sql .= ' (`' . implode('`,`', $columns) . '`) '; |
|||||
| 376 | } |
||||||
| 377 | |||||||
| 378 | 8 | $sql .= ' VALUES '; |
|||||
| 379 | |||||||
| 380 | 8 | foreach ($values as $row) { |
|||||
| 381 | 8 | $sql .= ' (' . FormatLine::Insert($row) . '), '; |
|||||
| 382 | } |
||||||
| 383 | 8 | $sql = trim($sql, ', '); |
|||||
| 384 | |||||||
| 385 | 8 | return $this->transport->write($sql); |
|||||
| 386 | } |
||||||
| 387 | |||||||
| 388 | /** |
||||||
| 389 | * * Prepares the values to insert from the associative array. |
||||||
| 390 | * * There may be one or more lines inserted, but then the keys inside the array list must match (including in the sequence) |
||||||
| 391 | * * |
||||||
| 392 | * * @param mixed[] $values - array column_name => value (if we insert one row) or array list column_name => value if we insert many lines |
||||||
| 393 | * * @return mixed[][] - list of arrays - 0 => fields, 1 => list of value arrays for insertion |
||||||
| 394 | * */ |
||||||
| 395 | 3 | public function prepareInsertAssocBulk(array $values) |
|||||
| 396 | { |
||||||
| 397 | 3 | if (isset($values[0]) && is_array($values[0])) { //случай, когда много строк вставляется |
|||||
| 398 | 2 | $preparedFields = array_keys($values[0]); |
|||||
| 399 | 2 | $preparedValues = []; |
|||||
| 400 | 2 | foreach ($values as $idx => $row) { |
|||||
| 401 | 2 | $_fields = array_keys($row); |
|||||
| 402 | 2 | if ($_fields !== $preparedFields) { |
|||||
| 403 | 1 | throw new QueryException( |
|||||
| 404 | 1 | sprintf( |
|||||
| 405 | 1 | 'Fields not match: %s and %s on element %s', |
|||||
| 406 | 1 | implode(',', $_fields), |
|||||
| 407 | 1 | implode(',', $preparedFields), |
|||||
| 408 | 1 | $idx |
|||||
| 409 | ) |
||||||
| 410 | ); |
||||||
| 411 | } |
||||||
| 412 | 2 | $preparedValues[] = array_values($row); |
|||||
| 413 | } |
||||||
| 414 | } else { |
||||||
| 415 | 1 | $preparedFields = array_keys($values); |
|||||
| 416 | 1 | $preparedValues = [array_values($values)]; |
|||||
| 417 | } |
||||||
| 418 | |||||||
| 419 | 2 | return [$preparedFields, $preparedValues]; |
|||||
| 420 | } |
||||||
| 421 | |||||||
| 422 | /** |
||||||
| 423 | * Inserts one or more rows from an associative array. |
||||||
| 424 | * If there is a discrepancy between the keys of the value arrays (or their order) - throws an exception. |
||||||
| 425 | * |
||||||
| 426 | * @param mixed[] $values - array column_name => value (if we insert one row) or array list column_name => value if we insert many lines |
||||||
| 427 | * @return Statement |
||||||
| 428 | */ |
||||||
| 429 | public function insertAssocBulk(string $tableName, array $values) |
||||||
| 430 | { |
||||||
| 431 | list($columns, $vals) = $this->prepareInsertAssocBulk($values); |
||||||
| 432 | |||||||
| 433 | return $this->insert($tableName, $vals, $columns); |
||||||
| 434 | } |
||||||
| 435 | |||||||
| 436 | /** |
||||||
| 437 | * insert TabSeparated files |
||||||
| 438 | * |
||||||
| 439 | * @param string|string[] $fileNames |
||||||
| 440 | * @param string[] $columns |
||||||
| 441 | * @return mixed |
||||||
| 442 | */ |
||||||
| 443 | 1 | public function insertBatchTSVFiles(string $tableName, $fileNames, array $columns = []) |
|||||
| 444 | { |
||||||
| 445 | 1 | return $this->insertBatchFiles($tableName, $fileNames, $columns, 'TabSeparated'); |
|||||
| 446 | } |
||||||
| 447 | |||||||
| 448 | /** |
||||||
| 449 | * insert Batch Files |
||||||
| 450 | * |
||||||
| 451 | * @param string|string[] $fileNames |
||||||
| 452 | * @param string[] $columns |
||||||
| 453 | * @param string $format ['TabSeparated','TabSeparatedWithNames','CSV','CSVWithNames'] |
||||||
| 454 | * @return Statement[] |
||||||
| 455 | * @throws Exception\TransportException |
||||||
| 456 | */ |
||||||
| 457 | 8 | public function insertBatchFiles(string $tableName, $fileNames, array $columns = [], string $format = 'CSV') |
|||||
| 458 | { |
||||||
| 459 | 8 | if (is_string($fileNames)) { |
|||||
| 460 | $fileNames = [$fileNames]; |
||||||
| 461 | } |
||||||
| 462 | 8 | if ($this->getCountPendingQueue() > 0) { |
|||||
| 463 | throw new QueryException('Queue must be empty, before insertBatch, need executeAsync'); |
||||||
| 464 | } |
||||||
| 465 | |||||||
| 466 | 8 | if (! in_array($format, self::SUPPORTED_FORMATS, true)) { |
|||||
| 467 | throw new QueryException('Format not support in insertBatchFiles'); |
||||||
| 468 | } |
||||||
| 469 | |||||||
| 470 | 8 | $result = []; |
|||||
| 471 | |||||||
| 472 | 8 | foreach ($fileNames as $fileName) { |
|||||
| 473 | 8 | if (! is_file($fileName) || ! is_readable($fileName)) { |
|||||
| 474 | throw new QueryException('Cant read file: ' . $fileName . ' ' . (is_file($fileName) ? '' : ' is not file')); |
||||||
| 475 | } |
||||||
| 476 | |||||||
| 477 | 8 | if (empty($columns)) { |
|||||
| 478 | $sql = 'INSERT INTO ' . $tableName . ' FORMAT ' . $format; |
||||||
| 479 | } else { |
||||||
| 480 | 8 | $sql = 'INSERT INTO ' . $tableName . ' ( ' . implode(',', $columns) . ' ) FORMAT ' . $format; |
|||||
| 481 | } |
||||||
| 482 | 8 | $result[$fileName] = $this->transport->writeAsyncCSV($sql, $fileName); |
|||||
| 483 | } |
||||||
| 484 | |||||||
| 485 | // exec |
||||||
| 486 | 8 | $this->executeAsync(); |
|||||
| 487 | |||||||
| 488 | // fetch resutl |
||||||
| 489 | 8 | foreach ($fileNames as $fileName) { |
|||||
| 490 | 8 | if (! $result[$fileName]->isError()) { |
|||||
| 491 | 6 | continue; |
|||||
| 492 | } |
||||||
| 493 | |||||||
| 494 | 2 | $result[$fileName]->error(); |
|||||
| 495 | } |
||||||
| 496 | |||||||
| 497 | 6 | return $result; |
|||||
| 498 | } |
||||||
| 499 | |||||||
| 500 | /** |
||||||
| 501 | * insert Batch Stream |
||||||
| 502 | * |
||||||
| 503 | * @param string[] $columns |
||||||
| 504 | * @param string $format ['TabSeparated','TabSeparatedWithNames','CSV','CSVWithNames'] |
||||||
| 505 | * @return Transport\CurlerRequest |
||||||
| 506 | */ |
||||||
| 507 | 2 | public function insertBatchStream(string $tableName, array $columns = [], string $format = 'CSV') |
|||||
| 508 | { |
||||||
| 509 | 2 | if ($this->getCountPendingQueue() > 0) { |
|||||
| 510 | throw new QueryException('Queue must be empty, before insertBatch, need executeAsync'); |
||||||
| 511 | } |
||||||
| 512 | |||||||
| 513 | 2 | if (! in_array($format, self::SUPPORTED_FORMATS, true)) { |
|||||
| 514 | throw new QueryException('Format not support in insertBatchFiles'); |
||||||
| 515 | } |
||||||
| 516 | |||||||
| 517 | 2 | if (empty($columns)) { |
|||||
| 518 | $sql = 'INSERT INTO ' . $tableName . ' FORMAT ' . $format; |
||||||
| 519 | } else { |
||||||
| 520 | 2 | $sql = 'INSERT INTO ' . $tableName . ' ( ' . implode(',', $columns) . ' ) FORMAT ' . $format; |
|||||
| 521 | } |
||||||
| 522 | |||||||
| 523 | 2 | return $this->transport->writeStreamData($sql); |
|||||
| 524 | } |
||||||
| 525 | |||||||
| 526 | /** |
||||||
| 527 | * stream Write |
||||||
| 528 | * |
||||||
| 529 | * @param string[] $bind |
||||||
| 530 | * @return Statement |
||||||
| 531 | * @throws Exception\TransportException |
||||||
| 532 | */ |
||||||
| 533 | 1 | public function streamWrite(Stream $stream, string $sql, array $bind = []) |
|||||
| 534 | { |
||||||
| 535 | 1 | if ($this->getCountPendingQueue() > 0) { |
|||||
| 536 | throw new QueryException('Queue must be empty, before streamWrite'); |
||||||
| 537 | } |
||||||
| 538 | |||||||
| 539 | 1 | return $this->transport->streamWrite($stream, $sql, $bind); |
|||||
| 540 | } |
||||||
| 541 | |||||||
| 542 | /** |
||||||
| 543 | * stream Read |
||||||
| 544 | * |
||||||
| 545 | * @param string[] $bind |
||||||
| 546 | * @return Statement |
||||||
| 547 | */ |
||||||
| 548 | 1 | public function streamRead(Stream $streamRead, string $sql, array $bind = []) |
|||||
| 549 | { |
||||||
| 550 | 1 | if ($this->getCountPendingQueue() > 0) { |
|||||
| 551 | throw new QueryException('Queue must be empty, before streamWrite'); |
||||||
| 552 | } |
||||||
| 553 | |||||||
| 554 | 1 | return $this->transport->streamRead($streamRead, $sql, $bind); |
|||||
| 555 | } |
||||||
| 556 | |||||||
| 557 | /** |
||||||
| 558 | * show databases |
||||||
| 559 | * |
||||||
| 560 | * @return array |
||||||
| 561 | */ |
||||||
| 562 | public function showDatabases() |
||||||
| 563 | { |
||||||
| 564 | return $this->select('show databases')->rows(); |
||||||
| 565 | } |
||||||
| 566 | |||||||
| 567 | /** |
||||||
| 568 | * Size of database |
||||||
| 569 | * |
||||||
| 570 | * @return mixed|null |
||||||
| 571 | */ |
||||||
| 572 | public function databaseSize() |
||||||
| 573 | { |
||||||
| 574 | $database = $this->transport->getDatabase(); |
||||||
|
0 ignored issues
–
show
The method
getDatabase() does not exist on ClickHouseDB\Transport\Http.
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces. This is most likely a typographical error or the method has been renamed. Loading history...
|
|||||||
| 575 | |||||||
| 576 | return $this->select( |
||||||
| 577 | ' |
||||||
| 578 | SELECT database,formatReadableSize(sum(bytes)) as size |
||||||
| 579 | FROM system.parts |
||||||
| 580 | WHERE active AND database=:database |
||||||
| 581 | GROUP BY database |
||||||
| 582 | ', |
||||||
| 583 | ['database' => $database] |
||||||
| 584 | )->fetchOne(); |
||||||
| 585 | } |
||||||
| 586 | |||||||
| 587 | /** |
||||||
| 588 | * SHOW TABLES |
||||||
| 589 | * |
||||||
| 590 | * @return mixed[] |
||||||
| 591 | */ |
||||||
| 592 | 1 | public function showTables() |
|||||
| 593 | { |
||||||
| 594 | 1 | return $this->select('SHOW TABLES')->rowsAsTree('name'); |
|||||
| 595 | } |
||||||
| 596 | |||||||
| 597 | /** |
||||||
| 598 | * statement = SHOW CREATE TABLE |
||||||
| 599 | * |
||||||
| 600 | * @return mixed |
||||||
| 601 | */ |
||||||
| 602 | public function showCreateTable(string $table) |
||||||
| 603 | { |
||||||
| 604 | return $this->select('SHOW CREATE TABLE ' . $table)->fetchOne('statement'); |
||||||
| 605 | } |
||||||
| 606 | |||||||
| 607 | /** |
||||||
| 608 | * Size of tables |
||||||
| 609 | * |
||||||
| 610 | * @return mixed |
||||||
| 611 | */ |
||||||
| 612 | 1 | public function tableSize(string $tableName) |
|||||
| 613 | { |
||||||
| 614 | 1 | $tables = $this->tablesSize(); |
|||||
| 615 | |||||||
| 616 | 1 | if (isset($tables[$tableName])) { |
|||||
| 617 | 1 | return $tables[$tableName]; |
|||||
| 618 | } |
||||||
| 619 | |||||||
| 620 | return null; |
||||||
| 621 | } |
||||||
| 622 | |||||||
| 623 | /** |
||||||
| 624 | * Tables sizes |
||||||
| 625 | * |
||||||
| 626 | * @param bool $flatList |
||||||
| 627 | * @return mixed[][] |
||||||
| 628 | */ |
||||||
| 629 | 1 | public function tablesSize($flatList = false) |
|||||
| 630 | { |
||||||
| 631 | 1 | $result = $this->select(' |
|||||
| 632 | SELECT name as table,database, |
||||||
| 633 | max(sizebytes) as sizebytes, |
||||||
| 634 | max(size) as size, |
||||||
| 635 | min(min_date) as min_date, |
||||||
| 636 | max(max_date) as max_date |
||||||
| 637 | FROM system.tables |
||||||
| 638 | ANY LEFT JOIN |
||||||
| 639 | ( |
||||||
| 640 | SELECT table,database, |
||||||
| 641 | formatReadableSize(sum(bytes)) as size, |
||||||
| 642 | sum(bytes) as sizebytes, |
||||||
| 643 | min(min_date) as min_date, |
||||||
| 644 | max(max_date) as max_date |
||||||
| 645 | FROM system.parts |
||||||
| 646 | WHERE active AND database=:database |
||||||
| 647 | GROUP BY table,database |
||||||
| 648 | ) USING ( table,database ) |
||||||
| 649 | WHERE database=:database |
||||||
| 650 | GROUP BY table,database |
||||||
| 651 | ', |
||||||
| 652 | 1 | ['database' => $this->database]); |
|||||
| 653 | |||||||
| 654 | 1 | if ($flatList) { |
|||||
| 655 | return $result->rows(); |
||||||
| 656 | } |
||||||
| 657 | |||||||
| 658 | 1 | return $result->rowsAsTree('table'); |
|||||
| 659 | } |
||||||
| 660 | |||||||
| 661 | /** |
||||||
| 662 | * isExists |
||||||
| 663 | * |
||||||
| 664 | * @return array |
||||||
|
0 ignored issues
–
show
|
|||||||
| 665 | */ |
||||||
| 666 | public function tableExists(string $database, string $table) |
||||||
|
0 ignored issues
–
show
|
|||||||
| 667 | { |
||||||
| 668 | return $this->select( |
||||||
| 669 | ' |
||||||
| 670 | SELECT * |
||||||
| 671 | FROM system.tables |
||||||
| 672 | WHERE name=\'' . $table . '\' AND database=\'' . $database . '\'' |
||||||
| 673 | )->rowsAsTree('name'); |
||||||
| 674 | } |
||||||
| 675 | |||||||
| 676 | /** |
||||||
| 677 | * List of partitions |
||||||
| 678 | * |
||||||
| 679 | * @return mixed[][] |
||||||
| 680 | */ |
||||||
| 681 | public function partitions(string $table, int $limit = null, bool $active = null) |
||||||
| 682 | { |
||||||
| 683 | $database = $this->database; |
||||||
| 684 | $whereActiveClause = $active === null ? '' : sprintf(' AND active = %s', (int) $active); |
||||||
| 685 | $limitClause = $limit !== null ? ' LIMIT ' . $limit : ''; |
||||||
| 686 | |||||||
| 687 | return $this->select(<<<CLICKHOUSE |
||||||
| 688 | SELECT * |
||||||
| 689 | FROM system.parts |
||||||
| 690 | WHERE like(table,'%$table%') AND database='$database'$whereActiveClause |
||||||
| 691 | ORDER BY max_date $limitClause |
||||||
| 692 | CLICKHOUSE |
||||||
| 693 | )->rowsAsTree('name'); |
||||||
| 694 | } |
||||||
| 695 | |||||||
| 696 | /** |
||||||
| 697 | * dropPartition |
||||||
| 698 | * @deprecated |
||||||
| 699 | * @return Statement |
||||||
| 700 | */ |
||||||
| 701 | public function dropPartition(string $dataBaseTableName, string $partition_id) |
||||||
| 702 | { |
||||||
| 703 | |||||||
| 704 | $partition_id = trim($partition_id, '\''); |
||||||
| 705 | $this->getSettings()->set('replication_alter_partitions_sync', 2); |
||||||
| 706 | $state = $this->write('ALTER TABLE {dataBaseTableName} DROP PARTITION :partion_id', |
||||||
| 707 | [ |
||||||
| 708 | 'dataBaseTableName' => $dataBaseTableName, |
||||||
| 709 | 'partion_id' => $partition_id, |
||||||
| 710 | ]); |
||||||
| 711 | |||||||
| 712 | return $state; |
||||||
| 713 | } |
||||||
| 714 | |||||||
| 715 | /** |
||||||
| 716 | * dropOldPartitions by day_ago |
||||||
| 717 | * @deprecated |
||||||
| 718 | * |
||||||
| 719 | * @return array |
||||||
| 720 | * @throws Exception\TransportException |
||||||
| 721 | * @throws \Exception |
||||||
| 722 | */ |
||||||
| 723 | public function dropOldPartitions(string $table_name, int $days_ago, int $count_partitons_per_one = 100) |
||||||
| 724 | { |
||||||
| 725 | $days_ago = strtotime(date('Y-m-d 00:00:00', strtotime('-' . $days_ago . ' day'))); |
||||||
| 726 | |||||||
| 727 | $drop = []; |
||||||
| 728 | $list_patitions = $this->partitions($table_name, $count_partitons_per_one); |
||||||
| 729 | |||||||
| 730 | foreach ($list_patitions as $partion_id => $partition) { |
||||||
| 731 | if (stripos($partition['engine'], 'mergetree') === false) { |
||||||
| 732 | continue; |
||||||
| 733 | } |
||||||
| 734 | |||||||
| 735 | // $min_date = strtotime($partition['min_date']); |
||||||
| 736 | $max_date = strtotime($partition['max_date']); |
||||||
| 737 | |||||||
| 738 | if ($max_date < $days_ago) { |
||||||
| 739 | $drop[] = $partition['partition']; |
||||||
| 740 | } |
||||||
| 741 | } |
||||||
| 742 | |||||||
| 743 | $result = []; |
||||||
| 744 | foreach ($drop as $partition_id) { |
||||||
| 745 | $result[$partition_id] = $this->dropPartition($table_name, $partition_id); |
||||||
| 746 | } |
||||||
| 747 | |||||||
| 748 | return $result; |
||||||
| 749 | } |
||||||
| 750 | |||||||
| 751 | /** |
||||||
| 752 | * Truncate ( drop all partitions ) |
||||||
| 753 | * @deprecated |
||||||
| 754 | * @return array |
||||||
| 755 | */ |
||||||
| 756 | public function truncateTable(string $tableName) |
||||||
| 757 | { |
||||||
| 758 | $partions = $this->partitions($tableName); |
||||||
| 759 | $out = []; |
||||||
| 760 | foreach ($partions as $part_key => $part) { |
||||||
| 761 | $part_id = $part['partition']; |
||||||
| 762 | $out[$part_id] = $this->dropPartition($tableName, $part_id); |
||||||
| 763 | } |
||||||
| 764 | |||||||
| 765 | return $out; |
||||||
| 766 | } |
||||||
| 767 | |||||||
| 768 | /** |
||||||
| 769 | * Returns the server's uptime in seconds. |
||||||
| 770 | * |
||||||
| 771 | * @return int |
||||||
| 772 | * @throws Exception\TransportException |
||||||
| 773 | */ |
||||||
| 774 | 1 | public function getServerUptime() |
|||||
| 775 | { |
||||||
| 776 | 1 | return $this->select('SELECT uptime() as uptime')->fetchOne('uptime'); |
|||||
| 777 | } |
||||||
| 778 | |||||||
| 779 | /** |
||||||
| 780 | * Returns string with the server version. |
||||||
| 781 | */ |
||||||
| 782 | 1 | public function getServerVersion() : string |
|||||
| 783 | { |
||||||
| 784 | 1 | return (string) $this->select('SELECT version() as version')->fetchOne('version'); |
|||||
| 785 | } |
||||||
| 786 | |||||||
| 787 | /** |
||||||
| 788 | * Read system.settings table |
||||||
| 789 | * |
||||||
| 790 | * @return mixed[][] |
||||||
| 791 | */ |
||||||
| 792 | 1 | public function getServerSystemSettings(string $like = '') |
|||||
| 793 | { |
||||||
| 794 | 1 | $l = []; |
|||||
| 795 | 1 | $list = $this->select('SELECT * FROM system.settings' . ($like ? ' WHERE name LIKE :like' : ''), |
|||||
| 796 | 1 | ['like' => '%' . $like . '%'])->rows(); |
|||||
| 797 | 1 | foreach ($list as $row) { |
|||||
| 798 | 1 | if (isset($row['name'])) { |
|||||
| 799 | 1 | $n = $row['name']; |
|||||
| 800 | 1 | unset($row['name']); |
|||||
| 801 | 1 | $l[$n] = $row; |
|||||
| 802 | } |
||||||
| 803 | } |
||||||
| 804 | |||||||
| 805 | 1 | return $l; |
|||||
| 806 | } |
||||||
| 807 | } |
||||||
| 808 |