Issues (14)

src/Chips/EndpointsRefresher.php (3 issues)

1
<?php
2
/**
3
 * Endpoints refresher (from lookupd)
4
 * User: moyo
5
 * Date: 17/11/2017
6
 * Time: 5:29 PM
7
 */
8
9
namespace Carno\NSQ\Chips;
10
11
use Carno\Net\Endpoint;
12
use Carno\NSQ\Components\Lookupd;
13
use Carno\NSQ\Connector\Linker;
14
use Closure;
15
16
trait EndpointsRefresher
17
{
18
    /**
19
     * @var Endpoint[]
20
     */
21
    private $cachedEndpoints = [];
22
23
    /**
24
     * @param Lookupd $lookupd
25
     * @param string $topic
26
     * @return Endpoint
27
     */
28
    protected function lookupEndpoint(Lookupd $lookupd, string $topic)
29
    {
30
        if (empty($this->cachedEndpoints)) {
31
            $this->cachedEndpoints = yield $lookupd->endpoints($topic, function (array $endpoints) {
0 ignored issues
show
Bug Best Practice introduced by
The expression yield $lookupd->endpoint...ion(...) { /* ... */ }) returns the type Generator which is incompatible with the documented return type Carno\Net\Endpoint.
Loading history...
32
                $this->cachedEndpoints = $endpoints;
33
            });
34
        }
35
36
        return $this->cachedEndpoints[array_rand($this->cachedEndpoints)];
37
    }
38
39
    /**
40
     * @param string $topic
41
     * @param Closure $linker
42
     * @param Lookupd $lookupd
43
     * @param array $statics
44
     * @return Endpoint[]
45
     */
46
    protected function syncedEndpoints(string $topic, Closure $linker, Lookupd $lookupd = null, array $statics = [])
47
    {
48
        return $lookupd ? $lookupd->endpoints($topic, function (array $endpoints) use ($linker) {
0 ignored issues
show
Bug Best Practice introduced by
The expression return $lookupd ? $looku...ints($linker, $statics) also could return the type Generator which is incompatible with the documented return type Carno\Net\Endpoint[].
Loading history...
49
            return $this->routedEndpoints($linker, ...$endpoints);
50
        }) : $this->routedEndpoints($linker, ...$statics);
51
    }
52
53
    /**
54
     * @param Closure $linker
55
     * @param Endpoint ...$endpoints
56
     * @return Endpoint[]
57
     */
58
    private function routedEndpoints(Closure $linker, Endpoint ...$endpoints)
59
    {
60
        // marking
61
        $current = $this->markedEndpoints(...$this->cachedEndpoints);
62
        $found = $this->markedEndpoints(...$endpoints);
63
64
        // reset
65
        $this->cachedEndpoints = $endpoints;
66
67
        // checking
68
        foreach ($current as $id => $endpoint) {
69
            if (isset($found[$id])) {
70
                unset($found[$id]);
71
                unset($current[$id]);
72
            }
73
        }
74
75
        // new joining
76
        foreach ($found as $endpoint) {
77
            yield $linker(Linker::ACT_JOIN, $endpoint);
0 ignored issues
show
Bug Best Practice introduced by
The expression yield $linker(Carno\NSQ\...r::ACT_JOIN, $endpoint) returns the type Generator which is incompatible with the documented return type Carno\Net\Endpoint[].
Loading history...
78
        }
79
80
        // old leaving
81
        foreach ($current as $endpoint) {
82
            yield $linker(Linker::ACT_LEAVE, $endpoint);
83
        }
84
85
        return $endpoints;
86
    }
87
88
    /**
89
     * @param Endpoint ...$endpoints
90
     * @return Endpoint[]
91
     */
92
    private function markedEndpoints(Endpoint ...$endpoints)
93
    {
94
        $marked = [];
95
96
        foreach ($endpoints as $endpoint) {
97
            $marked[(string)$endpoint->address()] = $endpoint;
98
        }
99
100
        return $marked;
101
    }
102
}
103