carno-php /
nsq
| 1 | <?php |
||
| 2 | /** |
||
| 3 | * Endpoints refresher (from lookupd) |
||
| 4 | * User: moyo |
||
| 5 | * Date: 17/11/2017 |
||
| 6 | * Time: 5:29 PM |
||
| 7 | */ |
||
| 8 | |||
| 9 | namespace Carno\NSQ\Chips; |
||
| 10 | |||
| 11 | use Carno\Net\Endpoint; |
||
| 12 | use Carno\NSQ\Components\Lookupd; |
||
| 13 | use Carno\NSQ\Connector\Linker; |
||
| 14 | use Closure; |
||
| 15 | |||
/**
 * Keeps a local cache of NSQ endpoints and notifies a "linker" callback
 * whenever the set of known endpoints changes.
 *
 * Several methods here contain `yield`, so calling them returns a Generator
 * that has to be driven by the caller (presumably the project's coroutine
 * runtime -- confirm against the callers).
 */
trait EndpointsRefresher
{
    /**
     * Endpoints most recently stored by this trait. Read by lookupEndpoint()
     * for the random pick and by routedEndpoints() as the "previous" set.
     *
     * @var Endpoint[]
     */
    private $cachedEndpoints = [];

    /**
     * Picks one endpoint at random, querying lookupd first when the cache is empty.
     *
     * The callback handed to Lookupd::endpoints() stores every list it is
     * invoked with into the cache; the value the `yield` resolves to is stored
     * as well.
     * NOTE(review): the cache is therefore assigned from two places -- confirm
     * in Lookupd::endpoints() that the resolved value is the endpoint list.
     *
     * @param Lookupd $lookupd
     * @param string $topic
     * @return \Generator generator whose return value is the chosen Endpoint
     * @throws \RuntimeException when no endpoint is available after the lookup
     */
    protected function lookupEndpoint(Lookupd $lookupd, string $topic)
    {
        if (empty($this->cachedEndpoints)) {
            $this->cachedEndpoints = yield $lookupd->endpoints($topic, function (array $endpoints) {
                $this->cachedEndpoints = $endpoints;
            });
        }

        // array_rand() rejects an empty array (ValueError on PHP 8, warning and
        // NULL on PHP 7), so fail here with a message that names the topic.
        if (empty($this->cachedEndpoints)) {
            throw new \RuntimeException(sprintf('No NSQ endpoints available for topic "%s"', $topic));
        }

        return $this->cachedEndpoints[array_rand($this->cachedEndpoints)];
    }

    /**
     * Synchronises the linker with the endpoints of a topic.
     *
     * With a Lookupd instance: returns whatever Lookupd::endpoints() returns,
     * registering a callback that routes every received endpoint list through
     * routedEndpoints(). That callback returns a Generator; presumably the
     * lookupd side drives it -- confirm against Lookupd::endpoints().
     *
     * Without a Lookupd instance: returns the routedEndpoints() Generator for
     * the given static endpoints.
     *
     * @param string $topic
     * @param Closure $linker invoked as $linker(Linker::ACT_JOIN|ACT_LEAVE, Endpoint)
     * @param Lookupd $lookupd optional; when omitted, $statics are used instead
     * @param Endpoint[] $statics
     * @return mixed see description above
     */
    protected function syncedEndpoints(string $topic, Closure $linker, Lookupd $lookupd = null, array $statics = [])
    {
        if ($lookupd) {
            return $lookupd->endpoints($topic, function (array $endpoints) use ($linker) {
                return $this->routedEndpoints($linker, ...$endpoints);
            });
        }

        return $this->routedEndpoints($linker, ...$statics);
    }

    /**
     * Replaces the cached endpoints with the given ones and reports the
     * difference to the linker: ACT_JOIN for every endpoint whose address was
     * not cached before, then ACT_LEAVE for every cached endpoint whose address
     * is no longer present.
     *
     * @param Closure $linker
     * @param Endpoint ...$endpoints
     * @return \Generator yields each linker call result; its return value is the given Endpoint[]
     */
    private function routedEndpoints(Closure $linker, Endpoint ...$endpoints)
    {
        // Index both generations by address so they can be compared by key.
        $previous = $this->markedEndpoints(...$this->cachedEndpoints);
        $incoming = $this->markedEndpoints(...$endpoints);

        // The cache is switched to the new list before any linker call is made.
        $this->cachedEndpoints = $endpoints;

        // Addresses present on only one side; addresses present on both are untouched.
        $joining = array_diff_key($incoming, $previous);
        $leaving = array_diff_key($previous, $incoming);

        foreach ($joining as $endpoint) {
            yield $linker(Linker::ACT_JOIN, $endpoint);
        }

        foreach ($leaving as $endpoint) {
            yield $linker(Linker::ACT_LEAVE, $endpoint);
        }

        return $endpoints;
    }

    /**
     * Indexes endpoints by their address cast to string. When two endpoints
     * share an address, the later one replaces the earlier one.
     *
     * @param Endpoint ...$endpoints
     * @return Endpoint[] map of address string => Endpoint
     */
    private function markedEndpoints(Endpoint ...$endpoints)
    {
        $byAddress = [];

        foreach ($endpoints as $candidate) {
            $address = (string)$candidate->address();
            $byAddress[$address] = $candidate;
        }

        return $byAddress;
    }
}
||
| 103 |