1 | <?php |
||
2 | /** |
||
3 | * Lookupd |
||
4 | * User: moyo |
||
5 | * Date: 15/11/2017 |
||
6 | * Time: 4:02 PM |
||
7 | */ |
||
8 | |||
9 | namespace Carno\NSQ\Components; |
||
10 | |||
11 | use Carno\HTTP\Client; |
||
12 | use Carno\HTTP\Options; |
||
13 | use Carno\HTTP\Standard\Request; |
||
14 | use Carno\HTTP\Standard\Response; |
||
15 | use Carno\HTTP\Standard\Uri; |
||
16 | use Carno\Net\Address; |
||
17 | use Carno\Net\Endpoint; |
||
18 | use Carno\NSQ\Chips\LookupCached; |
||
19 | use Carno\NSQ\Exception\LookupRequestException; |
||
20 | use Carno\NSQ\Exception\NoneEndpointsException; |
||
21 | use Closure; |
||
22 | |||
/**
 * HTTP client for an nsqlookupd instance: resolves a topic name to the
 * endpoints of the producers that serve it.
 *
 * Caching behaviour ($this->cached / $this->ttl) comes from the LookupCached
 * trait, which is not visible here.
 */
class Lookupd
{
    use LookupCached;

    /**
     * Host of the lookupd HTTP interface.
     *
     * @var string
     */
    private $host = null;

    /**
     * Port of the lookupd HTTP interface.
     *
     * @var int
     */
    private $port = null;

    /**
     * HTTP client created for the configured host and port.
     *
     * @var Client
     */
    private $http = null;

    /**
     * Callbacks invoked with the endpoint list after every successful lookup.
     *
     * @var Closure[]
     */
    private $observers = [];

    /**
     * Lookupd constructor.
     *
     * @param string $host
     * @param int $port
     */
    public function __construct(string $host = 'localhost', int $port = 4161)
    {
        $this->host = $host;
        $this->port = $port;

        $this->http = new Client(new Options, new Address($host, $port));
    }

    /**
     * Looks up the producers of a topic via "GET /lookup?topic=...".
     *
     * This is a generator function (it uses yield), so it has to be driven by
     * the caller; the endpoint list is the generator's return value.
     * NOTE(review): presumably consumed by the Carno coroutine scheduler - confirm.
     *
     * A given $observer is remembered on this instance and is called with the
     * endpoint list after each successful (HTTP 200) lookup, including lookups
     * triggered by later calls.
     *
     * @param string $topic
     * @param Closure $observer optional callback receiving Endpoint[]
     * @return Endpoint[]
     * @throws NoneEndpointsException on HTTP 404 or when the body lists no producers
     * @throws LookupRequestException on any other non-200 status
     */
    public function endpoints(string $topic, Closure $observer = null)
    {
        // Register real callbacks only. Storing the default null would make
        // the notification loop below try to invoke null and crash.
        if ($observer !== null) {
            $this->observers[] = $observer;
        }

        $querying = function () use ($topic) {
            $uri = new Uri('http', $this->host, $this->port, '/lookup', ['topic' => $topic]);
            $request = new Request('GET', $uri, ['Accept' => 'application/vnd.nsq; version=1.0']);

            /**
             * @var Response $response
             */
            $response = yield $this->http->perform($request);

            switch ($response->getStatusCode()) {
                case 200:
                    $endpoints = $this->parsing((string)$response->getBody());
                    // Iterating an empty list is a no-op, so no emptiness check is needed.
                    foreach ($this->observers as $registered) {
                        // The observer's result is yielded as well.
                        // NOTE(review): presumably so an asynchronous observer can be awaited - confirm.
                        yield $registered($endpoints);
                    }
                    return $endpoints;
                case 404:
                    throw new NoneEndpointsException;
                default:
                    throw new LookupRequestException;
            }
        };

        // $this->cached and $this->ttl are expected to come from LookupCached.
        // NOTE(review): delegate() presumably runs $querying on a cache miss - confirm.
        if ($this->cached) {
            return yield $this->cached->delegate($topic, $querying, $this->ttl);
        }

        return yield $querying();
    }

    /**
     * Converts a lookupd JSON response body into endpoints, one per entry of
     * its "producers" list.
     *
     * @param string $body
     * @return Endpoint[]
     * @throws NoneEndpointsException when "producers" is missing or empty
     */
    private function parsing(string $body) : array
    {
        $response = json_decode($body, true);

        // Undecodable JSON yields null and ends up here as well.
        if (empty($response['producers'])) {
            throw new NoneEndpointsException;
        }

        $eps = [];
        foreach ($response['producers'] as $producer) {
            $address = new Address($producer['broadcast_address'], $producer['tcp_port']);
            // The value returned by relatedService() is what gets collected.
            // NOTE(review): presumably the same Endpoint (fluent setter) - confirm.
            $eps[] = (new Endpoint($address))->relatedService($producer['hostname']);
        }

        return $eps;
    }
}
||
120 |
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using `empty(...)` or `!empty(...)` instead.