1 | <?php |
||
2 | /** |
||
3 | * Endpoints refresher (from lookupd) |
||
4 | * User: moyo |
||
5 | * Date: 17/11/2017 |
||
6 | * Time: 5:29 PM |
||
7 | */ |
||
8 | |||
9 | namespace Carno\NSQ\Chips; |
||
10 | |||
11 | use Carno\Net\Endpoint; |
||
12 | use Carno\NSQ\Components\Lookupd; |
||
13 | use Carno\NSQ\Connector\Linker; |
||
14 | use Closure; |
||
15 | |||
trait EndpointsRefresher
{
    /**
     * Most recently known endpoints; replaced wholesale on every refresh.
     *
     * @var Endpoint[]
     */
    private $cachedEndpoints = [];

    /**
     * Pick one random endpoint for a topic, querying lookupd only while the cache is empty.
     *
     * This method contains a `yield`, so calling it produces a Generator: the value of
     * `$lookupd->endpoints()` is yielded and whatever the driver sends back becomes the
     * new cache (presumably the resolved endpoint list - confirm against the coroutine runner).
     * The callback handed to lookupd overwrites the cache with the list it receives.
     *
     * @param Lookupd $lookupd
     * @param string $topic
     * @return \Generator whose return value is one randomly chosen entry of the cache
     */
    protected function lookupEndpoint(Lookupd $lookupd, string $topic)
    {
        if (! $this->cachedEndpoints) {
            // keeps the cache in step with whatever lookupd passes to the callback
            $refresh = function (array $endpoints) {
                $this->cachedEndpoints = $endpoints;
            };

            $this->cachedEndpoints = yield $lookupd->endpoints($topic, $refresh);
        }

        $picked = array_rand($this->cachedEndpoints);

        return $this->cachedEndpoints[$picked];
    }

    /**
     * Route endpoints through the linker, taking them from lookupd when one is given
     * and from the static list otherwise.
     *
     * @param string $topic
     * @param Closure $linker invoked as $linker(action, endpoint) for joins and leaves
     * @param Lookupd $lookupd optional; when null the statics are used
     * @param array $statics endpoints to use without lookupd
     * @return mixed the result of `$lookupd->endpoints()` when lookupd is given,
     *               otherwise the Generator produced by routedEndpoints()
     */
    protected function syncedEndpoints(string $topic, Closure $linker, Lookupd $lookupd = null, array $statics = [])
    {
        if (null === $lookupd) {
            return $this->routedEndpoints($linker, ...$statics);
        }

        $router = function (array $endpoints) use ($linker) {
            return $this->routedEndpoints($linker, ...$endpoints);
        };

        return $lookupd->endpoints($topic, $router);
    }

    /**
     * Diff the given endpoints against the cached ones and notify the linker.
     *
     * The cache is replaced by the given list, then the linker is called with
     * Linker::ACT_JOIN for every endpoint whose address was not cached before and with
     * Linker::ACT_LEAVE for every cached endpoint whose address is no longer present.
     * Each linker result is yielded, so this is a Generator and nothing runs until it
     * is iterated.
     *
     * @param Closure $linker
     * @param Endpoint ...$endpoints
     * @return \Generator whose return value is the given Endpoint[] list
     */
    private function routedEndpoints(Closure $linker, Endpoint ...$endpoints)
    {
        // index both generations by address so they can be compared by key
        $previous = $this->markedEndpoints(...$this->cachedEndpoints);
        $incoming = $this->markedEndpoints(...$endpoints);

        // the new list becomes the cache before any linker call is made
        $this->cachedEndpoints = $endpoints;

        // addresses present on one side only
        $joining = array_diff_key($incoming, $previous);
        $leaving = array_diff_key($previous, $incoming);

        foreach ($joining as $joined) {
            yield $linker(Linker::ACT_JOIN, $joined);
        }

        foreach ($leaving as $left) {
            yield $linker(Linker::ACT_LEAVE, $left);
        }

        return $endpoints;
    }

    /**
     * Index endpoints by the string form of their address.
     *
     * When two endpoints share an address the later one overwrites the earlier.
     *
     * @param Endpoint ...$endpoints
     * @return Endpoint[] keyed by (string) address
     */
    private function markedEndpoints(Endpoint ...$endpoints)
    {
        $byAddress = [];

        foreach ($endpoints as $candidate) {
            $key = (string)$candidate->address();
            $byAddress[$key] = $candidate;
        }

        return $byAddress;
    }
}
||
103 |