Failed Conditions
Push — master ( 20fa9e...5a0a45 )
by Igor
03:28 queued 10s
created

src/Client.php (8 issues)

Severity
1
<?php
2
3
declare(strict_types=1);
4
5
namespace ClickHouseDB;
6
7
use ClickHouseDB\Exception\QueryException;
8
use ClickHouseDB\Query\Degeneration;
9
use ClickHouseDB\Query\Degeneration\Bindings;
10
use ClickHouseDB\Query\Degeneration\Conditions;
11
use ClickHouseDB\Query\WhereInFile;
12
use ClickHouseDB\Query\WriteToFile;
13
use ClickHouseDB\Quote\FormatLine;
14
use ClickHouseDB\Transport\Http;
15
use ClickHouseDB\Transport\Stream;
16
use function array_flip;
17
use function array_keys;
18
use function array_rand;
19
use function array_values;
20
use function count;
21
use function date;
22
use function implode;
23
use function in_array;
24
use function is_array;
25
use function is_callable;
26
use function is_file;
27
use function is_readable;
28
use function is_string;
29
use function sprintf;
30
use function stripos;
31
use function strtotime;
32
use function trim;
33
34
/**
35
 * Class Client
36
 * @package ClickHouseDB
37
 */
38
class Client
39
{
40
    const SUPPORTED_FORMATS = ['TabSeparated', 'TabSeparatedWithNames', 'CSV', 'CSVWithNames', 'JSONEachRow'];
41
42
    /** @var Http */
43
    private $transport;
44
45
    /** @var string */
46
    private $connectUsername;
47
48
    /** @var string */
49
    private $connectPassword;
50
51
    /** @var string */
52
    private $connectHost;
53
54
    /** @var string */
55
    private $connectPort;
56
57
    /** @var bool */
58
    private $connectUserReadonly = false;
59
60
    /**
61
     * @param mixed[] $connectParams
62
     * @param mixed[] $settings
63
     */
64 67
    public function __construct(array $connectParams, array $settings = [])
65
    {
66 67
        if (! isset($connectParams['username'])) {
67
            throw  new \InvalidArgumentException('not set username');
68
        }
69
70 67
        if (! isset($connectParams['password'])) {
71
            throw  new \InvalidArgumentException('not set password');
72
        }
73
74 67
        if (! isset($connectParams['port'])) {
75
            throw  new \InvalidArgumentException('not set port');
76
        }
77
78 67
        if (! isset($connectParams['host'])) {
79
            throw  new \InvalidArgumentException('not set host');
80
        }
81
82 67
        $this->connectUsername = $connectParams['username'];
83 67
        $this->connectPassword = $connectParams['password'];
84 67
        $this->connectPort     = $connectParams['port'];
85 67
        $this->connectHost     = $connectParams['host'];
86
87
        // init transport class
88 67
        $this->transport = new Http(
89 67
            $this->connectHost,
90 67
            $this->connectPort,
91 67
            $this->connectUsername,
92 67
            $this->connectPassword
93
        );
94
95 67
        $this->transport->addQueryDegeneration(new Bindings());
96
97
        // apply settings to transport class
98 67
        $this->settings()->database('default');
99 67
        if (! empty($settings)) {
100 1
            $this->settings()->apply($settings);
101
        }
102
103 67
        if (isset($connectParams['readonly'])) {
104
            $this->setReadOnlyUser($connectParams['readonly']);
105
        }
106
107 67
        if (isset($connectParams['https'])) {
108
            $this->https($connectParams['https']);
109
        }
110
111 67
        $this->enableHttpCompression();
112 67
    }
113
114
    /**
115
     * if the user has only read in the config file
116
     */
117
    public function setReadOnlyUser(bool $flag)
118
    {
119
        $this->connectUserReadonly = $flag;
120
        $this->settings()->setReadOnlyUser($this->connectUserReadonly);
121
    }
122
123
    /**
124
     * Clear Degeneration processing request [template ]
125
     *
126
     * @return bool
127
     */
128 1
    public function cleanQueryDegeneration()
129
    {
130 1
        return $this->transport->cleanQueryDegeneration();
131
    }
132
133
    /**
134
     * add Degeneration processing
135
     *
136
     * @return bool
137
     */
138
    public function addQueryDegeneration(Degeneration $degeneration)
139
    {
140
        return $this->transport->addQueryDegeneration($degeneration);
141
    }
142
143
    /**
144
     * add Conditions in query
145
     *
146
     * @return bool
147
     */
148 3
    public function enableQueryConditions()
149
    {
150 3
        return $this->transport->addQueryDegeneration(new Conditions());
151
    }
152
153
    /**
154
     * Set connection host
155
     *
156
     * @param string $host
157
     */
158
    public function setHost($host)
159
    {
160
        $this->connectHost = $host;
161
        $this->transport()->setHost($host);
162
    }
163
164
    /**
165
     * @return Settings
166
     */
167 2
    public function setTimeout(float $timeout)
168
    {
169 2
        return $this->settings()->max_execution_time($timeout);
170
    }
171
172
    /**
173
     * @return mixed
174
     */
175 1
    public function getTimeout()
176
    {
177 1
        return $this->settings()->getTimeOut();
178
    }
179
180
    /**
181
     * ConnectTimeOut in seconds ( support 1.5 = 1500ms )
182
     */
183 2
    public function setConnectTimeOut(int $connectTimeOut)
184
    {
185 2
        $this->transport()->setConnectTimeOut($connectTimeOut);
186 2
    }
187
188
    /**
189
     * @return int
190
     */
191 1
    public function getConnectTimeOut()
192
    {
193 1
        return $this->transport()->getConnectTimeOut();
194
    }
195
196
    /**
197
     * @return Http
198
     */
199 67
    public function transport()
200
    {
201 67
        if (! $this->transport) {
202
            throw  new \InvalidArgumentException('Empty transport class');
203
        }
204
205 67
        return $this->transport;
206
    }
207
208
    /**
209
     * @return string
210
     */
211
    public function getConnectHost()
212
    {
213
        return $this->connectHost;
214
    }
215
216
    /**
217
     * @return string
218
     */
219
    public function getConnectPassword()
220
    {
221
        return $this->connectPassword;
222
    }
223
224
    /**
225
     * @return string
226
     */
227
    public function getConnectPort()
228
    {
229
        return $this->connectPort;
230
    }
231
232
    /**
233
     * @return string
234
     */
235
    public function getConnectUsername()
236
    {
237
        return $this->connectUsername;
238
    }
239
240
    /**
241
     * @return Http
242
     */
243
    public function getTransport()
244
    {
245
        return $this->transport;
246
    }
247
248
    /**
249
     * @return mixed
250
     */
251
    public function verbose()
252
    {
253
        return $this->transport()->verbose(true);
254
    }
255
256
    /**
257
     * @return Settings
258
     */
259 67
    public function settings()
260
    {
261 67
        return $this->transport()->settings();
262
    }
263
264
    /**
265
     * @param string|null $useSessionId
266
     * @return $this
267
     */
268 2
    public function useSession(string $useSessionId = null)
269
    {
270 2
        if (! $this->settings()->getSessionId()) {
271 2
            if (! $useSessionId) {
272 2
                $this->settings()->makeSessionId();
273
            } else {
274
                $this->settings()->session_id($useSessionId);
275
            }
276
        }
277 2
        return $this;
278
    }
279
280
    /**
281
     * @return mixed
282
     */
283 2
    public function getSession()
284
    {
285 2
        return $this->settings()->getSessionId();
286
    }
287
288
    /**
289
     * Query CREATE/DROP
290
     *
291
     * @param mixed[] $bindings
292
     * @return Statement
293
     */
294 26
    public function write(string $sql, array $bindings = [], bool $exception = true)
295
    {
296 26
        return $this->transport()->write($sql, $bindings, $exception);
297
    }
298
299
    /**
300
     * set db name
301
     * @return static
302
     */
303 67
    public function database(string $db)
304
    {
305 67
        $this->settings()->database($db);
306
307 67
        return $this;
308
    }
309
310
    /**
311
     * Write to system.query_log
312
     *
313
     * @return static
314
     */
315
    public function enableLogQueries(bool $flag = true)
316
    {
317
        $this->settings()->set('log_queries', (int) $flag);
318
319
        return $this;
320
    }
321
322
    /**
323
     * Compress the result if the HTTP client said that it understands data compressed with gzip or deflate
324
     *
325
     * @return static
326
     */
327 67
    public function enableHttpCompression(bool $flag = true)
328
    {
329 67
        $this->settings()->enableHttpCompression($flag);
330
331 67
        return $this;
332
    }
333
334
    /**
335
     * Enable / Disable HTTPS
336
     *
337
     * @return static
338
     */
339 1
    public function https(bool $flag = true)
340
    {
341 1
        $this->settings()->https($flag);
342
343 1
        return $this;
344
    }
345
346
    /**
347
     * Read extremes of the result columns. They can be output in JSON-formats.
348
     *
349
     * @return static
350
     */
351 2
    public function enableExtremes(bool $flag = true)
352
    {
353 2
        $this->settings()->set('extremes', (int) $flag);
354
355 2
        return $this;
356
    }
357
358
    /**
359
     * @param mixed[] $bindings
360
     * @return Statement
361
     */
362 31
    public function select(
363
        string $sql,
364
        array $bindings = [],
365
        WhereInFile $whereInFile = null,
366
        WriteToFile $writeToFile = null
367
    ) {
368 31
        return $this->transport()->select($sql, $bindings, $whereInFile, $writeToFile);
369
    }
370
371
    /**
372
     * @return bool
373
     */
374 10
    public function executeAsync()
375
    {
376 10
        return $this->transport()->executeAsync();
377
    }
378
379
    public function maxTimeExecutionAllAsync()
380
    {
381
382
    }
383
384
    /**
385
     * set progressFunction
386
     */
387 1
    public function progressFunction(callable $callback)
388
    {
389 1
        if (! is_callable($callback)) {
390
            throw new \InvalidArgumentException('Not is_callable progressFunction');
391
        }
392
393 1
        if (! $this->settings()->is('send_progress_in_http_headers')) {
394 1
            $this->settings()->set('send_progress_in_http_headers', 1);
395
        }
396 1
        if (! $this->settings()->is('http_headers_progress_interval_ms')) {
397 1
            $this->settings()->set('http_headers_progress_interval_ms', 100);
398
        }
399
400 1
        $this->transport()->setProgressFunction($callback);
401 1
    }
402
403
    /**
404
     * prepare select
405
     *
406
     * @param mixed[] $bindings
407
     * @return Statement
408
     */
409 7
    public function selectAsync(
410
        string $sql,
411
        array $bindings = [],
412
        WhereInFile $whereInFile = null,
413
        WriteToFile $writeToFile = null
414
    ) {
415 7
        return $this->transport()->selectAsync($sql, $bindings, $whereInFile, $writeToFile);
416
    }
417
418
    /**
419
     * SHOW PROCESSLIST
420
     *
421
     * @return array
422
     */
423
    public function showProcesslist()
424
    {
425
        return $this->select('SHOW PROCESSLIST')->rows();
426
    }
427
428
    /**
429
     * show databases
430
     *
431
     * @return array
432
     */
433
    public function showDatabases()
434
    {
435
        return $this->select('show databases')->rows();
436
    }
437
438
    /**
439
     * statement = SHOW CREATE TABLE
440
     *
441
     * @return mixed
442
     */
443
    public function showCreateTable(string $table)
444
    {
445
        return $this->select('SHOW CREATE TABLE ' . $table)->fetchOne('statement');
446
    }
447
448
    /**
449
     * SHOW TABLES
450
     *
451
     * @return mixed[]
452
     */
453 1
    public function showTables()
454
    {
455 1
        return $this->select('SHOW TABLES')->rowsAsTree('name');
456
    }
457
458
    /**
459
     * Get the number of simultaneous/Pending requests
460
     *
461
     * @return int
462
     */
463 12
    public function getCountPendingQueue()
464
    {
465 12
        return $this->transport()->getCountPendingQueue();
466
    }
467
468
    /**
469
     * @param mixed[][] $values
470
     * @param string[]  $columns
471
     * @return Statement
472
     * @throws Exception\TransportException
473
     */
474 9
    public function insert(string $table, array $values, array $columns = []) : Statement
475
    {
476 9
        if (empty($values)) {
477 1
            throw QueryException::cannotInsertEmptyValues();
478
        }
479
480 8
        if (stripos($table, '`') === false && stripos($table, '.') === false) {
481 5
            $table = '`' . $table . '`'; //quote table name for dot names
482
        }
483 8
        $sql = 'INSERT INTO ' . $table;
484
485 8
        if (count($columns) !== 0) {
486 7
            $sql .= ' (`' . implode('`,`', $columns) . '`) ';
487
        }
488
489 8
        $sql .= ' VALUES ';
490
491 8
        foreach ($values as $row) {
492 8
            $sql .= ' (' . FormatLine::Insert($row) . '), ';
493
        }
494 8
        $sql = trim($sql, ', ');
495
496 8
        return $this->transport()->write($sql);
497
    }
498
499
    /**
500
     *       * Prepares the values to insert from the associative array.
501
     *       * There may be one or more lines inserted, but then the keys inside the array list must match (including in the sequence)
502
     *       *
503
     *       * @param mixed[] $values - array column_name => value (if we insert one row) or array list column_name => value if we insert many lines
504
     *       * @return mixed[][] - list of arrays - 0 => fields, 1 => list of value arrays for insertion
505
     *       */
506 3
    public function prepareInsertAssocBulk(array $values)
507
    {
508 3
        if (isset($values[0]) && is_array($values[0])) { //случай, когда много строк вставляется
509 2
            $preparedFields = array_keys($values[0]);
510 2
            $preparedValues = [];
511 2
            foreach ($values as $idx => $row) {
512 2
                $_fields = array_keys($row);
513 2
                if ($_fields !== $preparedFields) {
514 1
                    throw new QueryException(
515 1
                        sprintf(
516 1
                            'Fields not match: %s and %s on element %s',
517 1
                            implode(',', $_fields),
518 1
                            implode(',', $preparedFields),
519 1
                            $idx
520
                        )
521
                    );
522
                }
523 2
                $preparedValues[] = array_values($row);
524
            }
525
        } else {
526 1
            $preparedFields = array_keys($values);
527 1
            $preparedValues = [array_values($values)];
528
        }
529
530 2
        return [$preparedFields, $preparedValues];
531
    }
532
533
    /**
534
     * Inserts one or more rows from an associative array.
535
     * If there is a discrepancy between the keys of the value arrays (or their order) - throws an exception.
536
     *
537
     * @param mixed[] $values - array column_name => value (if we insert one row) or array list column_name => value if we insert many lines
538
     * @return Statement
539
     */
540
    public function insertAssocBulk(string $tableName, array $values)
541
    {
542
        list($columns, $vals) = $this->prepareInsertAssocBulk($values);
543
544
        return $this->insert($tableName, $vals, $columns);
545
    }
546
547
    /**
548
     * insert TabSeparated files
549
     *
550
     * @param string|string[] $fileNames
551
     * @param string[]        $columns
552
     * @return mixed
553
     */
554 1
    public function insertBatchTSVFiles(string $tableName, $fileNames, array $columns = [])
555
    {
556 1
        return $this->insertBatchFiles($tableName, $fileNames, $columns, 'TabSeparated');
557
    }
558
559
    /**
560
     * insert Batch Files
561
     *
562
     * @param string|string[] $fileNames
563
     * @param string[]        $columns
564
     * @param string          $format ['TabSeparated','TabSeparatedWithNames','CSV','CSVWithNames']
565
     * @return Statement[]
566
     * @throws Exception\TransportException
567
     */
568 8
    public function insertBatchFiles(string $tableName, $fileNames, array $columns = [], string $format = 'CSV')
569
    {
570 8
        if (is_string($fileNames)) {
571
            $fileNames = [$fileNames];
572
        }
573 8
        if ($this->getCountPendingQueue() > 0) {
574
            throw new QueryException('Queue must be empty, before insertBatch, need executeAsync');
575
        }
576
577 8
        if (! in_array($format, self::SUPPORTED_FORMATS, true)) {
578
            throw new QueryException('Format not support in insertBatchFiles');
579
        }
580
581 8
        $result = [];
582
583 8
        foreach ($fileNames as $fileName) {
584 8
            if (! is_file($fileName) || ! is_readable($fileName)) {
585
                throw  new QueryException('Cant read file: ' . $fileName . ' ' . (is_file($fileName) ? '' : ' is not file'));
586
            }
587
588 8
            if (empty($columns)) {
589
                $sql = 'INSERT INTO ' . $tableName . ' FORMAT ' . $format;
590
            } else {
591 8
                $sql = 'INSERT INTO ' . $tableName . ' ( ' . implode(',', $columns) . ' ) FORMAT ' . $format;
592
            }
593 8
            $result[$fileName] = $this->transport()->writeAsyncCSV($sql, $fileName);
594
        }
595
596
        // exec
597 8
        $this->executeAsync();
598
599
        // fetch resutl
600 8
        foreach ($fileNames as $fileName) {
601 8
            if (! $result[$fileName]->isError()) {
602 6
                continue;
603
            }
604
605 2
            $result[$fileName]->error();
606
        }
607
608 6
        return $result;
609
    }
610
611
    /**
612
     * insert Batch Stream
613
     *
614
     * @param string[] $columns
615
     * @param string   $format ['TabSeparated','TabSeparatedWithNames','CSV','CSVWithNames']
616
     * @return Transport\CurlerRequest
617
     */
618 2
    public function insertBatchStream(string $tableName, array $columns = [], string $format = 'CSV')
619
    {
620 2
        if ($this->getCountPendingQueue() > 0) {
621
            throw new QueryException('Queue must be empty, before insertBatch, need executeAsync');
622
        }
623
624 2
        if (! in_array($format, self::SUPPORTED_FORMATS, true)) {
625
            throw new QueryException('Format not support in insertBatchFiles');
626
        }
627
628 2
        if (empty($columns)) {
629
            $sql = 'INSERT INTO ' . $tableName . ' FORMAT ' . $format;
630
        } else {
631 2
            $sql = 'INSERT INTO ' . $tableName . ' ( ' . implode(',', $columns) . ' ) FORMAT ' . $format;
632
        }
633
634 2
        return $this->transport()->writeStreamData($sql);
635
    }
636
637
    /**
638
     * stream Write
639
     *
640
     * @param string[] $bind
641
     * @return Statement
642
     * @throws Exception\TransportException
643
     */
644 1
    public function streamWrite(Stream $stream, string $sql, array $bind = [])
645
    {
646 1
        if ($this->getCountPendingQueue() > 0) {
647
            throw new QueryException('Queue must be empty, before streamWrite');
648
        }
649
650 1
        return $this->transport()->streamWrite($stream, $sql, $bind);
651
    }
652
653
    /**
654
     * stream Read
655
     *
656
     * @param string[] $bind
657
     * @return Statement
658
     */
659 1
    public function streamRead(Stream $streamRead, string $sql, array $bind = [])
660
    {
661 1
        if ($this->getCountPendingQueue() > 0) {
662
            throw new QueryException('Queue must be empty, before streamWrite');
663
        }
664
665 1
        return $this->transport()->streamRead($streamRead, $sql, $bind);
666
    }
667
668
    /**
669
     * Size of database
670
     *
671
     * @return mixed|null
672
     */
673
    public function databaseSize()
674
    {
675
        $b = $this->settings()->getDatabase();
676
677
        return $this->select(
678
            '
679
            SELECT database,formatReadableSize(sum(bytes)) as size
680
            FROM system.parts
681
            WHERE active AND database=:database
682
            GROUP BY database
683
            ',
684
            ['database' => $b]
685
        )->fetchOne();
686
    }
687
688
    /**
689
     * Size of tables
690
     *
691
     * @return mixed
692
     */
693 1
    public function tableSize(string $tableName)
694
    {
695 1
        $tables = $this->tablesSize();
696
697 1
        if (isset($tables[$tableName])) {
698 1
            return $tables[$tableName];
699
        }
700
701
        return null;
702
    }
703
704
    /**
705
     * Ping server
706
     *
707
     * @return bool
708
     */
709 37
    public function ping()
710
    {
711 37
        return $this->transport()->ping();
712
    }
713
714
    /**
715
     * Tables sizes
716
     *
717
     * @param bool $flatList
718
     * @return mixed[][]
719
     */
720 1
    public function tablesSize($flatList = false)
721
    {
722 1
        $result = $this->select('
723
        SELECT name as table,database,
724
            max(sizebytes) as sizebytes,
725
            max(size) as size,
726
            min(min_date) as min_date,
727
            max(max_date) as max_date
728
            FROM system.tables
729
            ANY LEFT JOIN 
730
            (
731
            SELECT table,database,
732
                        formatReadableSize(sum(bytes)) as size,
733
                        sum(bytes) as sizebytes,
734
                        min(min_date) as min_date,
735
                        max(max_date) as max_date
736
                        FROM system.parts 
737
                        WHERE active AND database=:database
738
                        GROUP BY table,database
739
            ) USING ( table,database )
740
            WHERE database=:database
741
            GROUP BY table,database
742
        ',
743 1
            ['database' => $this->settings()->getDatabase()]);
744
745 1
        if ($flatList) {
746
            return $result->rows();
747
        }
748
749 1
        return $result->rowsAsTree('table');
750
    }
751
752
    /**
753
     * isExists
754
     *
755
     * @return array
756
     */
757
    public function isExists(string $database, string $table)
758
    {
759
        return $this->select(
760
            '
761
            SELECT *
762
            FROM system.tables 
763
            WHERE name=\'' . $table . '\' AND database=\'' . $database . '\''
764
        )->rowsAsTree('name');
765
    }
766
767
    /**
768
     * List of partitions
769
     *
770
     * @return mixed[][]
771
     */
772
    public function partitions(string $table, int $limit = null, bool $active = null)
773
    {
774
        $database          = $this->settings()->getDatabase();
775
        $whereActiveClause = $active === null ? '' : sprintf(' AND active = %s', (int) $active);
776
        $limitClause       = $limit !== null ? ' LIMIT ' . $limit : '';
777
778
        return $this->select(<<<CLICKHOUSE
779
SELECT *
780
FROM system.parts 
781
WHERE like(table,'%$table%') AND database='$database'$whereActiveClause
782
ORDER BY max_date $limitClause
783
CLICKHOUSE
784
        )->rowsAsTree('name');
785
    }
786
787
    /**
788
     * dropPartition
789
     * @deprecated
0 ignored issues
show
Expected 1 lines between description and annotations, found 0.
Loading history...
Incorrect annotations group.
Loading history...
790
     * @return Statement
791
     */
792
    public function dropPartition(string $dataBaseTableName, string $partition_id)
793
    {
794
795
        $partition_id = trim($partition_id, '\'');
796
        $this->settings()->set('replication_alter_partitions_sync', 2);
797
        $state = $this->write('ALTER TABLE {dataBaseTableName} DROP PARTITION :partion_id',
798
            [
799
                'dataBaseTableName' => $dataBaseTableName,
800
                'partion_id'        => $partition_id,
801
            ]);
802
803
        return $state;
804
    }
805
806
    /**
807
     * Truncate ( drop all partitions )
808
     * @deprecated
0 ignored issues
show
Expected 1 lines between description and annotations, found 0.
Loading history...
Incorrect annotations group.
Loading history...
809
     * @return array
0 ignored issues
show
@return annotation of method \ClickHouseDB\Client::truncateTable() does not specify type hint for items of its traversable return value.
Loading history...
810
     */
811
    public function truncateTable(string $tableName)
812
    {
813
        $partions = $this->partitions($tableName);
814
        $out      = [];
815
        foreach ($partions as $part_key => $part) {
816
            $part_id       = $part['partition'];
817
            $out[$part_id] = $this->dropPartition($tableName, $part_id);
818
        }
819
820
        return $out;
821
    }
822
823
    /**
824
     * Returns the server's uptime in seconds.
825
     *
826
     * @return int
827
     * @throws Exception\TransportException
828
     */
829 1
    public function getServerUptime()
830
    {
831 1
        return $this->select('SELECT uptime() as uptime')->fetchOne('uptime');
832
    }
833
834
    /**
835
     * Returns string with the server version.
836
     */
837 1
    public function getServerVersion() : string
838
    {
839 1
        return (string) $this->select('SELECT version() as version')->fetchOne('version');
840
    }
841
842
    /**
843
     * Read system.settings table
844
     *
845
     * @return mixed[][]
846
     */
847 1
    public function getServerSystemSettings(string $like = '')
848
    {
849 1
        $l    = [];
850 1
        $list = $this->select('SELECT * FROM system.settings' . ($like ? ' WHERE name LIKE :like' : ''),
851 1
            ['like' => '%' . $like . '%'])->rows();
852 1
        foreach ($list as $row) {
853 1
            if (isset($row['name'])) {
854 1
                $n = $row['name'];
855 1
                unset($row['name']);
856 1
                $l[$n] = $row;
857
            }
858
        }
859
860 1
        return $l;
861
    }
862
863
    /**
864
     * dropOldPartitions by day_ago
865
     * @deprecated
0 ignored issues
show
Expected 1 lines between description and annotations, found 0.
Loading history...
866
     *
867
     * @return array
0 ignored issues
show
Incorrect annotations group.
Loading history...
@return annotation of method \ClickHouseDB\Client::dropOldPartitions() does not specify type hint for items of its traversable return value.
Loading history...
868
     * @throws Exception\TransportException
869
     * @throws \Exception
870
     */
871
    public function dropOldPartitions(string $table_name, int $days_ago, int $count_partitons_per_one = 100)
872
    {
873
        $days_ago = strtotime(date('Y-m-d 00:00:00', strtotime('-' . $days_ago . ' day')));
874
875
        $drop           = [];
876
        $list_patitions = $this->partitions($table_name, $count_partitons_per_one);
877
878
        foreach ($list_patitions as $partion_id => $partition) {
879
            if (stripos($partition['engine'], 'mergetree') === false) {
880
                continue;
881
            }
882
883
            // $min_date = strtotime($partition['min_date']);
884
            $max_date = strtotime($partition['max_date']);
885
886
            if ($max_date < $days_ago) {
887
                $drop[] = $partition['partition'];
888
            }
889
        }
890
891
        $result = [];
892
        foreach ($drop as $partition_id) {
893
            $result[$partition_id] = $this->dropPartition($table_name, $partition_id);
894
        }
895
896
        return $result;
897
    }
898
}
899