Passed
Push — master ( 22ad49...a7780f )
by Sergey
03:54
created

Connection::onConnectionStart()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 20
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 14
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 20
ccs 14
cts 14
cp 1
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 13
nc 2
nop 1
crap 2
1
<?php
2
3
namespace ButterAMQP\AMQP091;
4
5
use ButterAMQP\ConnectionInterface;
6
use ButterAMQP\Exception\AMQPException;
7
use ButterAMQP\Exception\InvalidChannelNumberException;
8
use ButterAMQP\AMQP091\Framing\Frame;
9
use ButterAMQP\AMQP091\Framing\Heartbeat;
10
use ButterAMQP\AMQP091\Framing\Method\ConnectionBlocked;
11
use ButterAMQP\AMQP091\Framing\Method\ConnectionClose;
12
use ButterAMQP\AMQP091\Framing\Method\ConnectionCloseOk;
13
use ButterAMQP\AMQP091\Framing\Method\ConnectionOpen;
14
use ButterAMQP\AMQP091\Framing\Method\ConnectionOpenOk;
15
use ButterAMQP\AMQP091\Framing\Method\ConnectionStart;
16
use ButterAMQP\AMQP091\Framing\Method\ConnectionStartOk;
17
use ButterAMQP\AMQP091\Framing\Method\ConnectionTune;
18
use ButterAMQP\AMQP091\Framing\Method\ConnectionTuneOk;
19
use ButterAMQP\AMQP091\Framing\Method\ConnectionUnblocked;
20
use ButterAMQP\Heartbeat\TimeHeartbeat;
21
use ButterAMQP\Security\Authenticator;
22
use ButterAMQP\Security\AuthenticatorInterface;
23
use ButterAMQP\Url;
24
25
/**
 * AMQP 0.9.1 connection.
 *
 * Handles the frames exchanged on the service channel (#0) - start, tune,
 * open, close, blocked and unblocked - and hands out Channel instances that
 * share the same wire.
 */
class Connection implements ConnectionInterface, WireSubscriberInterface
{
    const STATUS_CLOSED = 'closed';
    const STATUS_READY = 'ready';
    const STATUS_BLOCKED = 'blocked';

    /**
     * @var Url
     */
    private $url;

    /**
     * @var WireInterface
     */
    private $wire;

    /**
     * @var AuthenticatorInterface
     */
    private $authenticator;

    /**
     * Current status, one of the STATUS_* constants.
     *
     * Starts as "closed" so that getStatus() returns a string even before
     * open() has been called.
     *
     * @var string
     */
    private $status = self::STATUS_CLOSED;

    /**
     * Channels opened through this connection, indexed by channel number.
     *
     * @var Channel[]
     */
    private $channels = [];

    /**
     * Server capabilities as reported in the connection.start frame.
     *
     * @var array
     */
    private $capabilities = [];

    /**
     * @param Url                         $url
     * @param WireInterface               $wire
     * @param AuthenticatorInterface|null $authenticator falls back to Authenticator::build() when omitted
     */
    public function __construct(Url $url, WireInterface $wire, AuthenticatorInterface $authenticator = null)
    {
        $this->url = $url;
        $this->wire = $wire;
        $this->authenticator = $authenticator ?: Authenticator::build();
    }

    /**
     * Connection status. See STATUS_* constants for possible values.
     *
     * @return string
     */
    public function getStatus()
    {
        return $this->status;
    }

    /**
     * Opens the wire, subscribes to the service channel (#0), waits for the
     * tune frame, then opens the virtual host taken from the URL.
     *
     * {@inheritdoc}
     */
    public function open()
    {
        // Drop any state left over from a previous session.
        $this->channels = [];
        $this->capabilities = [];

        $this->wire->open($this->url)
            ->subscribe(0, $this);

        // The start-ok / tune-ok replies are sent by the dispatch() handlers below,
        // presumably invoked by the wire while it waits (confirm against WireInterface).
        $this->wait(ConnectionTune::class);

        $this->send(new ConnectionOpen(0, $this->url->getVhost(), '', false))
            ->wait(ConnectionOpenOk::class);

        $this->status = self::STATUS_READY;

        return $this;
    }

