Connection   B
last analyzed

Complexity

Total Complexity 36

Size/Duplication

Total Lines 335
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 8

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
wmc 36
lcom 1
cbo 8
dl 0
loc 335
ccs 121
cts 121
cp 1
rs 8.8
c 0
b 0
f 0

20 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 6 2
A getStatus() 0 4 1
A open() 0 17 1
A channel() 0 16 4
A allocateChannelNumber() 0 4 2
A isChannelNumberValid() 0 4 2
A openChannel() 0 9 1
A close() 0 11 1
A isSupported() 0 7 2
A serve() 0 6 1
A send() 0 6 1
A wait() 0 4 1
B dispatch() 0 14 6
A onConnectionStart() 0 20 2
A getClientProperties() 0 16 1
A onConnectionTune() 0 15 2
A onConnectionClose() 0 11 2
A onConnectionBlocked() 0 4 1
A onConnectionUnblocked() 0 4 1
A openIfClosed() 0 6 2
1
<?php
2
3
namespace ButterAMQP\AMQP091;
4
5
use ButterAMQP\ConnectionInterface;
6
use ButterAMQP\Exception\AMQPException;
7
use ButterAMQP\Exception\InvalidChannelNumberException;
8
use ButterAMQP\AMQP091\Framing\Frame;
9
use ButterAMQP\AMQP091\Framing\Heartbeat;
10
use ButterAMQP\AMQP091\Framing\Method\ConnectionBlocked;
11
use ButterAMQP\AMQP091\Framing\Method\ConnectionClose;
12
use ButterAMQP\AMQP091\Framing\Method\ConnectionCloseOk;
13
use ButterAMQP\AMQP091\Framing\Method\ConnectionOpen;
14
use ButterAMQP\AMQP091\Framing\Method\ConnectionOpenOk;
15
use ButterAMQP\AMQP091\Framing\Method\ConnectionStart;
16
use ButterAMQP\AMQP091\Framing\Method\ConnectionStartOk;
17
use ButterAMQP\AMQP091\Framing\Method\ConnectionTune;
18
use ButterAMQP\AMQP091\Framing\Method\ConnectionTuneOk;
19
use ButterAMQP\AMQP091\Framing\Method\ConnectionUnblocked;
20
use ButterAMQP\Heartbeat\TimeHeartbeat;
21
use ButterAMQP\Security\Authenticator;
22
use ButterAMQP\Security\AuthenticatorInterface;
23
use ButterAMQP\Url;
24
25
class Connection implements ConnectionInterface, WireSubscriberInterface
26
{
27
    /** Status before open() succeeds and after the connection is closed. */
    const STATUS_CLOSED = 'closed';

    /** Status once the open handshake has completed, or after an unblock. */
    const STATUS_READY = 'ready';

    /** Status while the server has reported the connection as blocked. */
    const STATUS_BLOCKED = 'blocked';

    /**
     * Connection URL; supplies vhost, credentials and tuning query parameters.
     *
     * @var Url
     */
    private $url;

    /**
     * Transport used to send and receive frames.
     *
     * @var WireInterface
     */
    private $wire;

    /**
     * Picks the authentication mechanism during the connection start step.
     *
     * @var AuthenticatorInterface
     */
    private $authenticator;

    /**
     * Current status, one of the STATUS_* constants.
     *
     * Initialised to STATUS_CLOSED so that getStatus() honours its
     * documented string return type before open() has been called.
     *
     * @var string
     */
    private $status = self::STATUS_CLOSED;

    /**
     * Opened channels, indexed by channel number.
     *
     * @var Channel[]
     */
    private $channels = [];

    /**
     * Server capabilities as reported in the connection start frame.
     *
     * @var array
     */
    private $capabilities = [];
60
61
    /**
     * Stores the collaborators needed to talk to the broker.
     *
     * When no authenticator is supplied, the default one produced by
     * Authenticator::build() is used.
     *
     * @param Url                         $url           connection URL
     * @param WireInterface               $wire          frame transport
     * @param AuthenticatorInterface|null $authenticator optional authenticator
     */
    public function __construct(Url $url, WireInterface $wire, AuthenticatorInterface $authenticator = null)
    {
        if (!$authenticator) {
            $authenticator = Authenticator::build();
        }

        $this->url = $url;
        $this->wire = $wire;
        $this->authenticator = $authenticator;
    }
72
73
    /**
     * Returns the status most recently recorded for this connection.
     *
     * The value is updated by open(), close() and the connection frame
     * handlers; compare it against the STATUS_* constants.
     *
     * @return string
     */
    public function getStatus()
    {
        return $this->status;
    }
82
83
    /**
     * {@inheritdoc}
     *
     * Forgets previously known channels and capabilities, opens the wire and
     * subscribes this connection to channel 0, waits for the tune frame, then
     * sends ConnectionOpen for the URL's vhost and waits for its confirmation
     * before marking the connection as ready.
     */
    public function open()
    {
        // Start from a clean slate: state from a previous session is stale.
        $this->capabilities = [];
        $this->channels = [];

        $wire = $this->wire->open($this->url);
        $wire->subscribe(0, $this);

        $this->wait(ConnectionTune::class);

        $this->send(new ConnectionOpen(0, $this->url->getVhost(), '', false));
        $this->wait(ConnectionOpenOk::class);

        $this->status = self::STATUS_READY;

        return $this;
    }
103
104
    /**
     * {@inheritdoc}
     *
     * Without an ID the next free channel number is allocated. A channel
     * that is already known is returned as is; otherwise it is opened and
     * remembered under its number.
     *
     * @throws InvalidChannelNumberException when the ID is not a positive integer
     */
    public function channel($id = null)
    {
        $number = ($id === null) ? $this->allocateChannelNumber() : $id;

        if (!$this->isChannelNumberValid($number)) {
            throw new InvalidChannelNumberException('Channel ID should be positive integer');
        }

        if (isset($this->channels[$number])) {
            return $this->channels[$number];
        }

        return $this->channels[$number] = $this->openChannel($number);
    }
123
124
    /**
     * Picks the number for a new channel: 1 when no channel is known yet,
     * otherwise one above the highest number currently in use.
     *
     * @return int
     */
    private function allocateChannelNumber()
    {
        if (count($this->channels) == 0) {
            return 1;
        }

        $numbers = array_keys($this->channels);

        return max($numbers) + 1;
    }
131
132
    /**
     * Tells whether the given value is usable as a channel number,
     * i.e. it is an integer strictly greater than zero.
     *
     * @param mixed $id
     *
     * @return bool
     */
    private function isChannelNumberValid($id)
    {
        if (!is_int($id)) {
            return false;
        }

        return $id > 0;
    }
141
142
    /**
     * Creates a channel with the given number on the current wire and opens it.
     * The connection itself is opened first when it is not ready.
     *
     * @param int $id channel number
     *
     * @return Channel
     */
    private function openChannel($id)
    {
        $this->openIfClosed();

        $opened = new Channel($this->wire, $id);
        $opened->open();

        return $opened;
    }
156
157
    /**
     * {@inheritdoc}
     *
     * Sends ConnectionClose with the given code and reason, waits for the
     * server's ConnectionCloseOk, marks the connection closed and then
     * closes the wire.
     */
    public function close($code = 0, $reason = '')
    {
        $this->send(new ConnectionClose(0, $code, $reason, 0, 0));
        $this->wait(ConnectionCloseOk::class);

        $this->status = self::STATUS_CLOSED;

        $this->wire->close();

        return $this;
    }
171
172
    /**
     * {@inheritdoc}
     *
     * Opens the connection first when it is not ready, so that the server
     * capabilities are available. Unknown capabilities count as unsupported.
     */
    public function isSupported($capability)
    {
        $this->openIfClosed();

        if (!isset($this->capabilities[$capability])) {
            return false;
        }

        return (bool) $this->capabilities[$capability];
    }
182
183
    /**
     * {@inheritdoc}
     *
     * Delegates to the wire's next() call, forwarding the blocking flag.
     */
    public function serve($blocking = true)
    {
        $wire = $this->wire;
        $wire->next($blocking);

        return $this;
    }
192
193
    /**
     * Hands a frame over to the wire for sending.
     *
     * Used by this class for service channel (#0) frames only.
     *
     * @param Frame $frame
     *
     * @return $this for chaining
     */
    private function send(Frame $frame)
    {
        $wire = $this->wire;
        $wire->send($frame);

        return $this;
    }
206
207
    /**
     * Asks the wire to wait for a frame of the given type(s) on the
     * service channel (#0) and returns what the wire produced.
     *
     * @param string|array $type frame class name(s) to wait for
     *
     * @return Frame
     */
    private function wait($type)
    {
        $received = $this->wire->wait(0, $type);

        return $received;
    }
218
219
    /**
     * {@inheritdoc}
     *
     * Routes connection-level frames to their dedicated handler.
     * Frames of any other type are silently ignored.
     */
    public function dispatch(Frame $frame)
    {
        if ($frame instanceof ConnectionStart) {
            $this->onConnectionStart($frame);

            return;
        }

        if ($frame instanceof ConnectionTune) {
            $this->onConnectionTune($frame);

            return;
        }

        if ($frame instanceof ConnectionClose) {
            $this->onConnectionClose($frame);

            return;
        }

        if ($frame instanceof ConnectionBlocked) {
            $this->onConnectionBlocked($frame);

            return;
        }

        if ($frame instanceof ConnectionUnblocked) {
            $this->onConnectionUnblocked($frame);
        }
    }
236
237
    /**
     * Handles the connection start frame, the first frame sent by the server.
     *
     * Remembers the server capabilities (if any were announced), lets the
     * authenticator pick one of the offered mechanisms, takes the first
     * offered locale and answers with ConnectionStartOk carrying the client
     * properties and the authentication response for the URL's credentials.
     *
     * @param ConnectionStart $frame
     */
    private function onConnectionStart(ConnectionStart $frame)
    {
        $serverProperties = $frame->getServerProperties();

        if (isset($serverProperties['capabilities'])) {
            $this->capabilities = $serverProperties['capabilities'];
        } else {
            $this->capabilities = [];
        }

        // Mechanisms and locales arrive as space separated lists.
        $offeredMechanisms = explode(' ', $frame->getMechanisms());
        $mechanism = $this->authenticator->get($offeredMechanisms);

        $offeredLocales = explode(' ', $frame->getLocales());
        $locale = $offeredLocales[0];

        $clientProperties = $this->getClientProperties();
        $mechanismName = $mechanism->getName();
        $response = $mechanism->getResponse($this->url->getUser(), $this->url->getPassword());

        $this->send(new ConnectionStartOk(0, $clientProperties, $mechanismName, $response, $locale));
    }
263
264
    /**
     * Builds the client properties table sent in ConnectionStartOk:
     * platform (PHP major.minor), product name, version and the set of
     * capabilities this client announces.
     *
     * @return array
     */
    private function getClientProperties()
    {
        $capabilities = [
            'publisher_confirms' => true,
            'exchange_exchange_bindings' => true,
            'basic.nack' => true,
            'connection.blocked' => true,
            'consumer_cancel_notify' => true,
            'authentication_failure_close' => true,
        ];

        $properties = [];
        $properties['platform'] = sprintf('PHP %d.%d', PHP_MAJOR_VERSION, PHP_MINOR_VERSION);
        $properties['product'] = 'ButterAMQP';
        $properties['version'] = '0.1.0';
        $properties['capabilities'] = $capabilities;

        return $properties;
    }
283
284
    /**
     * Handles the tune frame, which proposes connection preferences:
     * max number of channels, max frame size and heartbeat delay.
     *
     * Each value is negotiated against the matching URL query parameter
     * (channel_max, frame_max, heartbeat; defaults 0, 0 and 60). A zero on
     * either side means "no preference", so the other side's value wins;
     * otherwise the lower of the two is used. The outcome is confirmed with
     * ConnectionTuneOk and applied to the wire.
     *
     * @param ConnectionTune $frame
     */
    private function onConnectionTune(ConnectionTune $frame)
    {
        $negotiate = function ($client, $server) {
            // URL query parameters are presumably plain strings; normalise both
            // sides to integers so no string leaks into the frame or heartbeat
            // and non-numeric input degrades to 0 ("no preference").
            $client = (int) $client;
            $server = (int) $server;

            if ($client === 0 || $server === 0) {
                return max($client, $server);
            }

            return min($client, $server);
        };

        $channelMax = $negotiate($this->url->getQueryParameter('channel_max', 0), $frame->getChannelMax());
        $frameMax = $negotiate($this->url->getQueryParameter('frame_max', 0), $frame->getFrameMax());
        $heartbeat = $negotiate($this->url->getQueryParameter('heartbeat', 60), $frame->getHeartbeat());

        $this->send(new ConnectionTuneOk(0, $channelMax, $frameMax, $heartbeat));

        $this->wire->setHeartbeat(new TimeHeartbeat($heartbeat))
            ->setFrameMax($frameMax);
    }
307
308
    /**
     * Handles a close request coming from the server.
     *
     * Confirms with ConnectionCloseOk, closes the wire and marks the
     * connection closed. A non-zero reply code is then surfaced as an
     * exception built from the reply text and code.
     *
     * @param ConnectionClose $frame
     *
     * @throws AMQPException when the server supplied a non-zero reply code
     */
    private function onConnectionClose(ConnectionClose $frame)
    {
        $this->send(new ConnectionCloseOk(0));
        $this->wire->close();

        $this->status = self::STATUS_CLOSED;

        if (!$frame->getReplyCode()) {
            return;
        }

        throw AMQPException::make($frame->getReplyText(), $frame->getReplyCode());
    }
326
327
    /**
     * Handles the notification that the server has suspended the connection
     * (for example because it ran out of memory). The status switches to
     * blocked; the client is expected to pause its activity until the
     * matching unblocked notification arrives.
     *
     * @param ConnectionBlocked $frame not inspected; accepted so dispatch() can pass the frame through
     */
    private function onConnectionBlocked(ConnectionBlocked $frame)
    {
        $this->status = self::STATUS_BLOCKED;
    }
338
339
    /**
     * Handles the notification that a previously suspended connection is
     * back to normal; the status switches back to ready.
     *
     * @param ConnectionUnblocked $frame not inspected; accepted so dispatch() can pass the frame through
     */
    private function onConnectionUnblocked(ConnectionUnblocked $frame)
    {
        $this->status = self::STATUS_READY;
    }
349
350
    /**
     * Opens the connection unless it is already open.
     *
     * A blocked connection is still open on the wire, so it is left alone:
     * calling open() on it would repeat the handshake and discard the
     * channels that are already registered.
     */
    private function openIfClosed()
    {
        if ($this->status === self::STATUS_READY || $this->status === self::STATUS_BLOCKED) {
            return;
        }

        $this->open();
    }
359
}
360