Completed
Push — master ( e17409...9d6c1e )
by Sergey
04:06
created

Connection::__construct()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 7
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 7
ccs 6
cts 6
cp 1
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 5
nc 2
nop 3
crap 2
1
<?php
2
3
namespace ButterAMQP\AMQP091;
4
5
use ButterAMQP\ConnectionInterface;
6
use ButterAMQP\Exception\AMQPException;
7
use ButterAMQP\Exception\InvalidChannelNumberException;
8
use ButterAMQP\AMQP091\Framing\Frame;
9
use ButterAMQP\AMQP091\Framing\Heartbeat;
10
use ButterAMQP\AMQP091\Framing\Method\ConnectionBlocked;
11
use ButterAMQP\AMQP091\Framing\Method\ConnectionClose;
12
use ButterAMQP\AMQP091\Framing\Method\ConnectionCloseOk;
13
use ButterAMQP\AMQP091\Framing\Method\ConnectionOpen;
14
use ButterAMQP\AMQP091\Framing\Method\ConnectionOpenOk;
15
use ButterAMQP\AMQP091\Framing\Method\ConnectionStart;
16
use ButterAMQP\AMQP091\Framing\Method\ConnectionStartOk;
17
use ButterAMQP\AMQP091\Framing\Method\ConnectionTune;
18
use ButterAMQP\AMQP091\Framing\Method\ConnectionTuneOk;
19
use ButterAMQP\AMQP091\Framing\Method\ConnectionUnblocked;
20
use ButterAMQP\Heartbeat\TimeHeartbeat;
21
use ButterAMQP\Security\Authenticator;
22
use ButterAMQP\Security\AuthenticatorInterface;
23
use ButterAMQP\Url;
24
use ButterAMQP\WireInterface;
25
use ButterAMQP\WireSubscriberInterface;
26
use Psr\Log\LoggerAwareInterface;
27
use Psr\Log\LoggerAwareTrait;
28
use Psr\Log\NullLogger;
29
30
class Connection implements ConnectionInterface, WireSubscriberInterface, LoggerAwareInterface
31
{
32
    use LoggerAwareTrait;
33
34
    const STATUS_CLOSED = 0;
35
    const STATUS_READY = 1;
36
    const STATUS_BLOCKED = 2;
37
38
    /**
39
     * @var Url
40
     */
41
    private $url;
42
43
    /**
44
     * @var WireInterface
45
     */
46
    private $wire;
47
48
    /**
49
     * @var AuthenticatorInterface
50
     */
51
    private $authenticator;
52
53
    /**
54
     * @var int|null
55
     */
56
    private $status;
57
58
    /**
59
     * @var Channel[]
60
     */
61
    private $channels = [];
62
63
    /**
64
     * @var array
65
     */
66
    private $capabilities = [];
67
68
    /**
     * Stores the collaborators of the connection and installs default fallbacks.
     *
     * When no authenticator is supplied, the one returned by Authenticator::build()
     * is used instead. The logger is initialised to a NullLogger.
     *
     * @param Url                         $url           Connection URL
     * @param WireInterface               $wire          Wire used to exchange frames
     * @param AuthenticatorInterface|null $authenticator Optional authenticator
     */
    public function __construct(Url $url, WireInterface $wire, AuthenticatorInterface $authenticator = null)
    {
        if (!$authenticator) {
            $authenticator = Authenticator::build();
        }

        $this->logger = new NullLogger();
        $this->authenticator = $authenticator;
        $this->wire = $wire;
        $this->url = $url;
    }
80
81
    /**
     * Connection status. See STATUS_* constants for possible values.
     *
     * The STATUS_* constants are integers. The property has no initial value,
     * so null is returned until a status has been assigned.
     *
     * @return int|null
     */
    public function getStatus()
    {
        return $this->status;
    }
90
91
    /**
92
     * {@inheritdoc}
93
     */
94 19
    public function open()
95
    {
96 19
        $this->channels = [];
97 19
        $this->capabilities = [];
98
99 19
        $this->wire->open($this->url)
100 19
            ->subscribe(0, $this);
101
102 19
        $this->wait(ConnectionTune::class);
103
104 18
        $this->send(new ConnectionOpen(0, $this->url->getVhost(), '', false))
105 18
            ->wait(ConnectionOpenOk::class);
106
107 18
        $this->status = self::STATUS_READY;
0 ignored issues
show
Documentation Bug introduced by
The property $status was declared of type string, but self::STATUS_READY is of type integer. Maybe add a type cast?

This check looks for assignments to scalar types that may be of the wrong type.

To ensure the code behaves as expected, it may be a good idea to add an explicit type cast.

$answer = 42;

$correct = false;

$correct = (bool) $answer;
Loading history...
108
109 18
        return $this;
110
    }
111
112
    /**
     * {@inheritdoc}
     *
     * Without an explicit ID, the ID following the highest one currently in use is
     * taken (1 when no channel exists yet). A channel that is already known is
     * returned as is; otherwise a new one is created, registered and opened.
     *
     * @throws InvalidChannelNumberException when the ID is not a positive integer
     */
    public function channel($id = null)
    {
        if ($id === null) {
            $id = 1;

            if (count($this->channels) != 0) {
                $id = max(array_keys($this->channels)) + 1;
            }
        }

        if (!(is_int($id) && $id > 0)) {
            throw new InvalidChannelNumberException('Channel ID should be positive integer');
        }

        if (isset($this->channels[$id])) {
            return $this->channels[$id];
        }

        $created = new Channel($this->wire, $id);

        if ($created instanceof LoggerAwareInterface) {
            $created->setLogger($this->logger);
        }

        // The channel is registered before open() is called; keep this order.
        $this->channels[$id] = $created;
        $created->open();

        return $this->channels[$id];
    }
139
140
    /**
141
     * {@inheritdoc}
142
     */
143 7
    public function close($code = 0, $reason = '')
144
    {
145 7
        $this->send(new ConnectionClose(0, $code, $reason, 0, 0))
146 7
            ->wait(ConnectionCloseOk::class);
147
148 7
        $this->status = self::STATUS_CLOSED;
0 ignored issues
show
Documentation Bug introduced by
The property $status was declared of type string, but self::STATUS_CLOSED is of type integer. Maybe add a type cast?

This check looks for assignments to scalar types that may be of the wrong type.

To ensure the code behaves as expected, it may be a good idea to add an explicit type cast.

$answer = 42;

$correct = false;

$correct = (bool) $answer;
Loading history...
149
150 7
        $this->wire->close();
151
152 7
        return $this;
153
    }
154
155
    /**
     * {@inheritdoc}
     *
     * Looks the capability up in the table kept by this connection. An entry that
     * is missing (or null) counts as unsupported; any other value is cast to boolean.
     */
    public function isSupported($capability)
    {
        if (!isset($this->capabilities[$capability])) {
            return false;
        }

        return (bool) $this->capabilities[$capability];
    }
163
164
    /**
     * {@inheritdoc}
     *
     * Delegates to the wire's next() with the given blocking flag and returns
     * this connection for chaining.
     */
    public function serve($blocking = true)
    {
        $wire = $this->wire;
        $wire->next($blocking);

        return $this;
    }
173
174
    /**
     * Sends frame to the service channel (#0).
     *
     * The frame is handed to the wire unchanged.
     *
     * @param Frame $frame Frame to send
     *
     * @return $this
     */
    private function send(Frame $frame)
    {
        $wire = $this->wire;
        $wire->send($frame);

        return $this;
    }
187
188
    /**
     * Wait for a frame in the service channel (#0).
     *
     * Delegates to the wire's wait() for channel 0 and returns whatever it returns.
     *
     * @param string|array $type Frame type(s) to wait for
     *
     * @return Frame
     */
    private function wait($type)
    {
        $received = $this->wire->wait(0, $type);

        return $received;
    }
199
200
    /**
201
     * {@inheritdoc}
202
     */
203 26
    public function dispatch(Frame $frame)
204
    {
205 26
        if ($frame instanceof ConnectionStart) {
1 ignored issue
show
Bug introduced by
The class ButterAMQP\AMQP091\Framing\Method\ConnectionStart does not exist. Did you forget a USE statement, or did you not list all dependencies?

This error could be the result of:

1. Missing dependencies

PHP Analyzer uses your composer.json file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects the composer.json to be in the root folder of your repository.

Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the require or require-dev section?

2. Missing use statement

PHP does not complain about undefined classes in instanceof checks. For example, the following PHP code will work perfectly fine:

if ($x instanceof DoesNotExist) {
    // Do something.
}

If you have not tested against this specific condition, such errors might go unnoticed.

Loading history...
206 20
            $this->onConnectionStart($frame);
207 26
        } elseif ($frame instanceof ConnectionTune) {
1 ignored issue
show
Bug introduced by
The class ButterAMQP\AMQP091\Framing\Method\ConnectionTune does not exist. Did you forget a USE statement, or did you not list all dependencies?

This error could be the result of:

1. Missing dependencies

PHP Analyzer uses your composer.json file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects the composer.json to be in the root folder of your repository.

Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the require or require-dev section?

2. Missing use statement

PHP does not complain about undefined classes in instanceof checks. For example, the following PHP code will work perfectly fine:

if ($x instanceof DoesNotExist) {
    // Do something.
}

If you have not tested against this specific condition, such errors might go unnoticed.

Loading history...
208 19
            $this->onConnectionTune($frame);
209 24
        } elseif ($frame instanceof ConnectionClose) {
1 ignored issue
show
Bug introduced by
The class ButterAMQP\AMQP091\Framing\Method\ConnectionClose does not exist. Did you forget a USE statement, or did you not list all dependencies?

This error could be the result of:

1. Missing dependencies

PHP Analyzer uses your composer.json file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects the composer.json to be in the root folder of your repository.

Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the require or require-dev section?

2. Missing use statement

PHP does not complain about undefined classes in instanceof checks. For example, the following PHP code will work perfectly fine:

if ($x instanceof DoesNotExist) {
    // Do something.
}

If you have not tested against this specific condition, such errors might go unnoticed.

Loading history...
210 3
            $this->onConnectionClose($frame);
211 20
        } elseif ($frame instanceof ConnectionBlocked) {
1 ignored issue
show
Bug introduced by
The class ButterAMQP\AMQP091\Framing\Method\ConnectionBlocked does not exist. Did you forget a USE statement, or did you not list all dependencies?

This error could be the result of:

1. Missing dependencies

PHP Analyzer uses your composer.json file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects the composer.json to be in the root folder of your repository.

Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the require or require-dev section?

2. Missing use statement

PHP does not complain about undefined classes in instanceof checks. For example, the following PHP code will work perfectly fine:

if ($x instanceof DoesNotExist) {
    // Do something.
}

If you have not tested against this specific condition, such errors might go unnoticed.

Loading history...
212 1
            $this->onConnectionBlocked($frame);
213 19
        } elseif ($frame instanceof ConnectionUnblocked) {
1 ignored issue
show
Bug introduced by
The class ButterAMQP\AMQP091\Framing\Method\ConnectionUnblocked does not exist. Did you forget a USE statement, or did you not list all dependencies?

This error could be the result of:

1. Missing dependencies

PHP Analyzer uses your composer.json file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects the composer.json to be in the root folder of your repository.

Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the require or require-dev section?

2. Missing use statement

PHP does not complain about undefined classes in instanceof checks. For example, the following PHP code will work perfectly fine:

if ($x instanceof DoesNotExist) {
    // Do something.
}

If you have not tested against this specific condition, such errors might go unnoticed.

Loading history...
214 1
            $this->onConnectionUnblocked($frame);
215 1
        }
216 25
    }
217
218
    /**
     * Handles connection.start, the first frame received from the server.
     *
     * Remembers the capabilities listed in the server properties (an empty list
     * when none are given), picks an authentication mechanism and a locale from
     * the space-separated lists carried by the frame, and replies with
     * connection.start-ok containing client details and the credentials from the URL.
     *
     * @param ConnectionStart $frame
     */
    private function onConnectionStart(ConnectionStart $frame)
    {
        $serverProperties = $frame->getServerProperties();

        if (isset($serverProperties['capabilities'])) {
            $this->capabilities = $serverProperties['capabilities'];
        } else {
            $this->capabilities = [];
        }

        $offeredMechanisms = explode(' ', $frame->getMechanisms());
        $mechanism = $this->authenticator->get($offeredMechanisms);

        // Only the first entry of the locale list is used.
        $locales = explode(' ', $frame->getLocales());
        $locale = $locales[0];

        $clientProperties = [
            'platform' => 'PHP '.PHP_VERSION,
            'product' => 'ButterAMQP',
            'version' => '0.1.0',
            'capabilities' => [
                'publisher_confirms' => true,
                'exchange_exchange_bindings' => true,
                'basic.nack' => true,
                'connection.blocked' => true,
                'consumer_cancel_notify' => true,
                'authentication_failure_close' => true,
            ],
        ];

        $this->send(new ConnectionStartOk(
            0,
            $clientProperties,
            $mechanism->getName(),
            $mechanism->getResponse($this->url->getUser(), $this->url->getPass()),
            $locale
        ));
    }
256
257
    /**
     * Handles connection.tune, which proposes connection preferences: maximum
     * number of channels, maximum frame size and heartbeat delay.
     *
     * Each value is negotiated against the matching URL query parameter
     * (channel_max and frame_max default to 0, heartbeat to 60), the result is
     * sent back in connection.tune-ok and then applied to the wire.
     *
     * @param ConnectionTune $frame
     */
    private function onConnectionTune(ConnectionTune $frame)
    {
        // When the product is zero (at least one side is zero) the larger value
        // is used; otherwise the smaller of the two wins.
        $negotiate = function ($client, $server) {
            if ($client * $server == 0) {
                return max($client, $server);
            }

            return min($client, $server);
        };

        $url = $this->url;

        $channelMax = $negotiate($url->getQueryParameter('channel_max', 0), $frame->getChannelMax());
        $frameMax = $negotiate($url->getQueryParameter('frame_max', 0), $frame->getFrameMax());
        $heartbeat = $negotiate($url->getQueryParameter('heartbeat', 60), $frame->getHeartbeat());

        // Reply first, then apply the agreed values locally.
        $this->send(new ConnectionTuneOk(0, $channelMax, $frameMax, $heartbeat));

        $wire = $this->wire->setHeartbeat(new TimeHeartbeat($heartbeat));
        $wire->setFrameMax($frameMax);
    }
280
281
    /**
282
     * This frame is received once the server decides to close the connection, normally because an unrecoverable error occurred.
283
     *
284
     * @param ConnectionClose $frame
285
     *
286
     * @throws AMQPException
287
     */
288 3
    private function onConnectionClose(ConnectionClose $frame)
289
    {
290 3
        $this->send(new ConnectionCloseOk(0));
291 3
        $this->wire->close();
292
293 3
        $this->status = self::STATUS_CLOSED;
0 ignored issues
show
Documentation Bug introduced by
The property $status was declared of type string, but self::STATUS_CLOSED is of type integer. Maybe add a type cast?

This check looks for assignments to scalar types that may be of the wrong type.

To ensure the code behaves as expected, it may be a good idea to add an explicit type cast.

$answer = 42;

$correct = false;

$correct = (bool) $answer;
Loading history...
294
295 3
        if ($frame->getReplyCode()) {
296 2
            throw AMQPException::make($frame->getReplyText(), $frame->getReplyCode());
297
        }
298 1
    }
299
300
    /**
301
     * This frame is received once the server decides to suspend the connection, for example because the server
301
     * has run out of memory and cannot provide service for the connection. When this happens, the consumer
302
     * is supposed to suspend all activities until connection.unblocked is received.
304
     *
305
     * @param ConnectionBlocked $frame
306
     */
307 1
    private function onConnectionBlocked(ConnectionBlocked $frame)
1 ignored issue
show
Unused Code introduced by
The parameter $frame is not used and could be removed.

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
308
    {
309 1
        $this->status = self::STATUS_BLOCKED;
0 ignored issues
show
Documentation Bug introduced by
The property $status was declared of type string, but self::STATUS_BLOCKED is of type integer. Maybe add a type cast?

This check looks for assignments to scalar types that may be of the wrong type.

To ensure the code behaves as expected, it may be a good idea to add an explicit type cast.

$answer = 42;

$correct = false;

$correct = (bool) $answer;
Loading history...
310 1
    }
311
312
    /**
313
     * This frame is received once the connection returns to its normal state after being suspended.
314
     * See onConnectionBlocked above.
315
     *
316
     * @param ConnectionUnblocked $frame
317
     */
318 1
    private function onConnectionUnblocked(ConnectionUnblocked $frame)
1 ignored issue
show
Unused Code introduced by
The parameter $frame is not used and could be removed.

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
319
    {
320 1
        $this->status = self::STATUS_READY;
0 ignored issues
show
Documentation Bug introduced by
The property $status was declared of type string, but self::STATUS_READY is of type integer. Maybe add a type cast?

This check looks for assignments to scalar types that may be of the wrong type.

To ensure the code behaves as expected, it may be a good idea to add an explicit type cast.

$answer = 42;

$correct = false;

$correct = (bool) $answer;
Loading history...
321 1
    }
322
}
323