Passed
Push — master ( 22bce8...5a27f9 )
by Igor
03:05
created

Client::tablesSize()   B

Complexity

Conditions 2
Paths 2

Size

Total Lines 30
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2.0185

Importance

Changes 0
Metric Value
dl 0
loc 30
ccs 5
cts 6
cp 0.8333
rs 8.8571
c 0
b 0
f 0
cc 2
eloc 5
nc 2
nop 1
crap 2.0185
1
<?php
2
3
namespace ClickHouseDB;
4
5
use ClickHouseDB\Exception\QueryException;
6
use ClickHouseDB\Query\Degeneration\Bindings;
7
use ClickHouseDB\Query\WhereInFile;
8
use ClickHouseDB\Query\WriteToFile;
9
use ClickHouseDB\Quote\FormatLine;
10
use ClickHouseDB\Transport\Http;
11
12
class Client
13
{
14
    /**
15
     * @var Http
16
     */
17
    private $_transport = null;
18
19
    /**
20
     * @var string
21
     */
22
    private $_connect_username = '';
23
24
    /**
25
     * @var string
26
     */
27
    private $_connect_password = '';
28
29
    /**
30
     * @var string
31
     */
32
    private $_connect_host = '';
33
34
    /**
35
     * @var string
36
     */
37
    private $_connect_port = '';
38
39
    /**
40
     * @var bool
41
     */
42
    private $_connect_user_readonly=false;
43
    /**
44
     * @var array
45
     */
46
    private $_support_format=['TabSeparated','TabSeparatedWithNames','CSV','CSVWithNames','JSONEachRow'];
47
48
    /**
49
     * Client constructor.
50
     * @param array $connect_params
51
     * @param array $settings
52
     */
53 44
    public function __construct($connect_params, $settings = [])
54
    {
55 44
        if (!isset($connect_params['username'])) {
56
            throw  new \InvalidArgumentException('not set username');
57
        }
58
59 44
        if (!isset($connect_params['password'])) {
60
            throw  new \InvalidArgumentException('not set password');
61
        }
62
63 44
        if (!isset($connect_params['port'])) {
64
            throw  new \InvalidArgumentException('not set port');
65
        }
66
67 44
        if (!isset($connect_params['host'])) {
68
            throw  new \InvalidArgumentException('not set host');
69
        }
70
71 44
        if (isset($connect_params['settings']) && is_array($connect_params['settings'])) {
72 2
            if (empty($settings)) {
73 2
                $settings = $connect_params['settings'];
74
            }
75
        }
76
77 44
        $this->_connect_username    = $connect_params['username'];
78 44
        $this->_connect_password    = $connect_params['password'];
79 44
        $this->_connect_port        = $connect_params['port'];
80 44
        $this->_connect_host        = $connect_params['host'];
81
82
83
        // init transport class
84 44
        $this->_transport = new Http(
85 44
            $this->_connect_host,
86 44
            $this->_connect_port,
87 44
            $this->_connect_username,
88 44
            $this->_connect_password
89
        );
90
91
92 44
        $this->_transport->addQueryDegeneration(new Bindings());
93
94
        // apply settings to transport class
95 44
        $this->settings()->database('default');
96 44
        if (sizeof($settings)) {
97 2
            $this->settings()->apply($settings);
98
        }
99
100
101 44
        if (isset($connect_params['readonly']))
102
        {
103
            $this->setReadOnlyUser($connect_params['readonly']);
104
        }
105
106 44
        if (isset($connect_params['https']))
107
        {
108
            $this->https($connect_params['https']);
109
        }
110
111
112
113
114 44
    }
115
116
    /**
117
     * if the user has only read in the config file
118
     *
119
     * @param bool $flag
120
     */
121
    public function setReadOnlyUser($flag)
122
    {
123
        $this->_connect_user_readonly=$flag;
124
        $this->settings()->setReadOnlyUser($this->_connect_user_readonly);
125
    }
126
    /**
127
     * Clear Degeneration processing request [template ]
128
     *
129
     * @return bool
130
     */
131 1
    public function cleanQueryDegeneration()
132
    {
133 1
        return $this->_transport->cleanQueryDegeneration();
134
    }
135
136
    /**
137
     * add Degeneration processing
138
     *
139
     * @param Query\Degeneration $degeneration
140
     * @return bool
141
     */
142
    public function addQueryDegeneration(Query\Degeneration $degeneration)
143
    {
144
        return $this->_transport->addQueryDegeneration($degeneration);
145
    }
146
147
    /**
148
     * add Conditions in query
149
     *
150
     * @return bool
151
     */
152 1
    public function enableQueryConditions()
153
    {
154 1
        return $this->_transport->addQueryDegeneration(new \ClickHouseDB\Query\Degeneration\Conditions());
155
    }
156
    /**
157
     * Set connection host
158
     *
159
     * @param string|array $host
160
     */
161
    public function setHost($host)
162
    {
163
164
        if (is_array($host))
165
        {
166
            $host=array_rand(array_flip($host));
167
        }
168
169
        $this->_connect_host=$host;
170
        $this->transport()->setHost($host);
171
    }
172
173
    /**
174
     * Таймаут
175
     *
176
     * @param int $timeout
177
     * @return Settings
178
     */
179 2
    public function setTimeout($timeout)
180
    {
181 2
       return $this->settings()->max_execution_time($timeout);
182
    }
183
184
    /**
185
     * Timeout
186
     *
187
     * @return mixed
188
     */
189 1
    public function getTimeout()
190
    {
191 1
        return $this->settings()->getTimeOut();
192
    }
193
194
    /**
195
     * ConnectTimeOut in seconds ( support 1.5 = 1500ms )
196
     *
197
     * @param int $connectTimeOut
198
     */
199 2
    public function setConnectTimeOut($connectTimeOut)
200
    {
201 2
        $this->transport()->setConnectTimeOut($connectTimeOut);
202 2
    }
203
204
    /**
205
     * get ConnectTimeOut
206
     *
207
     * @return int
208
     */
209 1
    public function getConnectTimeOut()
210
    {
211 1
        return $this->transport()->getConnectTimeOut();
212
    }
213
214
215
    /**
216
     * transport
217
     *
218
     * @return Http
219
     */
220 44
    public function transport()
221
    {
222 44
        if (!$this->_transport) {
223
            throw  new \InvalidArgumentException('Empty transport class');
224
        }
225 44
        return $this->_transport;
226
    }
227
228
    /**
229
     * @return string
230
     */
231
    public function getConnectHost()
232
    {
233
        return $this->_connect_host;
234
    }
235
236
    /**
237
     * @return string
238
     */
239
    public function getConnectPassword()
240
    {
241
        return $this->_connect_password;
242
    }
243
244
    /**
245
     * @return string
246
     */
247
    public function getConnectPort()
248
    {
249
        return $this->_connect_port;
250
    }
251
252
    /**
253
     * @return string
254
     */
255
    public function getConnectUsername()
256
    {
257
        return $this->_connect_username;
258
    }
259
260
    /**
261
     * transport
262
     *
263
     * @return Http
264
     */
265
    public function getTransport()
266
    {
267
        return $this->_transport;
268
    }
269
270
271
    /**
272
     * Режим отладки CURL
273
     *
274
     * @return mixed
275
     */
276
    public function verbose()
277
    {
278
        return $this->transport()->verbose(true);
279
    }
280
281
    /**
282
     * @return Settings
283
     */
284 44
    public function settings()
285
    {
286 44
        return $this->transport()->settings();
287
    }
288
289
    /**
290
     * @return $this
291
     */
292 2
    public function useSession($useSessionId=false)
293
    {
294 2
        if (!$this->settings()->getSessionId())
295
        {
296 2
            if (!$useSessionId)
297
            {
298 2
                $this->settings()->makeSessionId();
299
            }
300
            else
301
            {
302
                $this->settings()->session_id($useSessionId);
303
            }
304
305
        }
306 2
        return $this;
307
    }
308
    /**
309
     * @return mixed
310
     */
311 2
    public function getSession()
312
    {
313 2
        return $this->settings()->getSessionId();
314
    }
315
316
    /**
317
     * Query CREATE/DROP
318
     *
319
     * @param string $sql
320
     * @param array $bindings
321
     * @param bool $exception
322
     * @return Statement
323
     * @throws Exception\TransportException
324
     */
325 18
    public function write($sql, $bindings = [], $exception = true)
326
    {
327 18
        return $this->transport()->write($sql, $bindings, $exception);
328
    }
329
330
    /**
331
     * set db name
332
     * @param string $db
333
     * @return $this
334
     */
335 1
    public function database($db)
336
    {
337 1
        $this->settings()->database($db);
338 1
        return $this;
339
    }
340
341
    /**
342
     * Write to system.query_log
343
     *
344
     * @param bool $flag
345
     * @return $this
346
     */
347
    public function enableLogQueries($flag = true)
348
    {
349
        $this->settings()->set('log_queries',intval($flag));
350
        return $this;
351
    }
352
353
    /**
354
     * Compress the result if the HTTP client said that it understands data compressed with gzip or deflate
355
     *
356
     * @param bool $flag
357
     * @return $this
358
     */
359 27
    public function enableHttpCompression($flag = true)
360
    {
361 27
        $this->settings()->enableHttpCompression($flag);
362 27
        return $this;
363
    }
364
365
    /**
366
     * Enable / Disable HTTPS
367
     *
368
     * @param bool $flag
369
     * @return $this
370
     */
371
    public function https($flag=true)
372
    {
373
        $this->settings()->https($flag);
374
        return $this;
375
    }
376
377
    /**
378
     * Read extremes of the result columns. They can be output in JSON-formats.
379
     *
380
     * @param bool $flag
381
     * @return $this
382
     */
383 2
    public function enableExtremes($flag = true)
384
    {
385 2
        $this->settings()->set('extremes',intval($flag));
386 2
        return $this;
387
    }
388
389
    /**
390
     * SELECT
391
     *
392
     * @param string $sql
393
     * @param array $bindings
394
     * @param null|WhereInFile $whereInFile
395
     * @param null|WriteToFile $writeToFile
396
     * @return Statement
397
     * @throws Exception\TransportException
398
     * @throws \Exception
399
     */
400 35
    public function select($sql, $bindings = [], $whereInFile = null, $writeToFile=null)
401
    {
402 35
        return $this->transport()->select($sql, $bindings, $whereInFile,$writeToFile);
403
    }
404
405
    /**
406
     * execute run
407
     *
408
     * @return bool
409
     * @throws Exception\TransportException
410
     */
411 8
    public function executeAsync()
412
    {
413 8
        return $this->transport()->executeAsync();
414
    }
415
416
    /**
417
     * set progressFunction
418
     *
419
     * @param callable $callback
420
     */
421 1
    public function progressFunction($callback)
422
    {
423 1
        if (!is_callable($callback)) throw new \InvalidArgumentException('Not is_callable progressFunction');
424
425 1
        if (!$this->settings()->is('send_progress_in_http_headers'))
426
        {
427 1
            $this->settings()->set('send_progress_in_http_headers', 1);
428
        }
429 1
        if (!$this->settings()->is('http_headers_progress_interval_ms'))
430
        {
431 1
            $this->settings()->set('http_headers_progress_interval_ms', 100);
432
        }
433
434
435 1
        $this->transport()->setProgressFunction($callback);
436 1
    }
437
438
    /**
439
     * prepare select
440
     *
441
     * @param string $sql
442
     * @param array $bindings
443
     * @param null $whereInFile
0 ignored issues
show
Documentation Bug introduced by
Are you sure the doc-type for parameter $whereInFile is correct as it would always require null to be passed?
Loading history...
444
     * @param null $writeToFile
0 ignored issues
show
Documentation Bug introduced by
Are you sure the doc-type for parameter $writeToFile is correct as it would always require null to be passed?
Loading history...
445
     * @return Statement
446
     * @throws Exception\TransportException
447
     * @throws \Exception
448
     */
449 4
    public function selectAsync($sql, $bindings = [], $whereInFile = null,$writeToFile=null)
450
    {
451 4
        return $this->transport()->selectAsync($sql, $bindings, $whereInFile,$writeToFile);
452
    }
453
454
    /**
455
     * SHOW PROCESSLIST
456
     *
457
     * @return array
458
     * @throws Exception\TransportException
459
     * @throws \Exception
460
     */
461
    public function showProcesslist()
462
    {
463
        return $this->select('SHOW PROCESSLIST')->rows();
464
    }
465
466
    /**
467
     * show databases
468
     *
469
     * @return array
470
     * @throws Exception\TransportException
471
     * @throws \Exception
472
     */
473
    public function showDatabases()
474
    {
475
        return $this->select('show databases')->rows();
476
    }
477
478
    /**
479
     * statement = SHOW CREATE TABLE
480
     *
481
     * @param string $table
482
     * @return mixed
483
     * @throws Exception\TransportException
484
     * @throws \Exception
485
     */
486
    public function showCreateTable($table)
487
    {
488
        return ($this->select('SHOW CREATE TABLE '.$table)->fetchOne('statement'));
489
    }
490
491
    /**
492
     * SHOW TABLES
493
     *
494
     * @return array
495
     * @throws Exception\TransportException
496
     * @throws \Exception
497
     */
498 1
    public function showTables()
499
    {
500 1
        return $this->select('SHOW TABLES')->rowsAsTree('name');
501
    }
502
503
    /**
504
     * Get the number of simultaneous/Pending requests
505
     *
506
     * @return int
507
     */
508 9
    public function getCountPendingQueue()
509
    {
510 9
        return $this->transport()->getCountPendingQueue();
511
    }
512
513
    /**
514
     * Insert Array
515
     *
516
     * @param string $table
517
     * @param array $values
518
     * @param array $columns
519
     * @return Statement
520
     * @throws Exception\TransportException
521
     */
522 5
    public function insert($table, $values, $columns = [])
523
    {
524 5
        $sql = 'INSERT INTO ' . $table;
525
526 5
        if (0 !== count($columns)) {
527 5
            $sql .= ' (' . implode(',', $columns) . ') ';
528
        }
529
530 5
        $sql .= ' VALUES ';
531
532 5
        foreach ($values as $row) {
533 5
            $sql .= ' (' . FormatLine::Insert($row) . '), ';
534
        }
535 5
        $sql = trim($sql, ', ');
536 5
        return $this->transport()->write($sql);
537
    }
538
539
    /**
540
      * Prepares the values to insert from the associative array.
541
      * There may be one or more lines inserted, but then the keys inside the array list must match (including in the sequence)
542
      *
543
      * @param array $values - array column_name => value (if we insert one row) or array list column_name => value if we insert many lines
544
      * @return array - list of arrays - 0 => fields, 1 => list of value arrays for insertion
545
      */
546 3
    public function prepareInsertAssocBulk(array $values)
547
    {
548 3
        if (isset($values[0]) && is_array($values[0])){ //случай, когда много строк вставляется
549 2
            $preparedFields = array_keys($values[0]);
550 2
            $preparedValues = [];
551 2
            foreach ($values as $idx => $row){
552 2
                $_fields = array_keys($row);
553 2
                if ($_fields !== $preparedFields){
554 1
                    throw new QueryException("Fields not match: ".implode(',',$_fields)." and ".implode(',', $preparedFields)." on element $idx");
555
                }
556 2
                $preparedValues[] = array_values($row);
557
            }
558
        }else{ //одна строка
559 1
            $preparedFields = array_keys($values);
560 1
            $preparedValues = [array_values($values)];
561
        }
562 2
        return [$preparedFields, $preparedValues];
563
    }
564
565
    /**
566
      * Inserts one or more rows from an associative array.
567
      * If there is a discrepancy between the keys of the value arrays (or their order) - throws an exception.
568
      *
569
      * @param string $ table - table name
0 ignored issues
show
Documentation Bug introduced by
The doc comment $ at position 0 could not be parsed: Unknown type name '$' at position 0 in $.
Loading history...
570
      * @param array $ values - array column_name => value (if we insert one row) or array list column_name => value if we insert many lines
571
      * @return Statement
572
      * @throws QueryException
573
      */
574
    public function insertAssocBulk($table, array $values)
575
    {
576
        list($columns, $vals) = $this->prepareInsertAssocBulk($values);
577
        return $this->insert($table, $vals, $columns);
578
    }
579
580
    /**
581
     * insert TabSeparated files
582
     *
583
     * @param string $table_name
584
     * @param string|array $file_names
585
     * @param array $columns_array
586
     * @return mixed
587
     * @throws Exception\TransportException
588
     */
589 1
    public function insertBatchTSVFiles($table_name, $file_names, $columns_array)
590
    {
591 1
        return $this->insertBatchFiles($table_name,$file_names,$columns_array,'TabSeparated');
592
    }
593
594
    /**
595
     * insert Batch Files
596
     *
597
     * @param string $table_name
598
     * @param string|array $file_names
599
     * @param array $columns_array
600
     * @param string $format ['TabSeparated','TabSeparatedWithNames','CSV','CSVWithNames']
601
     * @return array
602
     * @throws Exception\TransportException
603
     */
604 7
    public function insertBatchFiles($table_name, $file_names, $columns_array,$format="CSV")
605
    {
606 7
        if (is_string($file_names))
607
        {
608
            $file_names=[$file_names];
609
        }
610 7
        if ($this->getCountPendingQueue() > 0) {
611
            throw new QueryException('Queue must be empty, before insertBatch, need executeAsync');
612
        }
613
614 7
        if (!in_array($format,$this->_support_format))
615
        {
616
            throw new QueryException('Format not support in insertBatchFiles');
617
        }
618
619 7
        $result = [];
620
621 7
        foreach ($file_names as $fileName) {
622 7
            if (!is_file($fileName) || !is_readable($fileName)) {
623
                throw  new QueryException('Cant read file: ' . $fileName.' '.(is_file($fileName)?'':' is not file'));
624
            }
625
626 7
            if (!$columns_array)
0 ignored issues
show
Bug Best Practice introduced by
The expression $columns_array of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
627
            {
628
                $sql = 'INSERT INTO ' . $table_name . ' FORMAT '.$format;
629
630
            }
631
            else
632
            {
633 7
                $sql = 'INSERT INTO ' . $table_name . ' ( ' . implode(',', $columns_array) . ' ) FORMAT '.$format;
634
635
            }
636 7
            $result[$fileName] = $this->transport()->writeAsyncCSV($sql, $fileName);
637
        }
638
639
        // exec
640 7
        $exec = $this->executeAsync();
0 ignored issues
show
Unused Code introduced by
The assignment to $exec is dead and can be removed.
Loading history...
641
642
        // fetch resutl
643 7
        foreach ($file_names as $fileName) {
644 7
            if ($result[$fileName]->isError()) {
645 7
                $result[$fileName]->error();
646
            }
647
        }
648
649 5
        return $result;
650
    }
651
652
    /**
653
     * insert Batch Stream
654
     *
655
     * @param string $table_name
656
     * @param array $columns_array
657
     * @param string $format ['TabSeparated','TabSeparatedWithNames','CSV','CSVWithNames']
658
     * @return Transport\CurlerRequest
659
     */
660 2
    public function insertBatchStream($table_name, $columns_array,$format="CSV")
661
    {
662 2
        if ($this->getCountPendingQueue() > 0) {
663
            throw new QueryException('Queue must be empty, before insertBatch, need executeAsync');
664
        }
665
666 2
        if (!in_array($format,$this->_support_format))
667
        {
668
            throw new QueryException('Format not support in insertBatchFiles');
669
        }
670
671 2
        if (!$columns_array)
0 ignored issues
show
Bug Best Practice introduced by
The expression $columns_array of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
672
        {
673
            $sql = 'INSERT INTO ' . $table_name . ' FORMAT '.$format;
674
675
        }
676
        else
677
        {
678 2
            $sql = 'INSERT INTO ' . $table_name . ' ( ' . implode(',', $columns_array) . ' ) FORMAT '.$format;
679
680
        }
681
682 2
        return $this->transport()->writeStreamData($sql);
683
    }
684
685
686
    /**
687
     * Size of database
688
     *
689
     * @return mixed|null
690
     * @throws Exception\TransportException
691
     * @throws \Exception
692
     */
693
    public function databaseSize()
694
    {
695
        $b = $this->settings()->getDatabase();
696
697
        return $this->select('
698
            SELECT database,formatReadableSize(sum(bytes)) as size
699
            FROM system.parts
700
            WHERE active AND database=:database
701
            GROUP BY database
702
        ', ['database' => $b])->fetchOne();
703
    }
704
705
    /**
706
     * Size of tables
707
     *
708
     * @param string $tableName
709
     * @return mixed
710
     * @throws Exception\TransportException
711
     * @throws \Exception
712
     */
713 1
    public function tableSize($tableName)
714
    {
715 1
        $tables = $this->tablesSize();
716
717 1
        if (isset($tables[$tableName])) {
718 1
            return $tables[$tableName];
719
        }
720
721
        return null;
722
    }
723
724
    /**
725
     * ping & check
726
     *
727
     * @return bool
728
     * @throws Exception\TransportException
729
     * @throws \Exception
730
     */
731 33
    public function ping()
732
    {
733 33
        $result = $this->select('SELECT 1 as ping')->fetchOne('ping');
734 33
        return ($result == 1);
735
    }
736
737
    /**
738
     * Tables sizes
739
     *
740
     * @param bool $flatList
741
     * @return array
742
     * @throws Exception\TransportException
743
     * @throws \Exception
744
     */
745 1
    public function tablesSize($flatList=false)
746
    {
747 1
        $z=$this->select('
748
        SELECT name as table,database,
749
            max(sizebytes) as sizebytes,
750
            max(size) as size,
751
            min(min_date) as min_date,
752
            max(max_date) as max_date
753
            FROM system.tables
754
            ANY LEFT JOIN 
755
            (
756
            SELECT table,database,
757
                        formatReadableSize(sum(bytes)) as size,
758
                        sum(bytes) as sizebytes,
759
                        min(min_date) as min_date,
760
                        max(max_date) as max_date
761
                        FROM system.parts 
762
                        WHERE active AND database=:database
763
                        GROUP BY table,database
764
            ) USING ( table,database )
765
            WHERE database=:database
766
            GROUP BY table,database
767 1
        ', [ 'database'=>$this->settings()->getDatabase() ]);
768
769 1
        if ($flatList) {
770
            return $z->rows();
771
        }
772
773
774 1
        return $z->rowsAsTree('table');
775
776
777
    }
778
779
780
    /**
781
     * isExists
782
     *
783
     * @param string $database
784
     * @param string $table
785
     * @return array
786
     * @throws Exception\TransportException
787
     * @throws \Exception
788
     */
789
    public function isExists($database,$table)
790
    {
791
        return $this->select('
792
            SELECT *
793
            FROM system.tables 
794
            WHERE name=\''.$table.'\' AND database=\''.$database.'\''
795
        )->rowsAsTree('name');
796
    }
797
798
799
    /**
800
     * List of partitions
801
     *
802
     * @param string $table
803
     * @param int $limit
804
     * @return array
805
     * @throws Exception\TransportException
806
     * @throws \Exception
807
     */
808
    public function partitions($table, $limit = -1)
809
    {
810
        return $this->select('
811
            SELECT *
812
            FROM system.parts 
813
            WHERE like(table,\'%' . $table . '%\') AND database=\''.$this->settings()->getDatabase().'\' 
814
            ORDER BY max_date ' . ($limit > 0 ? ' LIMIT ' . intval($limit) : '')
815
        )->rowsAsTree('name');
816
    }
817
818
    /**
819
     * dropPartition
820
     *
821
     * @param string $dataBaseTableName database_name.table_name
822
     * @param string $partition_id
823
     * @return Statement
824
     * @throws Exception\TransportException
825
     */
826
    public function dropPartition($dataBaseTableName, $partition_id)
827
    {
828
829
        $partition_id=trim($partition_id,'\'');
830
        $this->settings()->set('replication_alter_partitions_sync',2);
831
        $state = $this->write('ALTER TABLE {dataBaseTableName} DROP PARTITION :partion_id', [
832
            'dataBaseTableName'  => $dataBaseTableName,
833
            'partion_id' => $partition_id
834
        ]);
835
        return $state;
836
    }
837
838
    /**
839
     * Truncate ( drop all partitions )
840
     *
841
     * @param string $tableName
842
     * @return array
843
     * @throws Exception\TransportException
844
     * @throws \Exception
845
     */
846
    public function truncateTable($tableName)
847
    {
848
        $partions=$this->partitions($tableName);
849
        $out=[];
850
        foreach ($partions as $part_key=>$part)
851
        {
852
            $part_id=$part['partition'];
853
            $out[$part_id]=$this->dropPartition($tableName,$part_id);
854
        }
855
        return $out;
856
    }
857
858
    /**
859
     * dropOldPartitions by day_ago
860
     *
861
     * @param string $table_name
862
     * @param int $days_ago
863
     * @param int $count_partitons_per_one
864
     * @return array
865
     * @throws Exception\TransportException
866
     * @throws \Exception
867
     */
868
    public function dropOldPartitions($table_name, $days_ago, $count_partitons_per_one = 100)
869
    {
870
        $days_ago = strtotime(date('Y-m-d 00:00:00', strtotime('-' . $days_ago . ' day')));
871
872
        $drop = [];
873
        $list_patitions = $this->partitions($table_name, $count_partitons_per_one);
874
875
        foreach ($list_patitions as $partion_id => $partition) {
876
            if (stripos($partition['engine'], 'mergetree') === false) {
877
                continue;
878
            }
879
880
            $min_date = strtotime($partition['min_date']);
0 ignored issues
show
Unused Code introduced by
The assignment to $min_date is dead and can be removed.
Loading history...
881
            $max_date = strtotime($partition['max_date']);
882
883
            if ($max_date < $days_ago) {
884
                $drop[] = $partition['partition'];
885
            }
886
        }
887
888
        $result=[];
889
        foreach ($drop as $partition_id) {
890
            $result[$partition_id]=$this->dropPartition($table_name, $partition_id);
891
        }
892
893
        return $result;
894
    }
895
}
896