Completed
Push — master ( 2528a8...9eba40 )
by Sergey
03:58
created

Connection::allocateChannelNumber()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
c 1
b 0
f 0
cc 2
eloc 2
nc 2
nop 0
crap 2
1
<?php
2
3
namespace ButterAMQP\AMQP091;
4
5
use ButterAMQP\ConnectionInterface;
6
use ButterAMQP\Exception\AMQPException;
7
use ButterAMQP\Exception\InvalidChannelNumberException;
8
use ButterAMQP\AMQP091\Framing\Frame;
9
use ButterAMQP\AMQP091\Framing\Heartbeat;
10
use ButterAMQP\AMQP091\Framing\Method\ConnectionBlocked;
11
use ButterAMQP\AMQP091\Framing\Method\ConnectionClose;
12
use ButterAMQP\AMQP091\Framing\Method\ConnectionCloseOk;
13
use ButterAMQP\AMQP091\Framing\Method\ConnectionOpen;
14
use ButterAMQP\AMQP091\Framing\Method\ConnectionOpenOk;
15
use ButterAMQP\AMQP091\Framing\Method\ConnectionStart;
16
use ButterAMQP\AMQP091\Framing\Method\ConnectionStartOk;
17
use ButterAMQP\AMQP091\Framing\Method\ConnectionTune;
18
use ButterAMQP\AMQP091\Framing\Method\ConnectionTuneOk;
19
use ButterAMQP\AMQP091\Framing\Method\ConnectionUnblocked;
20
use ButterAMQP\Heartbeat\TimeHeartbeat;
21
use ButterAMQP\Security\Authenticator;
22
use ButterAMQP\Security\AuthenticatorInterface;
23
use ButterAMQP\Url;
24
25
class Connection implements ConnectionInterface, WireSubscriberInterface
26
{
27
    const STATUS_CLOSED = 'closed';
28
    const STATUS_READY = 'ready';
29
    const STATUS_BLOCKED = 'blocked';
30
31
    /**
32
     * @var Url
33
     */
34
    private $url;
35
36
    /**
37
     * @var WireInterface
38
     */
39
    private $wire;
40
41
    /**
42
     * @var AuthenticatorInterface
43
     */
44
    private $authenticator;
45
46
    /**
47
     * @var string
48
     */
49
    private $status;
50
51
    /**
52
     * @var Channel[]
53
     */
54
    private $channels = [];
55
56
    /**
57
     * @var array
58
     */
59
    private $capabilities = [];
60
61
    /**
     * Builds a connection object; no I/O happens until open() is called.
     *
     * @param Url                         $url           Server address; also the source of the vhost,
     *                                                   user, password and tuning query parameters
     * @param WireInterface               $wire          Transport used to send and receive frames
     * @param AuthenticatorInterface|null $authenticator Mechanism selector; Authenticator::build()
     *                                                   is used when omitted
     */
    public function __construct(Url $url, WireInterface $wire, AuthenticatorInterface $authenticator = null)
    {
        $this->url = $url;
        $this->wire = $wire;
        $this->authenticator = $authenticator ?: Authenticator::build();

        // Nothing has been negotiated yet. Start from a documented STATUS_* value so
        // getStatus() never returns null before open() has run.
        $this->status = self::STATUS_CLOSED;
    }
72
73
    /**
     * Reports the current connection state.
     *
     * The value is the STATUS_* constant most recently assigned by this class
     * (for example by open(), close() or one of the server notification handlers).
     *
     * @return string
     */
    public function getStatus()
    {
        $current = $this->status;

        return $current;
    }
82
83
    /**
     * {@inheritdoc}
     *
     * Forgets previously known channels and server capabilities, opens the wire,
     * subscribes this object to channel 0, waits for the tune frame, then sends
     * connection.open and waits for its confirmation before marking the
     * connection as ready.
     */
    public function open()
    {
        // Start from a clean slate in case the object is being re-opened.
        $this->capabilities = [];
        $this->channels = [];

        $opened = $this->wire->open($this->url);
        $opened->subscribe(0, $this);

        // Frames arriving while waiting are routed through dispatch().
        $this->wait(ConnectionTune::class);

        $this->send(new ConnectionOpen(0, $this->url->getVhost(), '', false));
        $this->wait(ConnectionOpenOk::class);

        $this->status = self::STATUS_READY;

        return $this;
    }
103
104
    /**
     * {@inheritdoc}
     *
     * When no ID is given the next free number is allocated. A channel is opened
     * the first time its number is requested; later calls with the same number
     * return the same instance.
     *
     * @throws InvalidChannelNumberException when $id is not a positive integer
     */
    public function channel($id = null)
    {
        if ($id === null) {
            $id = $this->allocateChannelNumber();
        }

        if (!$this->isChannelNumberValid($id)) {
            throw new InvalidChannelNumberException('Channel ID should be positive integer');
        }

        if (!isset($this->channels[$id])) {
            $this->channels[$id] = $this->openChannel($id);
        }

        return $this->channels[$id];
    }

    /**
     * Picks the number for a new channel: 1 when no channel exists yet,
     * otherwise one more than the highest number currently in use.
     *
     * @return int
     */
    private function allocateChannelNumber()
    {
        if (count($this->channels) === 0) {
            return 1;
        }

        return max(array_keys($this->channels)) + 1;
    }

    /**
     * Tells whether the given value can be used as a channel number.
     *
     * @param mixed $id
     *
     * @return bool true when $id is an integer greater than zero, false otherwise
     */
    private function isChannelNumberValid($id)
    {
        return is_int($id) && $id > 0;
    }
141
142
    /**
     * Creates a channel bound to this connection's wire and opens it.
     *
     * @param int $id Number the new channel is created with
     *
     * @return Channel The channel after open() has been called on it
     */
    private function openChannel($id)
    {
        $opened = new Channel($this->wire, $id);
        $opened->open();

        return $opened;
    }
154
155
    /**
     * {@inheritdoc}
     *
     * Sends connection.close with the given code and reason, waits for the
     * confirmation, marks the connection as closed and closes the wire.
     */
    public function close($code = 0, $reason = '')
    {
        $request = new ConnectionClose(0, $code, $reason, 0, 0);

        $this->send($request);
        $this->wait(ConnectionCloseOk::class);

        $this->status = self::STATUS_CLOSED;
        $this->wire->close();

        return $this;
    }
169
170
    /**
     * {@inheritdoc}
     *
     * A capability counts as supported only when the server listed it in its
     * properties with a truthy value; unknown capabilities yield false.
     */
    public function isSupported($capability)
    {
        // !empty() is "set and truthy", i.e. the same as isset() followed by a bool cast.
        return !empty($this->capabilities[$capability]);
    }
178
179
    /**
     * {@inheritdoc}
     *
     * Delegates to the wire's next() with the given blocking flag.
     */
    public function serve($blocking = true)
    {
        $wire = $this->wire;
        $wire->next($blocking);

        return $this;
    }
188
189
    /**
     * Hands a frame to the wire for sending.
     *
     * Callers in this class build their frames with 0 as the first constructor
     * argument (presumably the service channel number).
     *
     * @param Frame $frame Frame to send
     *
     * @return $this
     */
    private function send(Frame $frame)
    {
        $wire = $this->wire;
        $wire->send($frame);

        return $this;
    }
202
203
    /**
     * Asks the wire to wait on channel 0 for a frame of the given type.
     *
     * @param string|array $type Frame class name, or a list of class names
     *
     * @return Frame Whatever the wire's wait() returns
     */
    private function wait($type)
    {
        $received = $this->wire->wait(0, $type);

        return $received;
    }
214
215
    /**
     * {@inheritdoc}
     *
     * Routes a connection-level frame to its handler. Frames of any other
     * type are ignored.
     */
    public function dispatch(Frame $frame)
    {
        if ($frame instanceof ConnectionStart) {
            $this->onConnectionStart($frame);

            return;
        }

        if ($frame instanceof ConnectionTune) {
            $this->onConnectionTune($frame);

            return;
        }

        if ($frame instanceof ConnectionClose) {
            $this->onConnectionClose($frame);

            return;
        }

        if ($frame instanceof ConnectionBlocked) {
            $this->onConnectionBlocked($frame);

            return;
        }

        if ($frame instanceof ConnectionUnblocked) {
            $this->onConnectionUnblocked($frame);
        }
    }
232
233
    /**
     * Handles connection.start, the first frame received from the server.
     *
     * Remembers the capabilities advertised in the server properties, selects an
     * authentication mechanism from the ones offered, takes the first offered
     * locale and answers with connection.start-ok carrying the client
     * properties and the credentials taken from the URL.
     *
     * @param ConnectionStart $frame
     */
    private function onConnectionStart(ConnectionStart $frame)
    {
        $serverProperties = $frame->getServerProperties();

        if (isset($serverProperties['capabilities'])) {
            $this->capabilities = $serverProperties['capabilities'];
        } else {
            $this->capabilities = [];
        }

        // Mechanisms and locales are read as space-separated lists.
        $offeredMechanisms = explode(' ', $frame->getMechanisms());
        $mechanism = $this->authenticator->get($offeredMechanisms);

        $offeredLocales = explode(' ', $frame->getLocales());
        $locale = $offeredLocales[0];

        $clientProperties = [
            'platform' => 'PHP '.PHP_VERSION,
            'product' => 'ButterAMQP',
            'version' => '0.1.0',
            'capabilities' => [
                'publisher_confirms' => true,
                'exchange_exchange_bindings' => true,
                'basic.nack' => true,
                'connection.blocked' => true,
                'consumer_cancel_notify' => true,
                'authentication_failure_close' => true,
            ],
        ];

        $this->send(new ConnectionStartOk(
            0,
            $clientProperties,
            $mechanism->getName(),
            $mechanism->getResponse($this->url->getUser(), $this->url->getPass()),
            $locale
        ));
    }
271
272
    /**
     * Handles connection.tune, which proposes the connection limits: maximum
     * number of channels, maximum frame size and heartbeat delay.
     *
     * Each limit is negotiated against the client preference taken from the URL
     * query (channel_max, frame_max, heartbeat). A value of zero on either side
     * means "no preference", so the other side's value wins; otherwise the lower
     * of the two is used. The result is confirmed with connection.tune-ok and
     * then applied to the wire.
     *
     * @param ConnectionTune $frame
     */
    private function onConnectionTune(ConnectionTune $frame)
    {
        $negotiate = function ($client, $server) {
            // NOTE(review): Url::getQueryParameter() is not visible here; URL query
            // values are presumably strings. Normalise both sides to integers so that
            // the comparison is numeric and integers are what gets sent in tune-ok.
            // A non-numeric preference becomes 0, i.e. "no preference".
            $client = (int) $client;
            $server = (int) $server;

            if ($client === 0 || $server === 0) {
                return max($client, $server);
            }

            return min($client, $server);
        };

        $channelMax = $negotiate($this->url->getQueryParameter('channel_max', 0), $frame->getChannelMax());
        $frameMax = $negotiate($this->url->getQueryParameter('frame_max', 0), $frame->getFrameMax());
        $heartbeat = $negotiate($this->url->getQueryParameter('heartbeat', 60), $frame->getHeartbeat());

        // Confirm the agreed values first, then apply them locally.
        $this->send(new ConnectionTuneOk(0, $channelMax, $frameMax, $heartbeat));

        $this->wire->setHeartbeat(new TimeHeartbeat($heartbeat))
            ->setFrameMax($frameMax);
    }
295
296
    /**
     * Handles connection.close, received when the server decides to close the
     * connection, normally because an unrecoverable error occurred.
     *
     * The close is acknowledged, the wire is closed and the status is updated
     * before any exception is raised.
     *
     * @param ConnectionClose $frame
     *
     * @throws AMQPException when the frame carries a non-zero reply code
     */
    private function onConnectionClose(ConnectionClose $frame)
    {
        $acknowledgement = new ConnectionCloseOk(0);

        $this->send($acknowledgement);
        $this->wire->close();

        $this->status = self::STATUS_CLOSED;

        // A zero reply code means there is no error to report.
        if (!$frame->getReplyCode()) {
            return;
        }

        throw AMQPException::make($frame->getReplyText(), $frame->getReplyCode());
    }
314
315
    /**
     * Handles connection.blocked, received when the server decides to suspend
     * the connection, for example because it has run out of memory and cannot
     * serve the connection. The consumer is expected to suspend all activity
     * until connection.unblocked is received.
     *
     * Only the status is changed here; the frame content is not inspected.
     *
     * @param ConnectionBlocked $frame
     */
    private function onConnectionBlocked(ConnectionBlocked $frame)
    {
        $blocked = self::STATUS_BLOCKED;

        $this->status = $blocked;
    }
326
327
    /**
     * Handles connection.unblocked, received when the connection returns to its
     * normal state after having been suspended by connection.blocked.
     *
     * Only the status is changed here; the frame content is not inspected.
     *
     * @param ConnectionUnblocked $frame
     */
    private function onConnectionUnblocked(ConnectionUnblocked $frame)
    {
        $ready = self::STATUS_READY;

        $this->status = $ready;
    }
337
}
338