    /**
     * Returns the channel with the given number, opening it on first use.
     * When no number is given the next free one is allocated.
     *
     * {@inheritdoc}
     *
     * @throws InvalidChannelNumberException when $id is not a positive integer
     */
    public function channel($id = null)
    {
        if ($id === null) {
            $id = $this->allocateChannelNumber();
        }

        if (!$this->isChannelNumberValid($id)) {
            throw new InvalidChannelNumberException('Channel ID should be positive integer');
        }

        if (!isset($this->channels[$id])) {
            $this->channels[$id] = $this->openChannel($id);
        }

        return $this->channels[$id];
    }

    /**
     * Next channel number: 1 when no channel is open yet, otherwise one more
     * than the highest number currently in use.
     *
     * @return int
     */
    private function allocateChannelNumber()
    {
        return $this->channels ? max(array_keys($this->channels)) + 1 : 1;
    }

    /**
     * @param int $id
     *
     * @return bool true when $id is an integer greater than zero
     */
    private function isChannelNumberValid($id)
    {
        return is_int($id) && $id > 0;
    }

    /**
     * Creates a channel on the shared wire and opens it.
     *
     * @param int $id
     *
     * @return Channel
     */
    private function openChannel($id)
    {
        $channel = new Channel($this->wire, $id);
        $channel->open();

        return $channel;
    }

    /**
     * Sends connection.close, waits for connection.close-ok, marks the
     * connection as closed and closes the wire.
     *
     * {@inheritdoc}
     */
    public function close($code = 0, $reason = '')
    {
        $this->send(new ConnectionClose(0, $code, $reason, 0, 0))
            ->wait(ConnectionCloseOk::class);

        $this->status = self::STATUS_CLOSED;

        $this->wire->close();

        return $this;
    }

    /**
     * Tells whether the server reported the given capability with a truthy value.
     *
     * {@inheritdoc}
     */
    public function isSupported($capability)
    {
        // !empty() is false for both a missing key and a falsy value.
        return !empty($this->capabilities[$capability]);
    }

    /**
     * Delegates to the wire's next() with the given blocking flag.
     *
     * {@inheritdoc}
     */
    public function serve($blocking = true)
    {
        $this->wire->next($blocking);

        return $this;
    }

    /**
     * Sends frame to the service channel (#0).
     *
     * @param Frame $frame
     *
     * @return $this
     */
    private function send(Frame $frame)
    {
        $this->wire->send($frame);

        return $this;
    }

    /**
     * Wait for a frame in the service channel (#0).
     *
     * @param string|array $type
     *
     * @return Frame
     */
    private function wait($type)
    {
        return $this->wire->wait(0, $type);
    }

    /**
     * Routes a service channel frame to its handler. Frames of any other
     * type are ignored.
     *
     * {@inheritdoc}
     */
    public function dispatch(Frame $frame)
    {
        if ($frame instanceof ConnectionStart) {
            $this->onConnectionStart($frame);
        } elseif ($frame instanceof ConnectionTune) {
            $this->onConnectionTune($frame);
        } elseif ($frame instanceof ConnectionClose) {
            $this->onConnectionClose($frame);
        } elseif ($frame instanceof ConnectionBlocked) {
            $this->onConnectionBlocked($frame);
        } elseif ($frame instanceof ConnectionUnblocked) {
            $this->onConnectionUnblocked($frame);
        }
    }

    /**
     * This frame is the first frame received from server.
     * It provides server details and requests client credentials.
     *
     * Stores the server capabilities, picks an authentication mechanism from
     * the offered list and replies with connection.start-ok.
     *
     * @param ConnectionStart $frame
     */
    private function onConnectionStart(ConnectionStart $frame)
    {
        $properties = $frame->getServerProperties();

        $this->capabilities = isset($properties['capabilities']) ?
            $properties['capabilities'] : [];

        // Mechanisms and locales arrive as space separated lists.
        $mechanism = $this->authenticator
            ->get(explode(' ', $frame->getMechanisms()));

        // Use the first locale offered by the server.
        list($locale) = explode(' ', $frame->getLocales());

        $this->send(new ConnectionStartOk(
            0,
            $this->getClientProperties(),
            $mechanism->getName(),
            $mechanism->getResponse($this->url->getUser(), $this->url->getPass()),
            $locale
        ));
    }

