Routing::picking()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 1
nc 1
nop 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
<?php
2
/**
3
 * Cluster nodes routing
4
 * User: moyo
5
 * Date: 27/10/2017
6
 * Time: 4:03 PM
7
 */
8
9
namespace Carno\Cluster;
10
11
use Carno\Cluster\Contracts\Tags;
12
use Carno\Cluster\Routing\Property;
13
use Carno\Cluster\Routing\Selector;
14
use Carno\Net\Endpoint;
15
use Closure;
16
17
class Routing
18
{
19
    use Property;
20
21
    /**
22
     * event types
23
     */
24
    public const JOINING = 0xE0;
25
    public const LEAVING = 0xE1;
26
27
    /**
28
     * @var string
29
     */
30
    private $serviced = 'unassigned';
31
32
    /**
33
     * @var string[]
34
     */
35
    private $tags = [];
36
37
    /**
38
     * @var bool
39
     */
40
    private $strict = true;
41
42
    /**
43
     * @var Closure
44
     */
45
    private $watcher = null;
46
47
    /**
48
     * Routing constructor.
49
     * @param string $serviced
50
     * @param Closure $watcher
51
     */
52
    public function __construct(string $serviced, Closure $watcher)
53
    {
54
        $this->selector = new Selector($this->serviced = $serviced);
55
        $this->watcher = $watcher;
56
    }
57
58
    /**
59
     * @param string ...$tags
60
     * @return static
61
     */
62
    public function accepts(string ...$tags) : self
63
    {
64
        $this->tags = array_unique(array_merge($this->tags, $tags));
65
        return $this;
66
    }
67
68
    /**
69
     * @param bool $yes
70
     * @return static
71
     */
72
    public function strictly(bool $yes) : self
73
    {
74
        $this->strict = $yes;
75
        return $this;
76
    }
77
78
    /**
79
     * @param string ...$tags
80
     * @return Endpoint
81
     */
82
    public function picking(string ...$tags) : Endpoint
83
    {
84
        return $this->selector->picking(...$tags);
85
    }
86
87
    /**
88
     * @param Endpoint $endpoint
89
     */
90
    public function join(Endpoint $endpoint) : void
91
    {
92
        if (!$this->strict || $this->inTags($endpoint->getTags())) {
93
            $this->selector->classify($endpoint);
94
            logger('cluster')->info('Discovered new service', ['node' => (string)$endpoint]);
95
            ($this->watcher)(self::JOINING, $endpoint);
96
        } else {
97
            $this->nmWarning($endpoint, 'route.join');
98
        }
99
    }
100
101
    /**
102
     * @param Endpoint $endpoint
103
     */
104
    public function leave(Endpoint $endpoint) : void
105
    {
106
        if (!$this->strict || $this->inTags($endpoint->getTags())) {
107
            $this->selector->release($endpoint);
108
            logger('cluster')->info('Service is gone', ['node' => (string)$endpoint]);
109
            ($this->watcher)(self::LEAVING, $endpoint);
110
        } else {
111
            $this->nmWarning($endpoint, 'route.leave');
112
        }
113
    }
114
115
    /**
116
     * @param array $epTags
117
     * @return bool
118
     */
119
    private function inTags(array $epTags) : bool
120
    {
121
        foreach ($epTags ?: Tags::DEFAULT as $tag) {
122
            if (in_array($tag, $this->tags)) {
123
                return true;
124
            } elseif (substr($tag, 0, strlen(Tags::CMD)) === Tags::CMD) {
125
                return true;
126
            }
127
        }
128
129
        return false;
130
    }
131
132
    /**
133
     * @param Endpoint $endpoint
134
     * @param string $op
135
     */
136
    private function nmWarning(Endpoint $endpoint, string $op) : void
137
    {
138
        logger('cluster')->info(
139
            'Discovered service tags not match',
140
            [
141
                'action' => $op,
142
                'service' => $endpoint->service(),
143
                'expect' => implode(',', $this->tags ?? []),
144
                'found' => implode(',', $endpoint->getTags()),
145
            ]
146
        );
147
    }
148
}
149