ZmqSocket::close()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 14
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
dl 0
loc 14
ccs 0
cts 13
cp 0
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 9
nc 2
nop 0
crap 6
1
<?php
2
3
namespace Dazzle\Zmq;
4
5
use Dazzle\Event\BaseEventEmitter;
6
use Dazzle\Loop\LoopInterface;
7
use ZMQ;
8
use ZMQSocket as RawZMQSocket;
9
10
class ZmqSocket extends BaseEventEmitter
11
{
12
    /**
13
     * @var resource
14
     */
15
    public $fd;
16
17
    /**
18
     * @var bool
19
     */
20
    public $closed = false;
21
22
    /**
23
     * @var RawZMQSocket
24
     */
25
    private $socket;
26
27
    /**
28
     * @var LoopInterface
29
     */
30
    private $loop;
31
32
    /**
33
     * @var ZmqBuffer
34
     */
35
    private $buffer;
36
37
    /**
38
     * @param RawZMQSocket $socket
39
     * @param LoopInterface $loop
40
     */
41
    public function __construct(RawZMQSocket $socket, LoopInterface $loop)
42
    {
43
        $this->socket = $socket;
44
        $this->loop = $loop;
45
46
        $this->fd = $this->socket->getSockOpt(ZMQ::SOCKOPT_FD);
47
48
        $writeListener = [ $this, 'handleEvent' ];
49
        $this->buffer = new ZmqBuffer($socket, $this->fd, $this->loop, $writeListener);
50
    }
51
52
    /**
53
     * Attach read listener.
54
     */
55
    public function attachReadListener()
56
    {
57
        $this->loop->addReadStream($this->fd, [ $this, 'handleEvent' ]);
58
    }
59
60
    /**
61
     * Handle ZMQ Event.
62
     */
63
    public function handleEvent()
64
    {
65
        while ($this->socket !== null)
66
        {
67
            $events = $this->socket->getSockOpt(ZMQ::SOCKOPT_EVENTS);
68
69
            $hasEvents = ($events & ZMQ::POLL_IN) || ($events & ZMQ::POLL_OUT && $this->buffer->listening);
70
            if (!$hasEvents)
71
            {
72
                break;
73
            }
74
75
            if ($events & ZMQ::POLL_IN)
76
            {
77
                $this->handleReadEvent();
78
            }
79
80
            if ($events & ZMQ::POLL_OUT && $this->buffer->listening)
81
            {
82
                $this->buffer->handleWriteEvent();
83
            }
84
        }
85
    }
86
87
    /**
88
     * Handle ZMQ Read Event.
89
     */
90
    public function handleReadEvent()
91
    {
92
        $messages = $this->socket->recvmulti(ZMQ::MODE_DONTWAIT);
93
        if (false !== $messages)
94
        {
95
            $this->emit('messages', [ $messages ]);
96
        }
97
    }
98
99
    /**
100
     * Return socket.
101
     *
102
     * @return RawZMQSocket
103
     */
104
    public function getWrappedSocket()
105
    {
106
        return $this->socket;
107
    }
108
109
    /**
110
     * Subscribe socket to channel.
111
     *
112
     * @param mixed $channel
113
     */
114
    public function subscribe($channel)
115
    {
116
        $this->socket->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, $channel);
117
    }
118
119
    /**
120
     * Unsubscribe socket from channel.
121
     *
122
     * @param mixed $channel
123
     */
124
    public function unsubscribe($channel)
125
    {
126
        $this->socket->setSockOpt(ZMQ::SOCKOPT_UNSUBSCRIBE, $channel);
127
    }
128
129
    /**
130
     * Send message.
131
     *
132
     * @param string $message
133
     */
134
    public function send($message)
135
    {
136
        $this->buffer->send($message);
137
    }
138
139
    /**
140
     * Close connection and discard not sent data.
141
     */
142
    public function close()
143
    {
144
        if ($this->closed)
145
        {
146
            return;
147
        }
148
149
        $this->emit('end', [ $this ]);
150
        $this->loop->removeStream($this->fd);
151
        $this->buffer->flushListeners();
152
        $this->flushListeners();
153
        unset($this->socket);
154
        $this->closed = true;
155
    }
156
157
    /**
158
     * Deactivate socket, wait to complete sending all unfinished data, then close connection.
159
     */
160
    public function end()
161
    {
162
        if ($this->closed)
163
        {
164
            return;
165
        }
166
167
        $that = $this;
168
        $this->buffer->on('end', function() use($that) {
169
            $that->close();
170
        });
171
172
        $this->buffer->end();
173
    }
174
175
    /**
176
     * @param string $method
177
     * @param mixed[] $args
178
     * @return mixed
179
     */
180
    public function __call($method, $args)
181
    {
182
        return call_user_func_array([ $this->socket, $method ], $args);
183
    }
184
}
185