Completed
Push — master ( 4725f7...79150a )
by Sergey
04:25
created

Connection::channel()   B

Complexity

Conditions 6
Paths 9

Size

Total Lines 17
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 6

Importance

Changes 0
Metric Value
dl 0
loc 17
ccs 11
cts 11
cp 1
rs 8.8571
c 0
b 0
f 0
cc 6
eloc 9
nc 9
nop 1
crap 6
1
<?php
2
3
namespace ButterAMQP\AMQP091;
4
5
use ButterAMQP\ConnectionInterface;
6
use ButterAMQP\Exception\AMQPException;
7
use ButterAMQP\Exception\InvalidChannelNumberException;
8
use ButterAMQP\AMQP091\Framing\Frame;
9
use ButterAMQP\AMQP091\Framing\Heartbeat;
10
use ButterAMQP\AMQP091\Framing\Method\ConnectionBlocked;
11
use ButterAMQP\AMQP091\Framing\Method\ConnectionClose;
12
use ButterAMQP\AMQP091\Framing\Method\ConnectionCloseOk;
13
use ButterAMQP\AMQP091\Framing\Method\ConnectionOpen;
14
use ButterAMQP\AMQP091\Framing\Method\ConnectionOpenOk;
15
use ButterAMQP\AMQP091\Framing\Method\ConnectionStart;
16
use ButterAMQP\AMQP091\Framing\Method\ConnectionStartOk;
17
use ButterAMQP\AMQP091\Framing\Method\ConnectionTune;
18
use ButterAMQP\AMQP091\Framing\Method\ConnectionTuneOk;
19
use ButterAMQP\AMQP091\Framing\Method\ConnectionUnblocked;
20
use ButterAMQP\Heartbeat\TimeHeartbeat;
21
use ButterAMQP\Security\Authenticator;
22
use ButterAMQP\Security\AuthenticatorInterface;
23
use ButterAMQP\Url;
24
use ButterAMQP\WireInterface;
25
use ButterAMQP\WireSubscriberInterface;
26
27
class Connection implements ConnectionInterface, WireSubscriberInterface
28
{
29
    const STATUS_CLOSED = 'closed';
30
    const STATUS_READY = 'ready';
31
    const STATUS_BLOCKED = 'blocked';
32
33
    /**
34
     * @var Url
35
     */
36
    private $url;
37
38
    /**
39
     * @var WireInterface
40
     */
41
    private $wire;
42
43
    /**
44
     * @var AuthenticatorInterface
45
     */
46
    private $authenticator;
47
48
    /**
49
     * @var string
50
     */
51
    private $status;
52
53
    /**
54
     * @var Channel[]
55
     */
56
    private $channels = [];
57
58
    /**
59
     * @var array
60
     */
61
    private $capabilities = [];
62
63
    /**
     * Stores the collaborators needed to talk to the server. No I/O happens here.
     *
     * @param Url                         $url           Connection URL; its vhost, credentials and
     *                                                   query parameters are read later by this class
     * @param WireInterface               $wire          Wire used to send and receive frames
     * @param AuthenticatorInterface|null $authenticator Authenticator to use; when omitted (or falsy)
     *                                                   the one returned by Authenticator::build() is used
     */
    public function __construct(Url $url, WireInterface $wire, AuthenticatorInterface $authenticator = null)
    {
        $this->url = $url;
        $this->wire = $wire;
        $this->authenticator = $authenticator ?: Authenticator::build();

        // Nothing has been opened yet, so report a real STATUS_* value instead of leaving
        // the property unset (getStatus() would otherwise return null until open() is called).
        $this->status = self::STATUS_CLOSED;
    }
74
75
    /**
     * Returns the current connection status.
     *
     * The value is whatever this class last stored in its status property;
     * the STATUS_* constants list the values assigned by this class.
     *
     * @return string
     */
    public function getStatus()
    {
        $current = $this->status;

        return $current;
    }
84
85
    /**
     * {@inheritdoc}
     *
     * Clears the channel registry and the recorded server capabilities, opens the wire,
     * subscribes this connection to channel #0, waits for a ConnectionTune frame, sends
     * ConnectionOpen for the URL's vhost and waits for ConnectionOpenOk. The status is
     * set to STATUS_READY once all of that has completed.
     */
    public function open()
    {
        // Start from a clean slate in case this instance is being (re)opened.
        $this->channels = [];
        $this->capabilities = [];

        // Subscribe on whatever open() returns, exactly as a fluent call would.
        $opened = $this->wire->open($this->url);
        $opened->subscribe(0, $this);

        $this->wait(ConnectionTune::class);

        $vhost = $this->url->getVhost();
        $this->send(new ConnectionOpen(0, $vhost, '', false));
        $this->wait(ConnectionOpenOk::class);

        $this->status = self::STATUS_READY;

        return $this;
    }
105
106
    /**
     * {@inheritdoc}
     *
     * When no ID is given, the ID following the highest one currently registered is used
     * (starting from 1 when no channel is registered). A channel is created and opened the
     * first time its ID is requested; later requests for that ID return the same instance.
     *
     * @throws InvalidChannelNumberException when the ID is not a positive integer
     */
    public function channel($id = null)
    {
        if ($id === null) {
            $id = empty($this->channels) ? 1 : max(array_keys($this->channels)) + 1;
        }

        if (!is_int($id) || $id <= 0) {
            throw new InvalidChannelNumberException('Channel ID should be positive integer');
        }

        if (!isset($this->channels[$id])) {
            // Open the channel before registering it: if open() throws, a channel that was
            // never opened must not stay cached and be handed out by later calls for this ID.
            $channel = new Channel($this->wire, $id);
            $channel->open();

            $this->channels[$id] = $channel;
        }

        return $this->channels[$id];
    }
126
127
    /**
     * {@inheritdoc}
     *
     * Sends ConnectionClose with the given code and reason, waits for ConnectionCloseOk,
     * marks the connection as closed and then closes the wire.
     */
    public function close($code = 0, $reason = '')
    {
        $request = new ConnectionClose(0, $code, $reason, 0, 0);

        $this->send($request);
        $this->wait(ConnectionCloseOk::class);

        $this->status = self::STATUS_CLOSED;
        $this->wire->close();

        return $this;
    }
141
142
    /**
     * {@inheritdoc}
     *
     * Looks the capability up in the table recorded from the server's ConnectionStart frame;
     * an entry that is missing, null or otherwise falsy counts as not supported.
     */
    public function isSupported($capability)
    {
        // !empty() is true only when the key is set and its value is truthy.
        return !empty($this->capabilities[$capability]);
    }
150
151
    /**
     * {@inheritdoc}
     *
     * Delegates to the wire's next() with the given blocking flag.
     */
    public function serve($blocking = true)
    {
        $wire = $this->wire;
        $wire->next($blocking);

        return $this;
    }
160
161
    /**
     * Hands a frame to the wire for sending.
     *
     * Used by this class for service channel (#0) frames; the channel number
     * is carried by the frame itself, not added here.
     *
     * @param Frame $frame Frame to send
     *
     * @return $this
     */
    private function send(Frame $frame)
    {
        $wire = $this->wire;
        $wire->send($frame);

        return $this;
    }
174
175
    /**
     * Waits on the service channel (#0) for a frame of the given type.
     *
     * @param string|array $type Frame class name (or names) to wait for
     *
     * @return Frame Whatever the wire's wait() returns
     */
    private function wait($type)
    {
        $received = $this->wire->wait(0, $type);

        return $received;
    }
186
187
    /**
188
     * {@inheritdoc}
189
     */
190 26
    public function dispatch(Frame $frame)
191
    {
192 26
        if ($frame instanceof ConnectionStart) {
1 ignored issue
show
Bug introduced by
The class ButterAMQP\AMQP091\Framing\Method\ConnectionStart does not exist. Did you forget a USE statement, or did you not list all dependencies?

This error could be the result of:

1. Missing dependencies

PHP Analyzer uses your composer.json file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects the composer.json to be in the root folder of your repository.

Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the require or require-dev section?

2. Missing use statement

PHP does not complain about undefined classes in instanceof checks. For example, the following PHP code will work perfectly fine:

if ($x instanceof DoesNotExist) {
    // Do something.
}

If you have not tested against this specific condition, such errors might go unnoticed.

Loading history...
193 20
            $this->onConnectionStart($frame);
194 26
        } elseif ($frame instanceof ConnectionTune) {
1 ignored issue
show
Bug introduced by
The class ButterAMQP\AMQP091\Framing\Method\ConnectionTune does not exist. Did you forget a USE statement, or did you not list all dependencies?

This error could be the result of:

1. Missing dependencies

PHP Analyzer uses your composer.json file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects the composer.json to be in the root folder of your repository.

Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the require or require-dev section?

2. Missing use statement

PHP does not complain about undefined classes in instanceof checks. For example, the following PHP code will work perfectly fine:

if ($x instanceof DoesNotExist) {
    // Do something.
}

If you have not tested against this specific condition, such errors might go unnoticed.

Loading history...
195 19
            $this->onConnectionTune($frame);
196 24
        } elseif ($frame instanceof ConnectionClose) {
1 ignored issue
show
Bug introduced by
The class ButterAMQP\AMQP091\Framing\Method\ConnectionClose does not exist. Did you forget a USE statement, or did you not list all dependencies?

This error could be the result of:

1. Missing dependencies

PHP Analyzer uses your composer.json file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects the composer.json to be in the root folder of your repository.

Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the require or require-dev section?

2. Missing use statement

PHP does not complain about undefined classes in instanceof checks. For example, the following PHP code will work perfectly fine:

if ($x instanceof DoesNotExist) {
    // Do something.
}

If you have not tested against this specific condition, such errors might go unnoticed.

Loading history...
197 3
            $this->onConnectionClose($frame);
198 20
        } elseif ($frame instanceof ConnectionBlocked) {
1 ignored issue
show
Bug introduced by
The class ButterAMQP\AMQP091\Framing\Method\ConnectionBlocked does not exist. Did you forget a USE statement, or did you not list all dependencies?

This error could be the result of:

1. Missing dependencies

PHP Analyzer uses your composer.json file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects the composer.json to be in the root folder of your repository.

Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the require or require-dev section?

2. Missing use statement

PHP does not complain about undefined classes in instanceof checks. For example, the following PHP code will work perfectly fine:

if ($x instanceof DoesNotExist) {
    // Do something.
}

If you have not tested against this specific condition, such errors might go unnoticed.

Loading history...
199 1
            $this->onConnectionBlocked($frame);
200 19
        } elseif ($frame instanceof ConnectionUnblocked) {
1 ignored issue
show
Bug introduced by
The class ButterAMQP\AMQP091\Framing\Method\ConnectionUnblocked does not exist. Did you forget a USE statement, or did you not list all dependencies?

This error could be the result of:

1. Missing dependencies

PHP Analyzer uses your composer.json file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects the composer.json to be in the root folder of your repository.

Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the require or require-dev section?

2. Missing use statement

PHP does not complain about undefined classes in instanceof checks. For example, the following PHP code will work perfectly fine:

if ($x instanceof DoesNotExist) {
    // Do something.
}

If you have not tested against this specific condition, such errors might go unnoticed.

Loading history...
201 1
            $this->onConnectionUnblocked($frame);
202 1
        }
203 25
    }
204
205
    /**
     * Handles the ConnectionStart frame.
     *
     * Records the capabilities listed in the server properties (an empty list when none
     * are given), asks the authenticator for a mechanism among those the server offers,
     * takes the first locale the server offers, and answers with ConnectionStartOk carrying
     * the client properties, the mechanism name, the mechanism response built from the
     * URL's user and password, and that locale.
     *
     * @param ConnectionStart $frame
     */
    private function onConnectionStart(ConnectionStart $frame)
    {
        $serverProperties = $frame->getServerProperties();

        if (isset($serverProperties['capabilities'])) {
            $this->capabilities = $serverProperties['capabilities'];
        } else {
            $this->capabilities = [];
        }

        // Mechanisms and locales arrive as space-separated lists.
        $offeredMechanisms = explode(' ', $frame->getMechanisms());
        $mechanism = $this->authenticator->get($offeredMechanisms);

        $offeredLocales = explode(' ', $frame->getLocales());
        $locale = $offeredLocales[0];

        $clientProperties = [
            'platform' => 'PHP '.PHP_VERSION,
            'product' => 'ButterAMQP',
            'version' => '0.1.0',
            'capabilities' => [
                'publisher_confirms' => true,
                'exchange_exchange_bindings' => true,
                'basic.nack' => true,
                'connection.blocked' => true,
                'consumer_cancel_notify' => true,
                'authentication_failure_close' => true,
            ],
        ];

        $mechanismName = $mechanism->getName();
        $mechanismResponse = $mechanism->getResponse($this->url->getUser(), $this->url->getPass());

        $this->send(new ConnectionStartOk(0, $clientProperties, $mechanismName, $mechanismResponse, $locale));
    }
243
244
    /**
     * Handles the ConnectionTune frame, which proposes the channel limit, the frame size
     * limit and the heartbeat delay.
     *
     * Each value is combined with the client's own preference, read from the URL query
     * parameters "channel_max" (default 0), "frame_max" (default 0) and "heartbeat"
     * (default 60): when either side is zero the larger of the two is taken, otherwise
     * the smaller. The results are sent back in ConnectionTuneOk and the heartbeat and
     * frame size limit are applied to the wire.
     *
     * @param ConnectionTune $frame
     */
    private function onConnectionTune(ConnectionTune $frame)
    {
        $pick = function ($client, $server) {
            // A zero product means at least one side is zero: take the other side's value.
            if ($client * $server == 0) {
                return max($client, $server);
            }

            return min($client, $server);
        };

        $url = $this->url;

        $channelMax = $pick($url->getQueryParameter('channel_max', 0), $frame->getChannelMax());
        $frameMax = $pick($url->getQueryParameter('frame_max', 0), $frame->getFrameMax());
        $heartbeat = $pick($url->getQueryParameter('heartbeat', 60), $frame->getHeartbeat());

        $this->send(new ConnectionTuneOk(0, $channelMax, $frameMax, $heartbeat));

        // setFrameMax() is called on whatever setHeartbeat() returns, as in a fluent chain.
        $configured = $this->wire->setHeartbeat(new TimeHeartbeat($heartbeat));
        $configured->setFrameMax($frameMax);
    }
267
268
    /**
     * Handles a ConnectionClose frame sent by the server.
     *
     * Acknowledges with ConnectionCloseOk, closes the wire and marks the connection as
     * closed. When the frame carries a non-zero reply code, the reply text and code are
     * turned into an exception.
     *
     * @param ConnectionClose $frame
     *
     * @throws AMQPException when the reply code of the frame is truthy
     */
    private function onConnectionClose(ConnectionClose $frame)
    {
        $this->send(new ConnectionCloseOk(0));
        $this->wire->close();

        $this->status = self::STATUS_CLOSED;

        // A falsy reply code means there is no error to report.
        if (!$frame->getReplyCode()) {
            return;
        }

        throw AMQPException::make($frame->getReplyText(), $frame->getReplyCode());
    }
286
287
    /**
288
     * This frame is received once server decide to suspend connection, for example because server
289
     * run out of memory and can not provide service for the connection. When this happen consumer
290
     * suppose to suspend all activities until connection.unblocked is received.
291
     *
292
     * @param ConnectionBlocked $frame
293
     */
294 1
    private function onConnectionBlocked(ConnectionBlocked $frame)
1 ignored issue
show
Unused Code introduced by
The parameter $frame is not used and could be removed.

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
295
    {
296 1
        $this->status = self::STATUS_BLOCKED;
297 1
    }
298
299
    /**
300
     * This frame is received once connection returns back to normal state after being suspended.
301
     * See onConnectionBlocked above.
302
     *
303
     * @param ConnectionUnblocked $frame
304
     */
305 1
    private function onConnectionUnblocked(ConnectionUnblocked $frame)
1 ignored issue
show
Unused Code introduced by
The parameter $frame is not used and could be removed.

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
306
    {
307 1
        $this->status = self::STATUS_READY;
308 1
    }
309
}
310