Issues (14)

src/Components/Lookupd.php (2 issues)

1
<?php
2
/**
3
 * Lookupd
4
 * User: moyo
5
 * Date: 15/11/2017
6
 * Time: 4:02 PM
7
 */
8
9
namespace Carno\NSQ\Components;
10
11
use Carno\HTTP\Client;
12
use Carno\HTTP\Options;
13
use Carno\HTTP\Standard\Request;
14
use Carno\HTTP\Standard\Response;
15
use Carno\HTTP\Standard\Uri;
16
use Carno\Net\Address;
17
use Carno\Net\Endpoint;
18
use Carno\NSQ\Chips\LookupCached;
19
use Carno\NSQ\Exception\LookupRequestException;
20
use Carno\NSQ\Exception\NoneEndpointsException;
21
use Closure;
22
23
class Lookupd
24
{
25
    use LookupCached;
26
27
    /**
28
     * @var string
29
     */
30
    private $host = null;
31
32
    /**
33
     * @var int
34
     */
35
    private $port = null;
36
37
    /**
38
     * @var Client
39
     */
40
    private $http = null;
41
42
    /**
43
     * @var array
44
     */
45
    private $observers = [];
46
47
    /**
48
     * Lookupd constructor.
49
     * @param string $host
50
     * @param int $port
51
     */
52
    public function __construct(string $host = 'localhost', int $port = 4161)
53
    {
54
        $this->host = $host;
55
        $this->port = $port;
56
57
        $this->http = new Client(new Options, new Address($host, $port));
58
    }
59
60
    /**
61
     * @param string $topic
62
     * @param Closure $observer
63
     * @return Endpoint[]
64
     * @throws NoneEndpointsException
65
     */
66
    public function endpoints(string $topic, Closure $observer = null)
67
    {
68
        $this->observers[] = $observer;
69
70
        $querying = function () use ($topic) {
71
            /**
72
             * @var Response $response
73
             */
74
            $uri = new Uri('http', $this->host, $this->port, '/lookup', ['topic' => $topic]);
75
            $request = new Request('GET', $uri, ['Accept' => 'application/vnd.nsq; version=1.0']);
76
            $response = yield $this->http->perform($request);
77
            switch ($response->getStatusCode()) {
78
                case 200:
79
                    $endpoints = $this->parsing((string)$response->getBody());
80
                    if ($this->observers) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $this->observers of type array is implicitly converted to a boolean; are you sure this is intended? If so, consider using ! empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
81
                        foreach ($this->observers as $observer) {
82
                            yield $observer($endpoints);
83
                        }
84
                    }
85
                    return $endpoints;
86
                case 404:
87
                    throw new NoneEndpointsException;
88
                default:
89
                    throw new LookupRequestException;
90
            }
91
        };
92
93
        if ($this->cached) {
94
            return yield $this->cached->delegate($topic, $querying, $this->ttl);
0 ignored issues
show
Bug Best Practice introduced by
The expression yield $this->cached->del... $querying, $this->ttl) returns the type Generator which is incompatible with the documented return type Carno\Net\Endpoint[].
Loading history...
95
        } else {
96
            return yield $querying();
97
        }
98
    }
99
100
    /**
101
     * @param string $body
102
     * @return Endpoint[]
103
     */
104
    private function parsing(string $body) : array
105
    {
106
        $response = json_decode($body, true);
107
        if (isset($response['producers']) && $response['producers']) {
108
            $eps = [];
109
            foreach ($response['producers'] as $producer) {
110
                $eps[] =
111
                    (new Endpoint(new Address($producer['broadcast_address'], $producer['tcp_port'])))
112
                        ->relatedService($producer['hostname'])
113
                ;
114
            }
115
            return $eps;
116
        }
117
        throw new NoneEndpointsException;
118
    }
119
}
120