    /**
     * Client properties sent to the server in connection.start-ok.
     *
     * @return array
     */
    private function getClientProperties()
    {
        return [
            'platform' => sprintf('PHP %d.%d', PHP_MAJOR_VERSION, PHP_MINOR_VERSION),
            'product' => 'ButterAMQP',
            'version' => '0.1.0',
            'capabilities' => [
                'publisher_confirms' => true,
                'exchange_exchange_bindings' => true,
                'basic.nack' => true,
                'connection.blocked' => true,
                'consumer_cancel_notify' => true,
                'authentication_failure_close' => true,
            ],
        ];
    }

    /**
     * This frame is received to setup connection preferences, like max frame size,
     * max number of channel and heartbeat delay.
     *
     * Values in the request can be lowered by client.
     *
     * Client preferences are read from the URL query parameters "channel_max",
     * "frame_max" (both default 0) and "heartbeat" (default 60).
     *
     * @param ConnectionTune $frame
     */
    private function onConnectionTune(ConnectionTune $frame)
    {
        $channelMax = $this->negotiate($this->url->getQueryParameter('channel_max', 0), $frame->getChannelMax());
        $frameMax = $this->negotiate($this->url->getQueryParameter('frame_max', 0), $frame->getFrameMax());
        $heartbeat = $this->negotiate($this->url->getQueryParameter('heartbeat', 60), $frame->getHeartbeat());

        $this->send(new ConnectionTuneOk(0, $channelMax, $frameMax, $heartbeat));

        $this->wire->setHeartbeat(new TimeHeartbeat($heartbeat))
            ->setFrameMax($frameMax);
    }

    /**
     * Settles a tunable value between the client preference and the server proposal.
     *
     * When either side is zero the other side's value is used; otherwise the
     * smaller of the two wins.
     *
     * Both sides are cast to int first: the client value comes from a URL query
     * parameter and may well be a string (NOTE(review): confirm against
     * Url::getQueryParameter), which must not leak into the tune-ok frame.
     *
     * @param mixed $client value preferred by the client
     * @param mixed $server value proposed by the server
     *
     * @return int
     */
    private function negotiate($client, $server)
    {
        $client = (int) $client;
        $server = (int) $server;

        if ($client === 0 || $server === 0) {
            return max($client, $server);
        }

        return min($client, $server);
    }

    /**
     * This frame is received once server decide to close connection, normally because an unrecoverable error occur.
     *
     * Acknowledges with connection.close-ok, closes the wire and marks the
     * connection as closed before reporting the error, if any.
     *
     * @param ConnectionClose $frame
     *
     * @throws AMQPException when the frame carries a non-zero reply code
     */
    private function onConnectionClose(ConnectionClose $frame)
    {
        $this->send(new ConnectionCloseOk(0));
        $this->wire->close();

        $this->status = self::STATUS_CLOSED;

        if ($frame->getReplyCode()) {
            throw AMQPException::make($frame->getReplyText(), $frame->getReplyCode());
        }
    }

    /**
     * This frame is received once server decide to suspend connection, for example because server
     * run out of memory and can not provide service for the connection. When this happen consumer
     * suppose to suspend all activities until connection.unblocked is received.
     *
     * @param ConnectionBlocked $frame
     */
    private function onConnectionBlocked(ConnectionBlocked $frame)
    {
        $this->status = self::STATUS_BLOCKED;
    }

    /**
     * This frame is received once connection returns back to normal state after being suspended.
     * See onConnectionBlocked above.
     *
     * @param ConnectionUnblocked $frame
     */
    private function onConnectionUnblocked(ConnectionUnblocked $frame)
    {
        $this->status = self::STATUS_READY;
    }
}
346