Passed
Push — master ( cc4828...00e84f )
by Igor
10:05
created

src/Cluster.php (1 issue)

1
<?php
2
namespace ClickHouseDB;
3
4
use ClickHouseDB\Exception\QueryException;
5
6
class Cluster
7
{
8
    /**
9
     * @var array
10
     */
11
    private $nodes = [];
12
13
14
    /**
15
     * @var Client[]
16
     */
17
    private $clients = [];
18
19
    /**
20
     * @var Client
21
     */
22
    private $defaultClient;
23
24
    /**
25
     * @var array
26
     */
27
    private $badNodes = [];
28
29
    /**
30
     * @var array|bool
31
     */
32
    private $error = [];
33
    /**
34
     * @var array
35
     */
36
    private $resultScan = [];
37
    /**
38
     * @var string
39
     */
40
    private $defaultHostName;
41
42
    /**
43
     * @var int|float
44
     */
45
    private $scanTimeOut = 10;
46
47
    /**
48
     * @var array
49
     */
50
    private $tables = [];
51
52
    /**
53
     * @var array
54
     */
55
    private $hostsnames = [];
56
    /**
57
     * @var bool
58
     */
59
    private $isScaned = false;
60
61
62
    /**
63
     * A symptom of straining CH when checking a cluster request in Zookiper
64
     * false - send a request to ZK, do not do SELECT * FROM system.replicas
65
     *
66
     * @var bool
67
     */
68
    private $softCheck = true;
69
70
    /**
71
     * @var bool
72
     */
73
    private $replicasIsOk;
74
75
    /**
76
     * Cache
77
     *
78
     * @var array
79
     */
80
    private $_table_size_cache = [];
81
82
    /**
83
     * Cluster constructor.
84
     *
85
     * @param array $connect_params
86
     * @param array $settings
87
     */
88
    public function __construct($connect_params, $settings = [])
89
    {
90
        $this->defaultClient = new Client($connect_params, $settings);
91
        $this->defaultHostName = $this->defaultClient->getConnectHost();
92
        $this->setNodes(gethostbynamel($this->defaultHostName));
93
    }
94
95
    /**
96
     * @return Client
97
     */
98
    private function defaultClient()
99
    {
100
        return $this->defaultClient;
101
    }
102
103
    /**
104
     * @param bool $softCheck
105
     */
106
    public function setSoftCheck($softCheck)
107
    {
108
        $this->softCheck = $softCheck;
109
    }
110
111
    /**
112
     * @param float|integer $scanTimeOut
113
     */
114
    public function setScanTimeOut($scanTimeOut)
115
    {
116
        $this->scanTimeOut = $scanTimeOut;
117
    }
118
119
    /**
120
     * @param array $nodes
121
     */
122
    public function setNodes($nodes)
123
    {
124
        $this->nodes = $nodes;
125
    }
126
127
    /**
128
     * @return array
129
     */
130
    public function getNodes()
131
    {
132
        return $this->nodes;
133
    }
134
135
    /**
136
     * @return array
137
     */
138
    public function getBadNodes()
139
    {
140
        return $this->badNodes;
141
    }
142
143
144
    /**
145
     * Connect all nodes and scan
146
     *
147
     * @return $this
148
     * @throws Exception\TransportException
149
     */
150
    public function connect()
151
    {
152
        if (!$this->isScaned) {
153
            $this->rescan();
154
        }
155
        return $this;
156
    }
157
158
    /**
159
     * Check the status of the cluster, the request is taken from the documentation for CH
160
     * total_replicas <2 - not suitable for no replication clusters
161
     *
162
     *
163
     * @param mixed $replicas
164
     * @return bool
165
     */
166
    private function isReplicasWork($replicas)
167
    {
168
        $ok = true;
169
        if (!is_array($replicas)) {
170
            // @todo нет массива ошибка, т/к мы работем с репликами?
171
            // @todo Как быть есть в кластере НЕТ реплик ?
172
            return false;
173
        }
174
        foreach ($replicas as $replica) {
175
            if ($replica['is_readonly']) {
176
                $ok = false;
177
                $this->error[] = 'is_readonly : ' . json_encode($replica);
178
            }
179
            if ($replica['is_session_expired']) {
180
                $ok = false;
181
                $this->error[] = 'is_session_expired : ' . json_encode($replica);
182
            }
183
            if ($replica['future_parts'] > 20) {
184
                $ok = false;
185
                $this->error[] = 'future_parts : ' . json_encode($replica);
186
            }
187
            if ($replica['parts_to_check'] > 10) {
188
                $ok = false;
189
                $this->error[] = 'parts_to_check : ' . json_encode($replica);
190
            }
191
192
            // @todo : rewrite total_replicas=1 если кластер без реплики , нужно проверять какой класте и сколько в нем реплик
193
//            if ($replica['total_replicas']<2) {$ok=false;$this->error[]='total_replicas : '.json_encode($replica);}
194
            if ($this->softCheck)
195
            {
196
                if (!$ok) {
197
                    break;
198
                }
199
                continue;
200
            }
201
202
            if ($replica['active_replicas'] < $replica['total_replicas']) {
203
                $ok = false;
204
                $this->error[] = 'active_replicas : ' . json_encode($replica);
205
            }
206
            if ($replica['queue_size'] > 20) {
207
                $ok = false;
208
                $this->error[] = 'queue_size : ' . json_encode($replica);
209
            }
210
            if (($replica['log_max_index'] - $replica['log_pointer']) > 10) {
211
                $ok = false;
212
                $this->error[] = 'log_max_index : ' . json_encode($replica);
213
            }
214
            if (!$ok) {
215
                break;
216
            }
217
        }
218
        return $ok;
219
    }
220
221
    private function getSelectSystemReplicas()
222
    {
223
        // If you query all the columns, then the table may work slightly slow, since there are several readings from ZK per line.
224
        // If you do not query the last 4 columns (log_max_index, log_pointer, total_replicas, active_replicas), then the table works quickly.        if ($this->softCheck)
225
226
            return 'SELECT 
227
            database,table,engine,is_leader,is_readonly,
228
            is_session_expired,future_parts,parts_to_check,zookeeper_path,replica_name,replica_path,columns_version,
229
            queue_size,inserts_in_queue,merges_in_queue,queue_oldest_time,inserts_oldest_time,merges_oldest_time			
230
            FROM system.replicas
231
        ';
232
        //        return 'SELECT * FROM system.replicas';
233
    }
234
235
    /**
236
     * @return $this
237
     * @throws Exception\TransportException
238
     */
239
    public function rescan()
240
    {
241
        $this->error = [];
242
        /*
243
         * 1) Get the IP list
244
         * 2) To each connect via IP, through activeClient replacing host on ip
245
         * 3) We get information system.clusters + system.replicas from each machine, overwrite {DnsCache + timeOuts}
246
         * 4) Determine the necessary machines for the cluster / replica
247
         * 5) .... ?
248
         */
249
        $statementsReplicas = [];
250
        $statementsClusters = [];
251
        $result = [];
252
253
        $badNodes = [];
254
        $replicasIsOk = true;
255
256
        foreach ($this->nodes as $node) {
257
            $this->defaultClient()->setHost($node);
258
259
260
261
262
            $statementsReplicas[$node] = $this->defaultClient()->selectAsync($this->getSelectSystemReplicas());
263
            $statementsClusters[$node] = $this->defaultClient()->selectAsync('SELECT * FROM system.clusters');
264
            // пересетапим timeout
265
            $statementsReplicas[$node]->getRequest()->setDnsCache(0)->timeOut($this->scanTimeOut)->connectTimeOut($this->scanTimeOut);
266
            $statementsClusters[$node]->getRequest()->setDnsCache(0)->timeOut($this->scanTimeOut)->connectTimeOut($this->scanTimeOut);
267
        }
268
        $this->defaultClient()->executeAsync();
269
        $tables = [];
270
271
        foreach ($this->nodes as $node) {
272
273
274
            try {
275
                $r = $statementsReplicas[$node]->rows();
276
                foreach ($r as $row) {
277
                    $tables[$row['database']][$row['table']][$node] = $row;
278
                }
279
                $result['replicas'][$node] = $r;
280
            }catch (\Exception $E) {
281
                $result['replicas'][$node] = false;
282
                $badNodes[$node] = $E->getMessage();
283
                $this->error[] = 'statementsReplicas:' . $E->getMessage();
284
            }
285
            // ---------------------------------------------------------------------------------------------------
286
            $hosts = [];
287
288
            try {
289
                $c = $statementsClusters[$node]->rows();
290
                $result['clusters'][$node] = $c;
291
                foreach ($c as $row) {
292
                    $hosts[$row['host_address']][$row['port']] = $row['host_name'];
293
                    $result['cluster.list'][$row['cluster']][$row['host_address']] =
294
                        [
295
                            'shard_weight' => $row['shard_weight'],
296
                            'replica_num' => $row['replica_num'],
297
                            'shard_num' => $row['shard_num'],
298
                            'is_local' => $row['is_local']
299
                        ];
300
                }
301
302
            }catch (\Exception $E) {
303
                $result['clusters'][$node] = false;
304
305
                $this->error[] = 'clusters:' . $E->getMessage();
306
                $badNodes[$node] = $E->getMessage();
307
308
            }
309
            $this->hostsnames = $hosts;
310
            $this->tables = $tables;
311
            // ---------------------------------------------------------------------------------------------------
312
            // Let's check that replication goes well
313
            $rIsOk = $this->isReplicasWork($result['replicas'][$node]);
314
            $result['replicasIsOk'][$node] = $rIsOk;
315
            if (!$rIsOk) {
316
                $replicasIsOk = false;
317
            }
318
            // ---------------------------------------------------------------------------------------------------
319
        }
320
321
        // badNodes = array(6) {  '222.222.222.44' =>  string(13) "HttpCode:0 ; " , '222.222.222.11' =>  string(13) "HttpCode:0 ; "
322
        $this->badNodes = $badNodes;
323
324
        // Restore DNS host name on ch_client
325
        $this->defaultClient()->setHost($this->defaultHostName);
326
327
328
        $this->isScaned = true;
329
        $this->replicasIsOk = $replicasIsOk;
330
        $this->error[] = "Bad replicasIsOk, in " . json_encode($result['replicasIsOk']);
331
        // ------------------------------------------------
332
        // @todo : To specify on fighting falls and at different-sided configurations ...
333
        if (sizeof($this->badNodes)) {
334
            $this->error[] = 'Have bad node : ' . json_encode($this->badNodes);
335
            $this->replicasIsOk = false;
336
        }
337
        if (!sizeof($this->error)) {
338
            $this->error = false;
339
        }
340
        $this->resultScan = $result;
341
        // @todo  : We connect to everyone in the DNS list, we need to decry that the requests were returned by all the hosts to which we connected
342
        return $this;
343
    }
344
345
    /**
346
     * @return boolean
347
     * @throws Exception\TransportException
348
     */
349
    public function isReplicasIsOk()
350
    {
351
        return $this->connect()->replicasIsOk;
352
    }
353
354
    /**
355
     * @param  string $node
356
     * @return Client
357
     */
358
    public function client($node)
359
    {
360
        // Создаем клиенты под каждый IP
361
        if (empty($this->clients[$node])) {
362
            $this->clients[$node] = clone $this->defaultClient();
363
        }
364
365
        $this->clients[$node]->setHost($node);
366
367
        return $this->clients[$node];
368
    }
369
370
    /**
371
     * @return Client
372
     * @throws Exception\TransportException
373
     */
374
    public function clientLike($cluster, $ip_addr_like)
375
    {
376
        $nodes_check = $this->nodes;
377
        $nodes = $this->getClusterNodes($cluster);
378
        $list_ips_need = explode(';', $ip_addr_like);
379
        $find = false;
380
        foreach ($list_ips_need as $like)
381
        {
382
            foreach ($nodes as $node)
383
            {
384
385
                if (stripos($node, $like) !== false)
386
                {
387
                    if (in_array($node, $nodes_check))
388
                    {
389
                        $find = $node;
390
                    } else
391
                    {
392
                        // node exists on cluster, but not check
393
                    }
394
395
                }
396
                if ($find) {
397
                    break;
398
                }
399
            }
400
            if ($find) {
401
                break;
402
            }
403
        }
404
        if (!$find) {
405
            $find = $nodes[0];
406
        }
407
        return $this->client($find);
408
    }
409
    /**
410
     * @return Client
411
     */
412
    public function activeClient()
413
    {
414
        return $this->client($this->nodes[0]);
415
    }
416
417
    /**
418
     * @paramstring $cluster
419
     * @return int
420
     * @throws Exception\TransportException
421
     */
422
    public function getClusterCountShard($cluster)
423
    {
424
        $table = $this->getClusterInfoTable($cluster);
425
        $c = [];
426
        foreach ($table as $row) {
427
            $c[$row['shard_num']] = 1;
428
        }
429
        return sizeof($c);
430
    }
431
432
    /**
433
     * @paramstring $cluster
434
     * @return int
435
     * @throws Exception\TransportException
436
     */
437
    public function getClusterCountReplica($cluster)
438
    {
439
        $table = $this->getClusterInfoTable($cluster);
440
        $c = [];
441
        foreach ($table as $row) {
442
            $c[$row['replica_num']] = 1;
443
        }
444
        return sizeof($c);
445
    }
446
447
    /**
448
     * @paramstring $cluster
449
     * @return mixed
450
     * @throws Exception\TransportException
451
     */
452
    public function getClusterInfoTable($cluster)
453
    {
454
        $this->connect();
455
        if (empty($this->resultScan['cluster.list'][$cluster])) {
456
            throw new QueryException('Cluster not find:' . $cluster);
457
        }
458
        return $this->resultScan['cluster.list'][$cluster];
459
    }
460
461
    /**
462
     * @paramstring $cluster
463
     * @return array
464
     * @throws Exception\TransportException
465
     */
466
    public function getClusterNodes($cluster)
467
    {
468
        return array_keys($this->getClusterInfoTable($cluster));
469
    }
470
471
    /**
472
     * @return array
473
     * @throws Exception\TransportException
474
     */
475
    public function getClusterList()
476
    {
477
        $this->connect();
478
        return array_keys($this->resultScan['cluster.list']);
479
    }
480
481
    /**
482
     * list all tables on all nodes
483
     *
484
     * @return array
485
     * @throws Exception\TransportException
486
     */
487
    public function getTables($resultDetail = false)
488
    {
489
        $this->connect();
490
        $list = [];
491
        foreach ($this->tables as $db_name=>$tables)
492
        {
493
            foreach ($tables as $table_name=>$nodes)
494
            {
495
496
                if ($resultDetail)
497
                {
498
                    $list[$db_name . '.' . $table_name] = $nodes;
499
                } else
500
                {
501
                    $list[$db_name . '.' . $table_name] = array_keys($nodes);
502
                }
503
            }
504
        }
505
        return $list;
506
    }
507
508
    /**
509
     * Table size on cluster
510
     *
511
     * @param string $database_table
512
     * @return array|null
513
     *
514
     * @throws Exception\TransportException
515
     */
516
    public function getSizeTable($database_table)
517
    {
518
        $nodes = $this->getNodesByTable($database_table);
519
        // scan need node`s
520
        foreach ($nodes as $node)
521
        {
522
            if (empty($this->_table_size_cache[$node]))
523
            {
524
                $this->_table_size_cache[$node] = $this->client($node)->tablesSize(true);
525
            }
526
        }
527
528
        $sizes = [];
529
        foreach ($this->_table_size_cache as $node=>$rows)
530
        {
531
            foreach ($rows as $row)
532
            {
533
                $sizes[$row['database'] . '.' . $row['table']][$node] = $row;
534
                @$sizes[$row['database'] . '.' . $row['table']]['total']['sizebytes'] += $row['sizebytes'];
535
536
537
538
            }
539
        }
540
541
        if (empty($sizes[$database_table]))
542
        {
543
            return null;
544
        }
545
        return $sizes[$database_table]['total']['sizebytes'];
546
    }
547
548
549
    /**
550
     * Truncate on all nodes
551
     * @deprecated
552
     * @param string $database_table
553
     * @return array
554
     * @throws Exception\TransportException
555
     */
556
    public function truncateTable($database_table, $timeOut = 2000)
557
    {
558
        $out = [];
559
        list($db, $table) = explode('.', $database_table);
560
        $nodes = $this->getMasterNodeForTable($database_table);
561
        // scan need node`s
562
        foreach ($nodes as $node)
563
        {
564
            $def = $this->client($node)->getTimeout();
565
            $this->client($node)->database($db)->setTimeout($timeOut);
566
            $out[$node] = $this->client($node)->truncateTable($table);
567
            $this->client($node)->setTimeout($def);
568
        }
569
        return $out;
570
    }
571
572
    /**
573
     * is_leader node
574
     *
575
     * @param string $database_table
576
     * @return array
577
     * @throws Exception\TransportException
578
     */
579
    public function getMasterNodeForTable($database_table)
580
    {
581
        $list = $this->getTables(true);
582
583
        if (empty($list[$database_table])) {
584
            return [];
585
        }
586
587
588
        $result = [];
589
        foreach ($list[$database_table] as $node=>$row)
590
        {
591
            if ($row['is_leader']) {
592
                $result[] = $node;
593
            }
594
        }
595
        return $result;
596
    }
597
    /**
598
     * Find nodes by : db_name.table_name
599
     *
600
     * @param string $database_table
601
     * @return array
602
     * @throws Exception\TransportException
603
     */
604
    public function getNodesByTable($database_table)
605
    {
606
        $list = $this->getTables();
607
        if (empty($list[$database_table])) {
608
            throw new QueryException('Not find :' . $database_table);
609
        }
610
        return $list[$database_table];
611
    }
612
613
    /**
614
     * Error string
615
     *
616
     * @return string|bool
617
     */
618
    public function getError()
619
    {
620
        if (is_array($this->error)) {
621
            return json_encode($this->error);
622
        }
623
        return $this->error;
624
    }
625
626
}
0 ignored issues
show
There must be exactly 0 empty lines before class closing brace.
Loading history...
627