Issues (2354)

src/Cluster.php (1 issue)

1
<?php
2
namespace ClickHouseDB;
3
4
use ClickHouseDB\Exception\QueryException;
5
6
class Cluster
7
{
8
    /**
9
     * @var array
10
     */
11
    private $nodes = [];
12
13
14
    /**
15
     * @var Client[]
16
     */
17
    private $clients = [];
18
19
    /**
20
     * @var Client
21
     */
22
    private $defaultClient;
23
24
    /**
25
     * @var array
26
     */
27
    private $badNodes = [];
28
29
    /**
30
     * @var array|bool
31
     */
32
    private $error = [];
33
    /**
34
     * @var array
35
     */
36
    private $resultScan = [];
37
    /**
38
     * @var string
39
     */
40
    private $defaultHostName;
41
42
    /**
43
     * @var int|float
44
     */
45
    private $scanTimeOut = 10;
46
47
    /**
48
     * @var array
49
     */
50
    private $tables = [];
51
52
    /**
53
     * @var array
54
     */
55
    private $hostsnames = [];
56
    /**
57
     * @var bool
58
     */
59
    private $isScaned = false;
60
61
62
    /**
63
     * A symptom of straining CH when checking a cluster request in Zookiper
64
     * false - send a request to ZK, do not do SELECT * FROM system.replicas
65
     *
66
     * @var bool
67
     */
68
    private $softCheck = true;
69
70
    /**
71
     * @var bool
72
     */
73
    private $replicasIsOk;
74
75
    /**
76
     * Cache
77
     *
78
     * @var array
79
     */
80
    private $_table_size_cache = [];
81
82
    /**
83
     * Cluster constructor.
84
     *
85
     * @param array $connect_params
86
     * @param array $settings
87
     */
88
    public function __construct($connect_params, $settings = [])
89
    {
90
        $this->defaultClient = new Client($connect_params, $settings);
91
        $this->defaultHostName = $this->defaultClient->getConnectHost();
92
        $this->setNodes(gethostbynamel($this->defaultHostName));
93
    }
94
95
    /**
96
     * @return Client
97
     */
98
    private function defaultClient()
99
    {
100
        return $this->defaultClient;
101
    }
102
103
    /**
104
     * @param bool $softCheck
105
     */
106
    public function setSoftCheck($softCheck)
107
    {
108
        $this->softCheck = $softCheck;
109
    }
110
111
    /**
112
     * @param float|integer $scanTimeOut
113
     */
114
    public function setScanTimeOut($scanTimeOut)
115
    {
116
        $this->scanTimeOut = $scanTimeOut;
117
    }
118
119
    /**
120
     * @param array $nodes
121
     */
122
    public function setNodes($nodes)
123
    {
124
        $this->nodes = $nodes;
125
    }
126
127
    /**
128
     * @return array
129
     */
130
    public function getNodes()
131
    {
132
        return $this->nodes;
133
    }
134
135
    /**
136
     * @return array
137
     */
138
    public function getBadNodes()
139
    {
140
        return $this->badNodes;
141
    }
142
143
144
    /**
145
     * Connect all nodes and scan
146
     *
147
     * @return $this
148
     * @throws Exception\TransportException
149
     */
150
    public function connect()
151
    {
152
        if (!$this->isScaned) {
153
            $this->rescan();
154
        }
155
        return $this;
156
    }
157
158
    /**
159
     * Check the status of the cluster, the request is taken from the documentation for CH
160
     * total_replicas <2 - not suitable for no replication clusters
161
     *
162
     *
163
     * @param mixed $replicas
164
     * @return bool
165
     */
166
    private function isReplicasWork($replicas)
167
    {
168
        $ok = true;
169
        if (!is_array($replicas)) {
170
            // @todo нет массива ошибка, т/к мы работем с репликами?
171
            // @todo Как быть есть в кластере НЕТ реплик ?
172
            return false;
173
        }
174
        foreach ($replicas as $replica) {
175
            if ($replica['is_readonly']) {
176
                $ok = false;
177
                $this->error[] = 'is_readonly : ' . json_encode($replica);
178
            }
179
            if ($replica['is_session_expired']) {
180
                $ok = false;
181
                $this->error[] = 'is_session_expired : ' . json_encode($replica);
182
            }
183
            if ($replica['future_parts'] > 20) {
184
                $ok = false;
185
                $this->error[] = 'future_parts : ' . json_encode($replica);
186
            }
187
            if ($replica['parts_to_check'] > 10) {
188
                $ok = false;
189
                $this->error[] = 'parts_to_check : ' . json_encode($replica);
190
            }
191
192
            // @todo : rewrite total_replicas=1 если кластер без реплики , нужно проверять какой класте и сколько в нем реплик
193
//            if ($replica['total_replicas']<2) {$ok=false;$this->error[]='total_replicas : '.json_encode($replica);}
194
            if ($this->softCheck)
195
            {
196
                if (!$ok) {
197
                    break;
198
                }
199
                continue;
200
            }
201
202
            if ($replica['active_replicas'] < $replica['total_replicas']) {
203
                $ok = false;
204
                $this->error[] = 'active_replicas : ' . json_encode($replica);
205
            }
206
            if ($replica['queue_size'] > 20) {
207
                $ok = false;
208
                $this->error[] = 'queue_size : ' . json_encode($replica);
209
            }
210
            if (($replica['log_max_index'] - $replica['log_pointer']) > 10) {
211
                $ok = false;
212
                $this->error[] = 'log_max_index : ' . json_encode($replica);
213
            }
214
            if (!$ok) {
215
                break;
216
            }
217
        }
218
        return $ok;
219
    }
220
221
    private function getSelectSystemReplicas()
222
    {
223
        // If you query all the columns, then the table may work slightly slow, since there are several readings from ZK per line.
224
        // If you do not query the last 4 columns (log_max_index, log_pointer, total_replicas, active_replicas), then the table works quickly.        if ($this->softCheck)
225
226
            return 'SELECT 
227
            database,table,engine,is_leader,is_readonly,
228
            is_session_expired,future_parts,parts_to_check,zookeeper_path,replica_name,replica_path,columns_version,
229
            queue_size,inserts_in_queue,merges_in_queue,queue_oldest_time,inserts_oldest_time,merges_oldest_time			
230
            FROM system.replicas
231
        ';
232
        //        return 'SELECT * FROM system.replicas';
233
    }
234
235
    /**
236
     * @return $this
237
     * @throws Exception\TransportException
238
     */
239
    public function rescan()
240
    {
241
        $this->error = [];
242
        /*
243
         * 1) Get the IP list
244
         * 2) To each connect via IP, through activeClient replacing host on ip
245
         * 3) We get information system.clusters + system.replicas from each machine, overwrite {DnsCache + timeOuts}
246
         * 4) Determine the necessary machines for the cluster / replica
247
         * 5) .... ?
248
         */
249
        $statementsReplicas = [];
250
        $statementsClusters = [];
251
        $result = [];
252
253
        $badNodes = [];
254
        $replicasIsOk = true;
255
256
        foreach ($this->nodes as $node) {
257
            $this->defaultClient()->setHost($node);
258
259
260
261
262
            $statementsReplicas[$node] = $this->defaultClient()->selectAsync($this->getSelectSystemReplicas());
263
            $statementsClusters[$node] = $this->defaultClient()->selectAsync('SELECT * FROM system.clusters');
264
            // пересетапим timeout
265
            $statementsReplicas[$node]->getRequest()->setDnsCache(0)->timeOut($this->scanTimeOut)->connectTimeOut($this->scanTimeOut);
266
            $statementsClusters[$node]->getRequest()->setDnsCache(0)->timeOut($this->scanTimeOut)->connectTimeOut($this->scanTimeOut);
267
        }
268
        $this->defaultClient()->executeAsync();
269
        $tables = [];
270
271
        foreach ($this->nodes as $node) {
272
273
274
            try {
275
                $r = $statementsReplicas[$node]->rows();
276
                foreach ($r as $row) {
277
                    $tables[$row['database']][$row['table']][$node] = $row;
278
                }
279
                $result['replicas'][$node] = $r;
280
            }catch (\Exception $E) {
281
                $result['replicas'][$node] = false;
282
                $badNodes[$node] = $E->getMessage();
283
                $this->error[] = 'statementsReplicas:' . $E->getMessage();
284
            }
285
            // ---------------------------------------------------------------------------------------------------
286
            $hosts = [];
287
288
            try {
289
                $c = $statementsClusters[$node]->rows();
290
                $result['clusters'][$node] = $c;
291
                foreach ($c as $row) {
292
                    $hosts[$row['host_address']][$row['port']] = $row['host_name'];
293
                    $result['cluster.list'][$row['cluster']][$row['host_address']] =
294
                        [
295
                            'shard_weight' => $row['shard_weight'],
296
                            'replica_num' => $row['replica_num'],
297
                            'shard_num' => $row['shard_num'],
298
                            'is_local' => $row['is_local']
299
                        ];
300
                }
301
302
            }catch (\Exception $E) {
303
                $result['clusters'][$node] = false;
304
305
                $this->error[] = 'clusters:' . $E->getMessage();
306
                $badNodes[$node] = $E->getMessage();
307
308
            }
309
            $this->hostsnames = $hosts;
310
            $this->tables = $tables;
311
            // ---------------------------------------------------------------------------------------------------
312
            // Let's check that replication goes well
313
            $rIsOk = $this->isReplicasWork($result['replicas'][$node]);
314
            $result['replicasIsOk'][$node] = $rIsOk;
315
            if (!$rIsOk) {
316
                $replicasIsOk = false;
317
            }
318
            // ---------------------------------------------------------------------------------------------------
319
        }
320
321
        // badNodes = array(6) {  '222.222.222.44' =>  string(13) "HttpCode:0 ; " , '222.222.222.11' =>  string(13) "HttpCode:0 ; "
322
        $this->badNodes = $badNodes;
323
324
        // Restore DNS host name on ch_client
325
        $this->defaultClient()->setHost($this->defaultHostName);
326
327
328
        $this->isScaned = true;
329
        $this->replicasIsOk = $replicasIsOk;
330
        $this->error[] = "Bad replicasIsOk, in " . json_encode($result['replicasIsOk']);
0 ignored issues
show
Coding Style Comprehensibility introduced by
The string literal Bad replicasIsOk, in does not require double quotes, as per coding-style, please use single quotes.

PHP provides two ways to mark string literals. Either with single quotes 'literal' or with double quotes "literal". The difference between these is that string literals in double quotes may contain variables with are evaluated at run-time as well as escape sequences.

String literals in single quotes on the other hand are evaluated very literally and the only two characters that needs escaping in the literal are the single quote itself (\') and the backslash (\\). Every other character is displayed as is.

Double quoted string literals may contain other variables or more complex escape sequences.

<?php

$singleQuoted = 'Value';
$doubleQuoted = "\tSingle is $singleQuoted";

print $doubleQuoted;

will print an indented: Single is Value

If your string literal does not contain variables or escape sequences, it should be defined using single quotes to make that fact clear.

For more information on PHP string literals and available escape sequences see the PHP core documentation.

Loading history...
331
        // ------------------------------------------------
332
        // @todo : To specify on fighting falls and at different-sided configurations ...
333
        if (sizeof($this->badNodes)) {
334
            $this->error[] = 'Have bad node : ' . json_encode($this->badNodes);
335
            $this->replicasIsOk = false;
336
        }
337
        if (!sizeof($this->error)) {
338
            $this->error = false;
339
        }
340
        $this->resultScan = $result;
341
        // @todo  : We connect to everyone in the DNS list, we need to decry that the requests were returned by all the hosts to which we connected
342
        return $this;
343
    }
344
345
    /**
346
     * @return boolean
347
     * @throws Exception\TransportException
348
     */
349
    public function isReplicasIsOk()
350
    {
351
        return $this->connect()->replicasIsOk;
352
    }
353
354
    /**
355
     * @param  string $node
356
     * @return Client
357
     */
358
    public function client($node)
359
    {
360
        // Создаем клиенты под каждый IP
361
        if (empty($this->clients[$node])) {
362
            $this->clients[$node] = clone $this->defaultClient();
363
        }
364
365
        $this->clients[$node]->setHost($node);
366
367
        return $this->clients[$node];
368
    }
369
370
    /**
371
     * @return Client
372
     * @throws Exception\TransportException
373
     */
374
    public function clientLike($cluster, $ip_addr_like)
375
    {
376
        $nodes_check = $this->nodes;
377
        $nodes = $this->getClusterNodes($cluster);
378
        $list_ips_need = explode(';', $ip_addr_like);
379
        $find = false;
380
        foreach ($list_ips_need as $like)
381
        {
382
            foreach ($nodes as $node)
383
            {
384
385
                if (stripos($node, $like) !== false)
386
                {
387
                    if (in_array($node, $nodes_check))
388
                    {
389
                        $find = $node;
390
                    } else
391
                    {
392
                        // node exists on cluster, but not check
393
                    }
394
395
                }
396
                if ($find) {
397
                    break;
398
                }
399
            }
400
            if ($find) {
401
                break;
402
            }
403
        }
404
        if (!$find) {
405
            $find = $nodes[0];
406
        }
407
        return $this->client($find);
408
    }
409
    /**
410
     * @return Client
411
     */
412
    public function activeClient()
413
    {
414
        return $this->client($this->nodes[0]);
415
    }
416
417
    /**
418
     * @paramstring $cluster
419
     * @return int
420
     * @throws Exception\TransportException
421
     */
422
    public function getClusterCountShard($cluster)
423
    {
424
        $table = $this->getClusterInfoTable($cluster);
425
        $c = [];
426
        foreach ($table as $row) {
427
            $c[$row['shard_num']] = 1;
428
        }
429
        return sizeof($c);
430
    }
431
432
    /**
433
     * @paramstring $cluster
434
     * @return int
435
     * @throws Exception\TransportException
436
     */
437
    public function getClusterCountReplica($cluster)
438
    {
439
        $table = $this->getClusterInfoTable($cluster);
440
        $c = [];
441
        foreach ($table as $row) {
442
            $c[$row['replica_num']] = 1;
443
        }
444
        return sizeof($c);
445
    }
446
447
    /**
448
     * @paramstring $cluster
449
     * @return mixed
450
     * @throws Exception\TransportException
451
     */
452
    public function getClusterInfoTable($cluster)
453
    {
454
        $this->connect();
455
        if (empty($this->resultScan['cluster.list'][$cluster])) {
456
            throw new QueryException('Cluster not find:' . $cluster);
457
        }
458
        return $this->resultScan['cluster.list'][$cluster];
459
    }
460
461
    /**
462
     * @paramstring $cluster
463
     * @return array
464
     * @throws Exception\TransportException
465
     */
466
    public function getClusterNodes($cluster)
467
    {
468
        return array_keys($this->getClusterInfoTable($cluster));
469
    }
470
471
    /**
472
     * @return array
473
     * @throws Exception\TransportException
474
     */
475
    public function getClusterList()
476
    {
477
        $this->connect();
478
        return array_keys($this->resultScan['cluster.list']);
479
    }
480
481
    /**
482
     * list all tables on all nodes
483
     *
484
     * @return array
485
     * @throws Exception\TransportException
486
     */
487
    public function getTables($resultDetail = false)
488
    {
489
        $this->connect();
490
        $list = [];
491
        foreach ($this->tables as $db_name=>$tables)
492
        {
493
            foreach ($tables as $table_name=>$nodes)
494
            {
495
496
                if ($resultDetail)
497
                {
498
                    $list[$db_name . '.' . $table_name] = $nodes;
499
                } else
500
                {
501
                    $list[$db_name . '.' . $table_name] = array_keys($nodes);
502
                }
503
            }
504
        }
505
        return $list;
506
    }
507
508
    /**
509
     * Table size on cluster
510
     *
511
     * @param string $database_table
512
     * @return array|null
513
     *
514
     * @throws Exception\TransportException
515
     */
516
    public function getSizeTable($database_table)
517
    {
518
        $nodes = $this->getNodesByTable($database_table);
519
        // scan need node`s
520
        foreach ($nodes as $node)
521
        {
522
            if (empty($this->_table_size_cache[$node]))
523
            {
524
                $this->_table_size_cache[$node] = $this->client($node)->tablesSize(true);
525
            }
526
        }
527
528
        $sizes = [];
529
        foreach ($this->_table_size_cache as $node=>$rows)
530
        {
531
            foreach ($rows as $row)
532
            {
533
                $sizes[$row['database'] . '.' . $row['table']][$node] = $row;
534
                @$sizes[$row['database'] . '.' . $row['table']]['total']['sizebytes'] += $row['sizebytes'];
535
536
537
538
            }
539
        }
540
541
        if (empty($sizes[$database_table]))
542
        {
543
            return null;
544
        }
545
        return $sizes[$database_table]['total']['sizebytes'];
546
    }
547
548
549
    /**
550
     * Truncate on all nodes
551
     * @deprecated
552
     * @param string $database_table
553
     * @return array
554
     * @throws Exception\TransportException
555
     */
556
    public function truncateTable($database_table, $timeOut = 2000)
557
    {
558
        $out = [];
559
        list($db, $table) = explode('.', $database_table);
560
        $nodes = $this->getMasterNodeForTable($database_table);
561
        // scan need node`s
562
        foreach ($nodes as $node)
563
        {
564
            $def = $this->client($node)->getTimeout();
565
            $this->client($node)->database($db)->setTimeout($timeOut);
566
            $out[$node] = $this->client($node)->truncateTable($table);
567
            $this->client($node)->setTimeout($def);
568
        }
569
        return $out;
570
    }
571
572
    /**
573
     * is_leader node
574
     *
575
     * @param string $database_table
576
     * @return array
577
     * @throws Exception\TransportException
578
     */
579
    public function getMasterNodeForTable($database_table)
580
    {
581
        $list = $this->getTables(true);
582
583
        if (empty($list[$database_table])) {
584
            return [];
585
        }
586
587
588
        $result = [];
589
        foreach ($list[$database_table] as $node=>$row)
590
        {
591
            if ($row['is_leader']) {
592
                $result[] = $node;
593
            }
594
        }
595
        return $result;
596
    }
597
    /**
598
     * Find nodes by : db_name.table_name
599
     *
600
     * @param string $database_table
601
     * @return array
602
     * @throws Exception\TransportException
603
     */
604
    public function getNodesByTable($database_table)
605
    {
606
        $list = $this->getTables();
607
        if (empty($list[$database_table])) {
608
            throw new QueryException('Not find :' . $database_table);
609
        }
610
        return $list[$database_table];
611
    }
612
613
    /**
614
     * Error string
615
     *
616
     * @return string|bool
617
     */
618
    public function getError()
619
    {
620
        if (is_array($this->error)) {
621
            return json_encode($this->error);
622
        }
623
        return $this->error;
624
    }
625
626
}
627