Nsqd   A
last analyzed

Complexity

Total Complexity 10

Size/Duplication

Total Lines 120
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 35
dl 0
loc 120
rs 10
c 0
b 0
f 0
wmc 10

7 Methods

Rating   Name   Duplication   Size   Complexity  
A protocol() 0 18 3
A sending() 0 3 1
A connect() 0 4 1
A __construct() 0 9 1
A waiting() 0 3 1
A close() 0 4 2
A heartbeat() 0 3 1
1
<?php
2
/**
3
 * Nsqd core
4
 * User: moyo
5
 * Date: 17/11/2017
6
 * Time: 6:36 PM
7
 */
8
9
namespace Carno\NSQ\Connector;
10
11
use Carno\Net\Contracts\TCP;
12
use Carno\Net\Endpoint;
13
use Carno\Net\Events;
14
use Carno\NSQ\Chips\ITCommands;
15
use Carno\NSQ\Protocol\Handshake;
16
use Carno\NSQ\Protocol\Receiver;
17
use Carno\NSQ\Protocol\Subscribe;
18
use Carno\NSQ\Types\Consuming;
19
use Carno\Pool\Managed;
20
use Carno\Pool\Poolable;
21
use Carno\Promise\Promise;
22
use Carno\Promise\Promised;
23
use Carno\Socket\Contracts\Stream;
24
use Carno\Socket\Socket;
25
26
class Nsqd implements Poolable
27
{
28
    use Managed;
29
    use ITCommands;
30
31
    /**
32
     * @var TCP
33
     */
34
    private $socket = null;
35
36
    /**
37
     * @var Endpoint
38
     */
39
    private $endpoint = null;
40
41
    /**
42
     * @var Consuming
43
     */
44
    private $consuming = null;
45
46
    /**
47
     * @var Promised
48
     */
49
    private $connected = null;
50
51
    /**
52
     * @var Promised
53
     */
54
    private $disconnected = null;
55
56
    /**
57
     * @var Promised
58
     */
59
    private $waiting = null;
60
61
    /**
62
     * @var Receiver
63
     */
64
    private $receiver = null;
65
66
    /**
67
     * Nsqd constructor.
68
     * @param Endpoint $endpoint
69
     * @param Consuming $consuming
70
     */
71
    public function __construct(Endpoint $endpoint, Consuming $consuming = null)
72
    {
73
        $this->receiver = new Receiver;
74
75
        $this->endpoint = $endpoint;
76
        $this->consuming = $consuming;
77
78
        $this->connected = Promise::deferred();
79
        $this->disconnected = Promise::deferred()->sync($this->closed());
80
    }
81
82
    /**
83
     * @return Promised
84
     */
85
    public function connect() : Promised
86
    {
87
        $this->socket = Socket::connect($this->endpoint->address(), $this->protocol());
88
        return $this->connected;
89
    }
90
91
    /**
92
     * @return Promised
93
     */
94
    public function heartbeat() : Promised
95
    {
96
        return Promise::resolved();
97
    }
98
99
    /**
100
     * @return Promised
101
     */
102
    public function close() : Promised
103
    {
104
        $this->consuming ? $this->cls() : $this->socket->close();
105
        return $this->disconnected;
106
    }
107
108
    /**
109
     * @return Promised
110
     */
111
    protected function waiting() : Promised
112
    {
113
        return $this->waiting = Promise::deferred();
114
    }
115
116
    /**
117
     * @param string $data
118
     * @return bool
119
     */
120
    protected function sending(string $data) : bool
121
    {
122
        return $this->socket->send($data);
123
    }
124
125
    /**
126
     * @return Events
127
     */
128
    private function protocol() : Events
129
    {
130
        return (new Events)
131
            ->attach(Events\Socket::CONNECTED, function (Stream $conn) {
132
                new Handshake($conn, $this, $this->connected);
133
                $this->consuming && new Subscribe($this->connected, $this, $this->consuming);
134
            })
135
            ->attach(Events\Socket::RECEIVED, function (Stream $conn) {
136
                $this->receiver->inbound($conn->recv(), $this, function () {
137
                    return $this->waiting;
138
                }, $this->consuming);
139
            })
140
            ->attach(Events\Socket::CLOSED, function () {
141
                $this->disconnected->resolve();
142
            })
143
            ->attach(Events\Socket::ERROR, function () {
144
                $this->socket && $this->socket->close();
145
                $this->destroy();
146
            })
147
        ;
148
    }
149
}
150