Passed
Branch master (8a55cf)
by Igor
12:47 queued 09:06
created

Client::dropOldPartitions()   A

Complexity

Conditions 5
Paths 8

Size

Total Lines 26

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 30

Importance

Changes 0
Metric Value
dl 0
loc 26
ccs 0
cts 15
cp 0
rs 9.1928
c 0
b 0
f 0
cc 5
nc 8
nop 3
crap 30
1
<?php
2
3
namespace ClickHouseDB;
4
5
use ClickHouseDB\Exception\QueryException;
6
use ClickHouseDB\Query\Degeneration\Bindings;
7
use ClickHouseDB\Query\WhereInFile;
8
use ClickHouseDB\Query\WriteToFile;
9
use ClickHouseDB\Quote\FormatLine;
10
use ClickHouseDB\Transport\Http;
11
12
class Client
13
{
14
    /**
15
     * @var Http
16
     */
17
    private $_transport = null;
18
19
    /**
20
     * @var string
21
     */
22
    private $_connect_username = '';
23
24
    /**
25
     * @var string
26
     */
27
    private $_connect_password = '';
28
29
    /**
30
     * @var string
31
     */
32
    private $_connect_host = '';
33
34
    /**
35
     * @var string
36
     */
37
    private $_connect_port = '';
38
39
    /**
40
     * @var bool
41
     */
42
    private $_connect_user_readonly = false;
43
    /**
44
     * @var array
45
     */
46
    private $_support_format = ['TabSeparated', 'TabSeparatedWithNames', 'CSV', 'CSVWithNames', 'JSONEachRow'];
47
48
    /**
49
     * Client constructor.
50
     * @param array $connect_params
51
     * @param array $settings
52
     */
53 44
    public function __construct($connect_params, $settings = [])
54
    {
55 44
        if (!isset($connect_params['username'])) {
56
            throw  new \InvalidArgumentException('not set username');
57
        }
58
59 44
        if (!isset($connect_params['password'])) {
60
            throw  new \InvalidArgumentException('not set password');
61
        }
62
63 44
        if (!isset($connect_params['port'])) {
64
            throw  new \InvalidArgumentException('not set port');
65
        }
66
67 44
        if (!isset($connect_params['host'])) {
68
            throw  new \InvalidArgumentException('not set host');
69
        }
70
71 44
        if (isset($connect_params['settings']) && is_array($connect_params['settings'])) {
72 2
            if (empty($settings)) {
73 2
                $settings = $connect_params['settings'];
74
            }
75
        }
76
77 44
        $this->_connect_username    = $connect_params['username'];
78 44
        $this->_connect_password    = $connect_params['password'];
79 44
        $this->_connect_port        = $connect_params['port'];
80 44
        $this->_connect_host        = $connect_params['host'];
81
82
83
        // init transport class
84 44
        $this->_transport = new Http(
85 44
            $this->_connect_host,
86 44
            $this->_connect_port,
87 44
            $this->_connect_username,
88 44
            $this->_connect_password
89
        );
90
91
92 44
        $this->_transport->addQueryDegeneration(new Bindings());
93
94
        // apply settings to transport class
95 44
        $this->settings()->database('default');
96 44
        if (sizeof($settings)) {
97 2
            $this->settings()->apply($settings);
98
        }
99
100
101 44
        if (isset($connect_params['readonly']))
102
        {
103
            $this->setReadOnlyUser($connect_params['readonly']);
104
        }
105
106 44
        if (isset($connect_params['https']))
107
        {
108
            $this->https($connect_params['https']);
109
        }
110
111
112
113
114 44
    }
115
116
    /**
117
     * if the user has only read in the config file
118
     *
119
     * @param bool $flag
120
     */
121
    public function setReadOnlyUser($flag)
122
    {
123
        $this->_connect_user_readonly = $flag;
124
        $this->settings()->setReadOnlyUser($this->_connect_user_readonly);
125
    }
126
    /**
127
     * Clear Degeneration processing request [template ]
128
     *
129
     * @return bool
130
     */
131 1
    public function cleanQueryDegeneration()
132
    {
133 1
        return $this->_transport->cleanQueryDegeneration();
134
    }
135
136
    /**
137
     * add Degeneration processing
138
     *
139
     * @param Query\Degeneration $degeneration
140
     * @return bool
141
     */
142
    public function addQueryDegeneration(Query\Degeneration $degeneration)
143
    {
144
        return $this->_transport->addQueryDegeneration($degeneration);
145
    }
146
147
    /**
148
     * add Conditions in query
149
     *
150
     * @return bool
151
     */
152 1
    public function enableQueryConditions()
153
    {
154 1
        return $this->_transport->addQueryDegeneration(new \ClickHouseDB\Query\Degeneration\Conditions());
155
    }
156
    /**
157
     * Set connection host
158
     *
159
     * @param string|array $host
160
     */
161
    public function setHost($host)
162
    {
163
164
        if (is_array($host))
165
        {
166
            $host = array_rand(array_flip($host));
167
        }
168
169
        $this->_connect_host = $host;
170
        $this->transport()->setHost($host);
171
    }
172
173
    /**
174
     * Таймаут
175
     *
176
     * @param int $timeout
177
     * @return Settings
178
     */
179 2
    public function setTimeout($timeout)
180
    {
181 2
        return $this->settings()->max_execution_time($timeout);
182
    }
183
184
    /**
185
     * Timeout
186
     *
187
     * @return mixed
188
     */
189 1
    public function getTimeout()
190
    {
191 1
        return $this->settings()->getTimeOut();
192
    }
193
194
    /**
195
     * ConnectTimeOut in seconds ( support 1.5 = 1500ms )
196
     *
197
     * @param int $connectTimeOut
198
     */
199 2
    public function setConnectTimeOut($connectTimeOut)
200
    {
201 2
        $this->transport()->setConnectTimeOut($connectTimeOut);
202 2
    }
203
204
    /**
205
     * get ConnectTimeOut
206
     *
207
     * @return int
208
     */
209 1
    public function getConnectTimeOut()
210
    {
211 1
        return $this->transport()->getConnectTimeOut();
212
    }
213
214
215
    /**
216
     * transport
217
     *
218
     * @return Http
219
     */
220 44
    public function transport()
221
    {
222 44
        if (!$this->_transport) {
223
            throw  new \InvalidArgumentException('Empty transport class');
224
        }
225 44
        return $this->_transport;
226
    }
227
228
    /**
229
     * @return string
230
     */
231
    public function getConnectHost()
232
    {
233
        return $this->_connect_host;
234
    }
235
236
    /**
237
     * @return string
238
     */
239
    public function getConnectPassword()
240
    {
241
        return $this->_connect_password;
242
    }
243
244
    /**
245
     * @return string
246
     */
247
    public function getConnectPort()
248
    {
249
        return $this->_connect_port;
250
    }
251
252
    /**
253
     * @return string
254
     */
255
    public function getConnectUsername()
256
    {
257
        return $this->_connect_username;
258
    }
259
260
    /**
261
     * transport
262
     *
263
     * @return Http
264
     */
265
    public function getTransport()
266
    {
267
        return $this->_transport;
268
    }
269
270
271
    /**
272
     * Режим отладки CURL
273
     *
274
     * @return mixed
275
     */
276
    public function verbose()
277
    {
278
        return $this->transport()->verbose(true);
279
    }
280
281
    /**
282
     * @return Settings
283
     */
284 44
    public function settings()
285
    {
286 44
        return $this->transport()->settings();
287
    }
288
289
    /**
290
     * @return $this
291
     */
292 2
    public function useSession($useSessionId = false)
293
    {
294 2
        if (!$this->settings()->getSessionId())
295
        {
296 2
            if (!$useSessionId)
297
            {
298 2
                $this->settings()->makeSessionId();
299
            } else
300
            {
301
                $this->settings()->session_id($useSessionId);
302
            }
303
304
        }
305 2
        return $this;
306
    }
307
    /**
308
     * @return mixed
309
     */
310 2
    public function getSession()
311
    {
312 2
        return $this->settings()->getSessionId();
313
    }
314
315
    /**
316
     * Query CREATE/DROP
317
     *
318
     * @param string $sql
319
     * @param array $bindings
320
     * @param bool $exception
321
     * @return Statement
322
     * @throws Exception\TransportException
323
     */
324 18
    public function write($sql, $bindings = [], $exception = true)
325
    {
326 18
        return $this->transport()->write($sql, $bindings, $exception);
327
    }
328
329
    /**
330
     * set db name
331
     * @param string $db
332
     * @return $this
333
     */
334 1
    public function database($db)
335
    {
336 1
        $this->settings()->database($db);
337 1
        return $this;
338
    }
339
340
    /**
341
     * Write to system.query_log
342
     *
343
     * @param bool $flag
344
     * @return $this
345
     */
346
    public function enableLogQueries($flag = true)
347
    {
348
        $this->settings()->set('log_queries', intval($flag));
349
        return $this;
350
    }
351
352
    /**
353
     * Compress the result if the HTTP client said that it understands data compressed with gzip or deflate
354
     *
355
     * @param bool $flag
356
     * @return $this
357
     */
358 27
    public function enableHttpCompression($flag = true)
359
    {
360 27
        $this->settings()->enableHttpCompression($flag);
361 27
        return $this;
362
    }
363
364
    /**
365
     * Enable / Disable HTTPS
366
     *
367
     * @param bool $flag
368
     * @return $this
369
     */
370
    public function https($flag = true)
371
    {
372
        $this->settings()->https($flag);
373
        return $this;
374
    }
375
376
    /**
377
     * Read extremes of the result columns. They can be output in JSON-formats.
378
     *
379
     * @param bool $flag
380
     * @return $this
381
     */
382 2
    public function enableExtremes($flag = true)
383
    {
384 2
        $this->settings()->set('extremes', intval($flag));
385 2
        return $this;
386
    }
387
388
    /**
389
     * SELECT
390
     *
391
     * @param string $sql
392
     * @param array $bindings
393
     * @param null|WhereInFile $whereInFile
394
     * @param null|WriteToFile $writeToFile
395
     * @return Statement
396
     * @throws Exception\TransportException
397
     * @throws \Exception
398
     */
399 35
    public function select($sql, $bindings = [], $whereInFile = null, $writeToFile = null)
400
    {
401 35
        return $this->transport()->select($sql, $bindings, $whereInFile, $writeToFile);
402
    }
403
404
    /**
405
     * execute run
406
     *
407
     * @return bool
408
     * @throws Exception\TransportException
409
     */
410 8
    public function executeAsync()
411
    {
412 8
        return $this->transport()->executeAsync();
413
    }
414
415
    /**
416
     * set progressFunction
417
     *
418
     * @param callable $callback
419
     */
420 1
    public function progressFunction($callback)
421
    {
422 1
        if (!is_callable($callback)) {
423
            throw new \InvalidArgumentException('Not is_callable progressFunction');
424
        }
425
426 1
        if (!$this->settings()->is('send_progress_in_http_headers'))
427
        {
428 1
            $this->settings()->set('send_progress_in_http_headers', 1);
429
        }
430 1
        if (!$this->settings()->is('http_headers_progress_interval_ms'))
431
        {
432 1
            $this->settings()->set('http_headers_progress_interval_ms', 100);
433
        }
434
435
436 1
        $this->transport()->setProgressFunction($callback);
437 1
    }
438
439
    /**
440
     * prepare select
441
     *
442
     * @param string $sql
443
     * @param array $bindings
444
     * @param null $whereInFile
0 ignored issues
show
Documentation Bug introduced by
Are you sure the doc-type for parameter $whereInFile is correct as it would always require null to be passed?
Loading history...
445
     * @param null $writeToFile
0 ignored issues
show
Documentation Bug introduced by
Are you sure the doc-type for parameter $writeToFile is correct as it would always require null to be passed?
Loading history...
446
     * @return Statement
447
     * @throws Exception\TransportException
448
     * @throws \Exception
449
     */
450 4
    public function selectAsync($sql, $bindings = [], $whereInFile = null, $writeToFile = null)
451
    {
452 4
        return $this->transport()->selectAsync($sql, $bindings, $whereInFile, $writeToFile);
453
    }
454
455
    /**
456
     * SHOW PROCESSLIST
457
     *
458
     * @return array
459
     * @throws Exception\TransportException
460
     * @throws \Exception
461
     */
462
    public function showProcesslist()
463
    {
464
        return $this->select('SHOW PROCESSLIST')->rows();
465
    }
466
467
    /**
468
     * show databases
469
     *
470
     * @return array
471
     * @throws Exception\TransportException
472
     * @throws \Exception
473
     */
474
    public function showDatabases()
475
    {
476
        return $this->select('show databases')->rows();
477
    }
478
479
    /**
480
     * statement = SHOW CREATE TABLE
481
     *
482
     * @param string $table
483
     * @return mixed
484
     * @throws Exception\TransportException
485
     * @throws \Exception
486
     */
487
    public function showCreateTable($table)
488
    {
489
        return ($this->select('SHOW CREATE TABLE ' . $table)->fetchOne('statement'));
490
    }
491
492
    /**
493
     * SHOW TABLES
494
     *
495
     * @return array
496
     * @throws Exception\TransportException
497
     * @throws \Exception
498
     */
499 1
    public function showTables()
500
    {
501 1
        return $this->select('SHOW TABLES')->rowsAsTree('name');
502
    }
503
504
    /**
505
     * Get the number of simultaneous/Pending requests
506
     *
507
     * @return int
508
     */
509 9
    public function getCountPendingQueue()
510
    {
511 9
        return $this->transport()->getCountPendingQueue();
512
    }
513
514
    /**
515
     * Insert Array
516
     *
517
     * @param string $table
518
     * @param array $values
519
     * @param array $columns
520
     * @return Statement
521
     * @throws Exception\TransportException
522
     */
523 5
    public function insert($table, $values, $columns = [])
524
    {
525 5
        $sql = 'INSERT INTO ' . $table;
526
527 5
        if (0 !== count($columns)) {
528 5
            $sql .= ' (' . implode(',', $columns) . ') ';
529
        }
530
531 5
        $sql .= ' VALUES ';
532
533 5
        foreach ($values as $row) {
534 5
            $sql .= ' (' . FormatLine::Insert($row) . '), ';
535
        }
536 5
        $sql = trim($sql, ', ');
537 5
        return $this->transport()->write($sql);
538
    }
539
540
    /**
541
      * Prepares the values to insert from the associative array.
542
      * There may be one or more lines inserted, but then the keys inside the array list must match (including in the sequence)
543
      *
544
      * @param array $values - array column_name => value (if we insert one row) or array list column_name => value if we insert many lines
545
      * @return array - list of arrays - 0 => fields, 1 => list of value arrays for insertion
546
      */
547 3
    public function prepareInsertAssocBulk(array $values)
548
    {
549 3
        if (isset($values[0]) && is_array($values[0])) { //случай, когда много строк вставляется
550 2
            $preparedFields = array_keys($values[0]);
551 2
            $preparedValues = [];
552 2
            foreach ($values as $idx => $row) {
553 2
                $_fields = array_keys($row);
554 2
                if ($_fields !== $preparedFields) {
555 1
                    throw new QueryException("Fields not match: " . implode(',', $_fields) . " and " . implode(',', $preparedFields) . " on element $idx");
556
                }
557 2
                $preparedValues[] = array_values($row);
558
            }
559
        } else { //одна строка
560 1
            $preparedFields = array_keys($values);
561 1
            $preparedValues = [array_values($values)];
562
        }
563 2
        return [$preparedFields, $preparedValues];
564
    }
565
566
    /**
567
      * Inserts one or more rows from an associative array.
568
      * If there is a discrepancy between the keys of the value arrays (or their order) - throws an exception.
569
      *
570
      * @param string $ table - table name
0 ignored issues
show
Documentation Bug introduced by
The doc comment $ at position 0 could not be parsed: Unknown type name '$' at position 0 in $.
Loading history...
571
      * @param array $ values - array column_name => value (if we insert one row) or array list column_name => value if we insert many lines
572
      * @return Statement
573
      * @throws QueryException
574
      */
575
    public function insertAssocBulk($table, array $values)
576
    {
577
        list($columns, $vals) = $this->prepareInsertAssocBulk($values);
578
        return $this->insert($table, $vals, $columns);
579
    }
580
581
    /**
582
     * insert TabSeparated files
583
     *
584
     * @param string $table_name
585
     * @param string|array $file_names
586
     * @param array $columns_array
587
     * @return mixed
588
     * @throws Exception\TransportException
589
     */
590 1
    public function insertBatchTSVFiles($table_name, $file_names, $columns_array)
591
    {
592 1
        return $this->insertBatchFiles($table_name, $file_names, $columns_array, 'TabSeparated');
593
    }
594
595
    /**
596
     * insert Batch Files
597
     *
598
     * @param string $table_name
599
     * @param string|array $file_names
600
     * @param array $columns_array
601
     * @param string $format ['TabSeparated','TabSeparatedWithNames','CSV','CSVWithNames']
602
     * @return array
603
     * @throws Exception\TransportException
604
     */
605 7
    public function insertBatchFiles($table_name, $file_names, $columns_array, $format = "CSV")
606
    {
607 7
        if (is_string($file_names))
608
        {
609
            $file_names = [$file_names];
610
        }
611 7
        if ($this->getCountPendingQueue() > 0) {
612
            throw new QueryException('Queue must be empty, before insertBatch, need executeAsync');
613
        }
614
615 7
        if (!in_array($format, $this->_support_format))
616
        {
617
            throw new QueryException('Format not support in insertBatchFiles');
618
        }
619
620 7
        $result = [];
621
622 7
        foreach ($file_names as $fileName) {
623 7
            if (!is_file($fileName) || !is_readable($fileName)) {
624
                throw  new QueryException('Cant read file: ' . $fileName . ' ' . (is_file($fileName) ? '' : ' is not file'));
625
            }
626
627 7
            if (!$columns_array)
0 ignored issues
show
Bug Best Practice introduced by
The expression $columns_array of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
628
            {
629
                $sql = 'INSERT INTO ' . $table_name . ' FORMAT ' . $format;
630
631
            } else
632
            {
633 7
                $sql = 'INSERT INTO ' . $table_name . ' ( ' . implode(',', $columns_array) . ' ) FORMAT ' . $format;
634
635
            }
636 7
            $result[$fileName] = $this->transport()->writeAsyncCSV($sql, $fileName);
637
        }
638
639
        // exec
640 7
        $exec = $this->executeAsync();
0 ignored issues
show
Unused Code introduced by
The assignment to $exec is dead and can be removed.
Loading history...
641
642
        // fetch resutl
643 7
        foreach ($file_names as $fileName) {
644 7
            if ($result[$fileName]->isError()) {
645 7
                $result[$fileName]->error();
646
            }
647
        }
648
649 5
        return $result;
650
    }
651
652
    /**
653
     * insert Batch Stream
654
     *
655
     * @param string $table_name
656
     * @param array $columns_array
657
     * @param string $format ['TabSeparated','TabSeparatedWithNames','CSV','CSVWithNames']
658
     * @return Transport\CurlerRequest
659
     */
660 2
    public function insertBatchStream($table_name, $columns_array, $format = "CSV")
661
    {
662 2
        if ($this->getCountPendingQueue() > 0) {
663
            throw new QueryException('Queue must be empty, before insertBatch, need executeAsync');
664
        }
665
666 2
        if (!in_array($format, $this->_support_format))
667
        {
668
            throw new QueryException('Format not support in insertBatchFiles');
669
        }
670
671 2
        if (!$columns_array)
0 ignored issues
show
Bug Best Practice introduced by
The expression $columns_array of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
672
        {
673
            $sql = 'INSERT INTO ' . $table_name . ' FORMAT ' . $format;
674
675
        } else
676
        {
677 2
            $sql = 'INSERT INTO ' . $table_name . ' ( ' . implode(',', $columns_array) . ' ) FORMAT ' . $format;
678
679
        }
680
681 2
        return $this->transport()->writeStreamData($sql);
682
    }
683
684
685
    /**
686
     * Size of database
687
     *
688
     * @return mixed|null
689
     * @throws Exception\TransportException
690
     * @throws \Exception
691
     */
692
    public function databaseSize()
693
    {
694
        $b = $this->settings()->getDatabase();
695
696
        return $this->select('
697
            SELECT database,formatReadableSize(sum(bytes)) as size
698
            FROM system.parts
699
            WHERE active AND database=:database
700
            GROUP BY database
701
        ', ['database' => $b])->fetchOne();
702
    }
703
704
    /**
705
     * Size of tables
706
     *
707
     * @param string $tableName
708
     * @return mixed
709
     * @throws Exception\TransportException
710
     * @throws \Exception
711
     */
712 1
    public function tableSize($tableName)
713
    {
714 1
        $tables = $this->tablesSize();
715
716 1
        if (isset($tables[$tableName])) {
717 1
            return $tables[$tableName];
718
        }
719
720
        return null;
721
    }
722
723
    /**
724
     * ping & check
725
     *
726
     * @return bool
727
     * @throws Exception\TransportException
728
     * @throws \Exception
729
     */
730 33
    public function ping()
731
    {
732 33
        $result = $this->select('SELECT 1 as ping')->fetchOne('ping');
733 33
        return ($result == 1);
734
    }
735
736
    /**
737
     * Tables sizes
738
     *
739
     * @param bool $flatList
740
     * @return array
741
     * @throws Exception\TransportException
742
     * @throws \Exception
743
     */
744 1
    public function tablesSize($flatList = false)
745
    {
746 1
        $z = $this->select('
747
        SELECT name as table,database,
748
            max(sizebytes) as sizebytes,
749
            max(size) as size,
750
            min(min_date) as min_date,
751
            max(max_date) as max_date
752
            FROM system.tables
753
            ANY LEFT JOIN 
754
            (
755
            SELECT table,database,
756
                        formatReadableSize(sum(bytes)) as size,
757
                        sum(bytes) as sizebytes,
758
                        min(min_date) as min_date,
759
                        max(max_date) as max_date
760
                        FROM system.parts 
761
                        WHERE active AND database=:database
762
                        GROUP BY table,database
763
            ) USING ( table,database )
764
            WHERE database=:database
765
            GROUP BY table,database
766 1
        ', ['database'=>$this->settings()->getDatabase()]);
767
768 1
        if ($flatList) {
769
            return $z->rows();
770
        }
771
772
773 1
        return $z->rowsAsTree('table');
774
775
776
    }
777
778
779
    /**
780
     * isExists
781
     *
782
     * @param string $database
783
     * @param string $table
784
     * @return array
785
     * @throws Exception\TransportException
786
     * @throws \Exception
787
     */
788
    public function isExists($database, $table)
789
    {
790
        return $this->select('
791
            SELECT *
792
            FROM system.tables 
793
            WHERE name=\''.$table . '\' AND database=\'' . $database . '\''
794
        )->rowsAsTree('name');
795
    }
796
797
798
    /**
799
     * List of partitions
800
     *
801
     * @param string $table
802
     * @param int $limit
803
     * @return array
804
     * @throws Exception\TransportException
805
     * @throws \Exception
806
     */
807
    public function partitions($table, $limit = -1)
808
    {
809
        return $this->select('
810
            SELECT *
811
            FROM system.parts 
812
            WHERE like(table,\'%' . $table . '%\') AND database=\'' . $this->settings()->getDatabase() . '\' 
813
            ORDER BY max_date ' . ($limit > 0 ? ' LIMIT ' . intval($limit) : '')
814
        )->rowsAsTree('name');
815
    }
816
817
    /**
818
     * dropPartition
819
     *
820
     * @param string $dataBaseTableName database_name.table_name
821
     * @param string $partition_id
822
     * @return Statement
823
     * @throws Exception\TransportException
824
     */
825
    public function dropPartition($dataBaseTableName, $partition_id)
826
    {
827
828
        $partition_id = trim($partition_id, '\'');
829
        $this->settings()->set('replication_alter_partitions_sync', 2);
830
        $state = $this->write('ALTER TABLE {dataBaseTableName} DROP PARTITION :partion_id', [
831
            'dataBaseTableName'  => $dataBaseTableName,
832
            'partion_id' => $partition_id
833
        ]);
834
        return $state;
835
    }
836
837
    /**
838
     * Truncate ( drop all partitions )
839
     *
840
     * @param string $tableName
841
     * @return array
842
     * @throws Exception\TransportException
843
     * @throws \Exception
844
     */
845
    public function truncateTable($tableName)
846
    {
847
        $partions = $this->partitions($tableName);
848
        $out = [];
849
        foreach ($partions as $part_key=>$part)
850
        {
851
            $part_id = $part['partition'];
852
            $out[$part_id] = $this->dropPartition($tableName, $part_id);
853
        }
854
        return $out;
855
    }
856
857
    /**
858
     * dropOldPartitions by day_ago
859
     *
860
     * @param string $table_name
861
     * @param int $days_ago
862
     * @param int $count_partitons_per_one
863
     * @return array
864
     * @throws Exception\TransportException
865
     * @throws \Exception
866
     */
867
    public function dropOldPartitions($table_name, $days_ago, $count_partitons_per_one = 100)
868
    {
869
        $days_ago = strtotime(date('Y-m-d 00:00:00', strtotime('-' . $days_ago . ' day')));
870
871
        $drop = [];
872
        $list_patitions = $this->partitions($table_name, $count_partitons_per_one);
873
874
        foreach ($list_patitions as $partion_id => $partition) {
875
            if (stripos($partition['engine'], 'mergetree') === false) {
876
                continue;
877
            }
878
879
            $min_date = strtotime($partition['min_date']);
0 ignored issues
show
Unused Code introduced by
The assignment to $min_date is dead and can be removed.
Loading history...
880
            $max_date = strtotime($partition['max_date']);
881
882
            if ($max_date < $days_ago) {
883
                $drop[] = $partition['partition'];
884
            }
885
        }
886
887
        $result = [];
888
        foreach ($drop as $partition_id) {
889
            $result[$partition_id] = $this->dropPartition($table_name, $partition_id);
890
        }
891
892
        return $result;
893
    }
894
}
895