1 | <?php |
||||
2 | /** |
||||
3 | * Nodes routing |
||||
4 | * User: moyo |
||||
5 | * Date: 27/10/2017 |
||||
6 | * Time: 4:04 PM |
||||
7 | */ |
||||
8 | |||||
9 | namespace Carno\Cluster\Managed; |
||||
10 | |||||
11 | use Carno\Cluster\Contracts\Tags; |
||||
12 | use Carno\Cluster\Routing; |
||||
13 | use function Carno\Coroutine\all; |
||||
14 | use Carno\Net\Address; |
||||
15 | use Carno\Net\Endpoint; |
||||
16 | use Carno\Promise\Promise; |
||||
17 | use Carno\Promise\Promised; |
||||
18 | |||||
/**
 * Routing glue for a managed cluster: lazily builds the Routing instance, reacts to
 * node join/leave events and keeps one connection object per node id.
 *
 * The trait relies on members it does not define itself, so the class using it
 * must provide them:
 *  - properties $type and $server (formatted as "type:server" for the Routing constructor)
 *  - joined(true), invoked on every JOINING event
 *  - connecting(Endpoint $node), whose result is cached as the node's connection
 *  - disconnecting($connection), whose result is returned as a Promised
 */
trait Router
{
    /**
     * Tags handed to Routing::accepts() when the routing is created.
     *
     * @var array
     */
    protected $tags = Tags::DEFAULT;

    /**
     * Flag handed to Routing::strictly() when the routing is created.
     *
     * @var bool
     */
    protected $strict = true;

    /**
     * Routing instance, created on the first routing() call.
     *
     * @var Routing|null
     */
    private $routing = null;

    /**
     * Connection objects returned by connecting(), keyed by node id.
     *
     * @var mixed[]
     */
    private $connected = [];

    /**
     * Returns the routing of this cluster, building it on first use.
     *
     * @return Routing
     */
    public function routing() : Routing
    {
        if ($this->routing !== null) {
            return $this->routing;
        }

        $name = sprintf('%s:%s', $this->type, $this->server);

        // Forward every routing event to changed() on this object.
        $watcher = function (int $evk, Endpoint $endpoint) {
            $this->changed($evk, $endpoint);
        };

        $this->routing = (new Routing($name, $watcher))
            ->strictly($this->strict)
            ->accepts(...$this->tags);

        return $this->routing;
    }

    /**
     * Dispatches a routing event for the given endpoint.
     *
     * Events other than Routing::JOINING and Routing::LEAVING are ignored.
     *
     * @param int $evk
     * @param Endpoint $endpoint
     */
    protected function changed(int $evk, Endpoint $endpoint) : void
    {
        // `==` on purpose: the same loose matching a `switch` on $evk applies.
        if ($evk == Routing::JOINING) {
            $this->joined(true);
            $this->discovered($endpoint);
            return;
        }

        if ($evk == Routing::LEAVING) {
            // The returned Promised is intentionally not awaited here.
            $this->disconnected($endpoint);
        }
    }

    /**
     * Called for every endpoint that joins; a no-op in this trait.
     *
     * @param Endpoint $node unused here (presumably kept for overriding classes)
     */
    protected function discovered(Endpoint $node) : void
    {
        // intentionally empty
    }

    /**
     * Shortcut for Routing::clustered() on this cluster's routing.
     *
     * @return bool
     */
    protected function clustered() : bool
    {
        $routing = $this->routing();

        return $routing->clustered();
    }

    /**
     * Returns the connection for an already selected node, connecting if needed.
     *
     * @param Endpoint $node
     * @return mixed
     */
    protected function picked(Endpoint $node)
    {
        return $this->connected($node);
    }

    /**
     * Lets the routing pick a node for the given tags and returns its connection.
     *
     * @param string ...$tags
     * @return mixed
     */
    protected function picking(string ...$tags)
    {
        $node = $this->routing()->picking(...$tags);

        return $this->connected($node);
    }

    /**
     * Disconnects every cached connection and combines the results with all().
     *
     * @return Promised
     */
    protected function releasing() : Promised
    {
        $pending = [];

        foreach (array_keys($this->connected) as $nodeID) {
            // disconnected() only reads the node's id(), so a placeholder endpoint
            // on address 0 carrying that id is enough.
            $placeholder = (new Endpoint(new Address(0)))->assignID($nodeID);
            $pending[] = $this->disconnected($placeholder);
        }

        return all(...$pending);
    }

    /**
     * Returns the cached connection of a node, creating it via connecting() on a miss.
     *
     * A cached null counts as a miss, so connecting() is called again in that case.
     *
     * @param Endpoint $node
     * @return mixed
     */
    private function connected(Endpoint $node)
    {
        $id = $node->id();

        if (isset($this->connected[$id])) {
            return $this->connected[$id];
        }

        return $this->connected[$id] = $this->connecting($node);
    }

    /**
     * Drops the cached connection of a node and hands it to disconnecting().
     *
     * When no (truthy) connection is cached for the node, the result of
     * Promise::rejected() is returned instead.
     *
     * @param Endpoint $node
     * @return Promised
     */
    private function disconnected(Endpoint $node) : Promised
    {
        $id = $node->id();
        $connection = $this->connected[$id] ?? null;

        if (!$connection) {
            return Promise::rejected();
        }

        // Forget the connection before closing it.
        unset($this->connected[$id]);

        return $this->disconnecting($connection);
    }
}
||||
145 |