1 | <?php |
||
2 | namespace ClickHouseDB; |
||
3 | |||
4 | use ClickHouseDB\Exception\QueryException; |
||
5 | |||
6 | class Cluster |
||
7 | { |
||
/**
 * Node addresses to scan, as given to setNodes().
 *
 * @var array
 */
private $nodes = [];

/**
 * Per-node clients created on demand by client(), keyed by node address.
 *
 * @var Client[]
 */
private $clients = [];

/**
 * Client built from the constructor arguments; client() clones it per node.
 *
 * @var Client
 */
private $defaultClient;

/**
 * Nodes whose scan queries threw, as node => exception message.
 *
 * @var array
 */
private $badNodes = [];

/**
 * Messages collected during rescan(); rescan() stores false when none were collected.
 *
 * @var array|bool
 */
private $error = [];

/**
 * Raw data gathered by the last rescan().
 *
 * @var array
 */
private $resultScan = [];

/**
 * Host the default client was configured with; restored on it after a scan.
 *
 * @var string
 */
private $defaultHostName;

/**
 * Value passed to timeOut() and connectTimeOut() of the scan requests.
 *
 * @var int|float
 */
private $scanTimeOut = 10;

/**
 * Rows of system.replicas indexed as database => table => node.
 *
 * @var array
 */
private $tables = [];

/**
 * Host names from system.clusters indexed as host_address => port.
 *
 * @var array
 */
private $hostsnames = [];

/**
 * Set to true once rescan() has run.
 *
 * @var bool
 */
private $isScaned = false;

/**
 * When true, isReplicasWork() skips the checks on active_replicas,
 * queue_size and log_max_index/log_pointer and only runs the basic ones.
 *
 * @var bool
 */
private $softCheck = true;

/**
 * Outcome of the replica checks of the last rescan().
 *
 * @var bool
 */
private $replicasIsOk;

/**
 * Cache of tablesSize() results, keyed by node.
 *
 * @var array
 */
private $_table_size_cache = [];
||
81 | |||
82 | /** |
||
83 | * Cluster constructor. |
||
84 | * |
||
85 | * @param array $connect_params |
||
86 | * @param array $settings |
||
87 | */ |
||
88 | public function __construct($connect_params, $settings = []) |
||
89 | { |
||
90 | $this->defaultClient = new Client($connect_params, $settings); |
||
91 | $this->defaultHostName = $this->defaultClient->getConnectHost(); |
||
92 | $this->setNodes(gethostbynamel($this->defaultHostName)); |
||
93 | } |
||
94 | |||
95 | /** |
||
96 | * @return Client |
||
97 | */ |
||
98 | private function defaultClient() |
||
99 | { |
||
100 | return $this->defaultClient; |
||
101 | } |
||
102 | |||
103 | /** |
||
104 | * @param bool $softCheck |
||
105 | */ |
||
106 | public function setSoftCheck($softCheck) |
||
107 | { |
||
108 | $this->softCheck = $softCheck; |
||
109 | } |
||
110 | |||
111 | /** |
||
112 | * @param float|integer $scanTimeOut |
||
113 | */ |
||
114 | public function setScanTimeOut($scanTimeOut) |
||
115 | { |
||
116 | $this->scanTimeOut = $scanTimeOut; |
||
117 | } |
||
118 | |||
119 | /** |
||
120 | * @param array $nodes |
||
121 | */ |
||
122 | public function setNodes($nodes) |
||
123 | { |
||
124 | $this->nodes = $nodes; |
||
125 | } |
||
126 | |||
127 | /** |
||
128 | * @return array |
||
129 | */ |
||
130 | public function getNodes() |
||
131 | { |
||
132 | return $this->nodes; |
||
133 | } |
||
134 | |||
135 | /** |
||
136 | * @return array |
||
137 | */ |
||
138 | public function getBadNodes() |
||
139 | { |
||
140 | return $this->badNodes; |
||
141 | } |
||
142 | |||
143 | |||
144 | /** |
||
145 | * Connect all nodes and scan |
||
146 | * |
||
147 | * @return $this |
||
148 | * @throws Exception\TransportException |
||
149 | */ |
||
150 | public function connect() |
||
151 | { |
||
152 | if (!$this->isScaned) { |
||
153 | $this->rescan(); |
||
154 | } |
||
155 | return $this; |
||
156 | } |
||
157 | |||
158 | /** |
||
159 | * Check the status of the cluster, the request is taken from the documentation for CH |
||
160 | * total_replicas <2 - not suitable for no replication clusters |
||
161 | * |
||
162 | * |
||
163 | * @param mixed $replicas |
||
164 | * @return bool |
||
165 | */ |
||
166 | private function isReplicasWork($replicas) |
||
167 | { |
||
168 | $ok = true; |
||
169 | if (!is_array($replicas)) { |
||
170 | // @todo нет массива ошибка, т/к мы работем с репликами? |
||
171 | // @todo Как быть есть в кластере НЕТ реплик ? |
||
172 | return false; |
||
173 | } |
||
174 | foreach ($replicas as $replica) { |
||
175 | if ($replica['is_readonly']) { |
||
176 | $ok = false; |
||
177 | $this->error[] = 'is_readonly : ' . json_encode($replica); |
||
178 | } |
||
179 | if ($replica['is_session_expired']) { |
||
180 | $ok = false; |
||
181 | $this->error[] = 'is_session_expired : ' . json_encode($replica); |
||
182 | } |
||
183 | if ($replica['future_parts'] > 20) { |
||
184 | $ok = false; |
||
185 | $this->error[] = 'future_parts : ' . json_encode($replica); |
||
186 | } |
||
187 | if ($replica['parts_to_check'] > 10) { |
||
188 | $ok = false; |
||
189 | $this->error[] = 'parts_to_check : ' . json_encode($replica); |
||
190 | } |
||
191 | |||
192 | // @todo : rewrite total_replicas=1 если кластер без реплики , нужно проверять какой класте и сколько в нем реплик |
||
193 | // if ($replica['total_replicas']<2) {$ok=false;$this->error[]='total_replicas : '.json_encode($replica);} |
||
194 | if ($this->softCheck) |
||
195 | { |
||
196 | if (!$ok) { |
||
197 | break; |
||
198 | } |
||
199 | continue; |
||
200 | } |
||
201 | |||
202 | if ($replica['active_replicas'] < $replica['total_replicas']) { |
||
203 | $ok = false; |
||
204 | $this->error[] = 'active_replicas : ' . json_encode($replica); |
||
205 | } |
||
206 | if ($replica['queue_size'] > 20) { |
||
207 | $ok = false; |
||
208 | $this->error[] = 'queue_size : ' . json_encode($replica); |
||
209 | } |
||
210 | if (($replica['log_max_index'] - $replica['log_pointer']) > 10) { |
||
211 | $ok = false; |
||
212 | $this->error[] = 'log_max_index : ' . json_encode($replica); |
||
213 | } |
||
214 | if (!$ok) { |
||
215 | break; |
||
216 | } |
||
217 | } |
||
218 | return $ok; |
||
219 | } |
||
220 | |||
/**
 * Build the system.replicas query used by rescan().
 *
 * Querying all columns may be slightly slow because several reads from
 * ZooKeeper are made per row. Without the last four columns
 * (log_max_index, log_pointer, total_replicas, active_replicas) the table
 * works quickly, so the reduced column list is used for the soft check.
 *
 * The full check in isReplicasWork() reads those four columns, so it needs
 * every column.
 *
 * @return string
 */
private function getSelectSystemReplicas()
{
    if ($this->softCheck) {
        return 'SELECT
            database,table,engine,is_leader,is_readonly,
            is_session_expired,future_parts,parts_to_check,zookeeper_path,replica_name,replica_path,columns_version,
            queue_size,inserts_in_queue,merges_in_queue,queue_oldest_time,inserts_oldest_time,merges_oldest_time
            FROM system.replicas
        ';
    }

    return 'SELECT * FROM system.replicas';
}
||
234 | |||
/**
 * Query every node and rebuild the cached picture of the cluster.
 *
 * Steps:
 *  1) take the list of node addresses;
 *  2) point the default client at each address and queue two async selects
 *     (system.replicas and system.clusters) with the scan timeout and DNS cache 0;
 *  3) execute all queued requests;
 *  4) collect the rows per node, remember failing nodes and run the replica checks.
 *
 * Afterwards getError() reports the collected messages, or false when the
 * scan produced none.
 *
 * @return $this
 * @throws Exception\TransportException
 */
public function rescan()
{
    $this->error = [];

    $statementsReplicas = [];
    $statementsClusters = [];
    $result = [];

    $badNodes = [];
    $replicasIsOk = true;

    foreach ($this->nodes as $node) {
        $this->defaultClient()->setHost($node);

        $statementsReplicas[$node] = $this->defaultClient()->selectAsync($this->getSelectSystemReplicas());
        $statementsClusters[$node] = $this->defaultClient()->selectAsync('SELECT * FROM system.clusters');
        // Override the timeouts for the scan requests.
        $statementsReplicas[$node]->getRequest()->setDnsCache(0)->timeOut($this->scanTimeOut)->connectTimeOut($this->scanTimeOut);
        $statementsClusters[$node]->getRequest()->setDnsCache(0)->timeOut($this->scanTimeOut)->connectTimeOut($this->scanTimeOut);
    }
    $this->defaultClient()->executeAsync();
    $tables = [];

    foreach ($this->nodes as $node) {
        try {
            $r = $statementsReplicas[$node]->rows();
            foreach ($r as $row) {
                $tables[$row['database']][$row['table']][$node] = $row;
            }
            $result['replicas'][$node] = $r;
        } catch (\Exception $E) {
            $result['replicas'][$node] = false;
            $badNodes[$node] = $E->getMessage();
            $this->error[] = 'statementsReplicas:' . $E->getMessage();
        }

        $hosts = [];

        try {
            $c = $statementsClusters[$node]->rows();
            $result['clusters'][$node] = $c;
            foreach ($c as $row) {
                $hosts[$row['host_address']][$row['port']] = $row['host_name'];
                $result['cluster.list'][$row['cluster']][$row['host_address']] =
                    [
                        'shard_weight' => $row['shard_weight'],
                        'replica_num' => $row['replica_num'],
                        'shard_num' => $row['shard_num'],
                        'is_local' => $row['is_local']
                    ];
            }
        } catch (\Exception $E) {
            $result['clusters'][$node] = false;

            $this->error[] = 'clusters:' . $E->getMessage();
            $badNodes[$node] = $E->getMessage();
        }
        // NOTE(review): overwritten on every iteration, so only the host map
        // reported by the last node is kept — confirm this is intended.
        $this->hostsnames = $hosts;

        // Check that replication goes well on this node.
        $rIsOk = $this->isReplicasWork($result['replicas'][$node]);
        $result['replicasIsOk'][$node] = $rIsOk;
        if (!$rIsOk) {
            $replicasIsOk = false;
        }
    }

    // Assigned once after the loop; an empty node list now clears stale tables.
    $this->tables = $tables;

    // Example: ['222.222.222.44' => 'HttpCode:0 ; ', '222.222.222.11' => 'HttpCode:0 ; ']
    $this->badNodes = $badNodes;

    // Restore the DNS host name on the default client.
    $this->defaultClient()->setHost($this->defaultHostName);

    $this->isScaned = true;
    $this->replicasIsOk = $replicasIsOk;
    // Report the per-node check results only when a check actually failed.
    // Adding this message unconditionally made a clean scan look like an error.
    if (!$replicasIsOk) {
        $this->error[] = 'Bad replicasIsOk, in ' . json_encode($result['replicasIsOk']);
    }

    // @todo : To specify on fighting falls and at different-sided configurations ...
    if (count($this->badNodes)) {
        $this->error[] = 'Have bad node : ' . json_encode($this->badNodes);
        $this->replicasIsOk = false;
    }
    if (!count($this->error)) {
        $this->error = false;
    }
    $this->resultScan = $result;
    // @todo : We connect to everyone in the DNS list; we need to verify that
    //         answers were returned by all the hosts we connected to.
    return $this;
}
||
344 | |||
/**
 * Result of the replica checks; triggers a scan first if none has run yet.
 *
 * @return bool
 * @throws Exception\TransportException
 */
public function isReplicasIsOk()
{
    $this->connect();

    return $this->replicasIsOk;
}
||
353 | |||
/**
 * Client bound to one node.
 *
 * A clone of the default client is created for each address the first time
 * it is requested; the host is set again on every call.
 *
 * @param string $node
 * @return Client
 */
public function client($node)
{
    if (empty($this->clients[$node])) {
        $this->clients[$node] = clone $this->defaultClient;
    }

    $nodeClient = $this->clients[$node];
    $nodeClient->setHost($node);

    return $nodeClient;
}
||
369 | |||
/**
 * Client for a cluster node whose address contains one of the given fragments.
 *
 * $ip_addr_like is a ';'-separated list of fragments tried in order with a
 * case-insensitive substring match. A cluster node is accepted only if it is
 * also in the list of scanned nodes. When nothing matches, the first node
 * of the cluster is used.
 *
 * @param string $cluster
 * @param string $ip_addr_like
 * @return Client
 * @throws Exception\TransportException
 */
public function clientLike($cluster, $ip_addr_like)
{
    $scannedNodes = $this->nodes;
    $clusterNodes = $this->getClusterNodes($cluster);

    foreach (explode(';', $ip_addr_like) as $fragment) {
        foreach ($clusterNodes as $candidate) {
            if (stripos($candidate, $fragment) === false) {
                continue;
            }
            // A node that exists in the cluster but was not scanned is skipped.
            if (in_array($candidate, $scannedNodes)) {
                return $this->client($candidate);
            }
        }
    }

    return $this->client($clusterNodes[0]);
}
||
/**
 * Client bound to the first node of the node list.
 *
 * @return Client
 */
public function activeClient()
{
    $firstNode = $this->nodes[0];

    return $this->client($firstNode);
}
||
416 | |||
/**
 * Number of distinct shard_num values among the hosts of a cluster.
 *
 * @param string $cluster
 * @return int
 * @throws Exception\TransportException
 */
public function getClusterCountShard($cluster)
{
    $seenShards = [];
    foreach ($this->getClusterInfoTable($cluster) as $hostInfo) {
        $seenShards[$hostInfo['shard_num']] = 1;
    }

    return count($seenShards);
}
||
431 | |||
/**
 * Number of distinct replica_num values among the hosts of a cluster.
 *
 * @param string $cluster
 * @return int
 * @throws Exception\TransportException
 */
public function getClusterCountReplica($cluster)
{
    $seenReplicas = [];
    foreach ($this->getClusterInfoTable($cluster) as $hostInfo) {
        $seenReplicas[$hostInfo['replica_num']] = 1;
    }

    return count($seenReplicas);
}
||
446 | |||
/**
 * Hosts of a cluster as collected by the scan, keyed by host address.
 *
 * Each entry holds shard_weight, replica_num, shard_num and is_local.
 *
 * @param string $cluster
 * @return mixed
 * @throws Exception\TransportException
 * @throws QueryException when the scan found no such cluster
 */
public function getClusterInfoTable($cluster)
{
    $this->connect();

    if (!empty($this->resultScan['cluster.list'][$cluster])) {
        return $this->resultScan['cluster.list'][$cluster];
    }

    throw new QueryException('Cluster not find:' . $cluster);
}
||
460 | |||
/**
 * Host addresses that belong to a cluster.
 *
 * @param string $cluster
 * @return array
 * @throws Exception\TransportException
 */
public function getClusterNodes($cluster)
{
    $hostsByAddress = $this->getClusterInfoTable($cluster);

    return array_keys($hostsByAddress);
}
||
470 | |||
/**
 * Names of all clusters found by the scan.
 *
 * Returns an empty array when the scan collected no cluster rows (for
 * example when every node failed). Previously array_keys() was called on a
 * missing key in that case.
 *
 * @return array
 * @throws Exception\TransportException
 */
public function getClusterList()
{
    $this->connect();

    if (empty($this->resultScan['cluster.list'])) {
        return [];
    }

    return array_keys($this->resultScan['cluster.list']);
}
||
480 | |||
/**
 * List the tables found in system.replicas on all nodes.
 *
 * Keys are "database.table". With $resultDetail the value is the map
 * node => system.replicas row; otherwise it is the list of nodes only.
 *
 * @param bool $resultDetail
 * @return array
 * @throws Exception\TransportException
 */
public function getTables($resultDetail = false)
{
    $this->connect();

    $list = [];
    foreach ($this->tables as $db_name => $tablesOfDb) {
        foreach ($tablesOfDb as $table_name => $rowsByNode) {
            $list[$db_name . '.' . $table_name] = $resultDetail ? $rowsByNode : array_keys($rowsByNode);
        }
    }

    return $list;
}
||
507 | |||
/**
 * Table size on cluster.
 *
 * Fetches tablesSize(true) for every node that holds the table (cached per
 * node) and sums the 'sizebytes' of the matching rows over all cached nodes.
 *
 * @param string $database_table table in the form "database.table"
 * @return int|float|null summed size in bytes, or null when no cached row matches
 *
 * @throws Exception\TransportException
 */
public function getSizeTable($database_table)
{
    // Make sure the sizes of every node holding the table are cached.
    foreach ($this->getNodesByTable($database_table) as $node) {
        if (empty($this->_table_size_cache[$node])) {
            $this->_table_size_cache[$node] = $this->client($node)->tablesSize(true);
        }
    }

    $wanted = (string) $database_table;
    $found = false;
    $totalBytes = 0;

    // Sum only the rows of the requested table. The total starts at 0, so
    // no error suppression is needed for the first addition.
    foreach ($this->_table_size_cache as $rows) {
        foreach ($rows as $row) {
            if ($row['database'] . '.' . $row['table'] !== $wanted) {
                continue;
            }
            $found = true;
            $totalBytes += $row['sizebytes'];
        }
    }

    return $found ? $totalBytes : null;
}
||
547 | |||
548 | |||
/**
 * Truncate a table on every node that is leader for it.
 *
 * The client timeout is raised to $timeOut for the call and restored
 * afterwards, including when the truncate throws.
 *
 * @deprecated
 * @param string $database_table table in the form "database.table"
 * @param int|float $timeOut timeout set on the node client while truncating
 * @return array result of Client::truncateTable() per node; empty when no leader node is known
 * @throws Exception\TransportException
 */
public function truncateTable($database_table, $timeOut = 2000)
{
    $out = [];
    $nodes = $this->getMasterNodeForTable($database_table);
    if (!$nodes) {
        // Unknown (or malformed) table name: nothing to truncate.
        return $out;
    }

    // Limit 2 keeps a table name that itself contains a dot in one piece.
    list($db, $table) = explode('.', $database_table, 2);

    foreach ($nodes as $node) {
        $nodeClient = $this->client($node);
        $def = $nodeClient->getTimeout();
        $nodeClient->database($db)->setTimeout($timeOut);
        try {
            $out[$node] = $nodeClient->truncateTable($table);
        } finally {
            // The per-node client is reused, so never leave the raised timeout on it.
            $nodeClient->setTimeout($def);
        }
    }

    return $out;
}
||
571 | |||
/**
 * Nodes whose system.replicas row for the table has is_leader set.
 *
 * @param string $database_table table in the form "database.table"
 * @return array list of node addresses; empty when the table is unknown
 * @throws Exception\TransportException
 */
public function getMasterNodeForTable($database_table)
{
    $detailed = $this->getTables(true);

    if (empty($detailed[$database_table])) {
        return [];
    }

    // array_filter() keeps the node keys, so the keys of the result are the leader nodes.
    $leaders = array_filter($detailed[$database_table], function ($row) {
        return $row['is_leader'];
    });

    return array_keys($leaders);
}
||
/**
 * Find the nodes that hold a table, by "db_name.table_name".
 *
 * @param string $database_table
 * @return array
 * @throws Exception\TransportException
 * @throws QueryException when the table was not found on any node
 */
public function getNodesByTable($database_table)
{
    $nodesByTable = $this->getTables();

    if (!empty($nodesByTable[$database_table])) {
        return $nodesByTable[$database_table];
    }

    throw new QueryException('Not find :' . $database_table);
}
||
612 | |||
/**
 * Collected error messages.
 *
 * @return string|bool JSON-encoded list when errors are stored as an array, otherwise the stored value as is
 */
public function getError()
{
    return is_array($this->error) ? json_encode($this->error) : $this->error;
}
||
625 | |||
626 | } |
||
627 |
PHP provides two ways to mark string literals: with single quotes (`'literal'`) or with double quotes (`"literal"`). The difference between them is that string literals in double quotes may contain variables, which are evaluated at run-time, as well as escape sequences.

String literals in single quotes, on the other hand, are evaluated very literally, and the only two characters that need escaping in the literal are the single quote itself (`\'`) and the backslash (`\\`). Every other character is displayed as is.

Double-quoted string literals may contain other variables or more complex escape sequences.
For example, with `$singleQuoted = 'Value';`, the statement `print "\tSingle is $singleQuoted";` will print an indented:
Single is Value
If your string literal does not contain variables or escape sequences, it should be defined using single quotes to make that fact clear.
For more information on PHP string literals and available escape sequences see the PHP core documentation.