Completed
Push — master ( 73f6e1...42bf17 )
by Sergey
04:22
created

Connection::serve()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 6
ccs 3
cts 3
cp 1
rs 9.4285
cc 1
eloc 3
nc 1
nop 1
crap 1
1
<?php
2
3
namespace ButterAMQP;
4
5
use ButterAMQP\Exception\AMQPException;
6
use ButterAMQP\Exception\InvalidChannelNumberException;
7
use ButterAMQP\Framing\Frame;
8
use ButterAMQP\Framing\Heartbeat;
9
use ButterAMQP\Framing\Method\ConnectionBlocked;
10
use ButterAMQP\Framing\Method\ConnectionClose;
11
use ButterAMQP\Framing\Method\ConnectionCloseOk;
12
use ButterAMQP\Framing\Method\ConnectionOpen;
13
use ButterAMQP\Framing\Method\ConnectionOpenOk;
14
use ButterAMQP\Framing\Method\ConnectionStart;
15
use ButterAMQP\Framing\Method\ConnectionStartOk;
16
use ButterAMQP\Framing\Method\ConnectionTune;
17
use ButterAMQP\Framing\Method\ConnectionTuneOk;
18
use ButterAMQP\Framing\Method\ConnectionUnblocked;
19
use ButterAMQP\Heartbeat\TimeHeartbeat;
20
use ButterAMQP\Security\Authenticator;
21
use ButterAMQP\Security\AuthenticatorInterface;
22
use Psr\Log\LoggerAwareInterface;
23
use Psr\Log\LoggerAwareTrait;
24
use Psr\Log\NullLogger;
25
26
/**
 * Connection-level endpoint: drives the handshake on the service channel (#0),
 * tracks the connection status and creates channels on demand.
 */
class Connection implements ConnectionInterface, WireSubscriberInterface, LoggerAwareInterface
{
    use LoggerAwareTrait;

    const STATUS_CLOSED = 0;
    const STATUS_READY = 1;
    const STATUS_BLOCKED = 2;

    /**
     * @var Url
     */
    private $url;

    /**
     * @var WireInterface
     */
    private $wire;

    /**
     * @var AuthenticatorInterface
     */
    private $authenticator;

    /**
     * Current status, one of the STATUS_* constants.
     * Starts as closed so that getStatus() never returns null before open().
     *
     * @var int
     */
    private $status = self::STATUS_CLOSED;

    /**
     * Channels created through channel(), indexed by channel ID.
     *
     * @var Channel[]
     */
    private $channels = [];

    /**
     * Value of the "capabilities" entry of the server properties received in connection.start.
     *
     * @var array
     */
    private $capabilities = [];

    /**
     * @param Url                         $url
     * @param WireInterface               $wire
     * @param AuthenticatorInterface|null $authenticator falls back to Authenticator::build() when omitted
     */
    public function __construct(Url $url, WireInterface $wire, AuthenticatorInterface $authenticator = null)
    {
        $this->url = $url;
        $this->wire = $wire;
        $this->authenticator = $authenticator !== null ? $authenticator : Authenticator::build();
        $this->logger = new NullLogger();
    }

    /**
     * Connection status. See STATUS_* constants for possible values.
     *
     * @return int
     */
    public function getStatus()
    {
        return $this->status;
    }

    /**
     * {@inheritdoc}
     */
    public function open()
    {
        // Forget everything learned from a previous session.
        $this->channels = [];
        $this->capabilities = [];

        // Register this object for frames on the service channel (#0); the
        // connection.start / connection.tune replies are produced in dispatch().
        $this->wire->open($this->url)
            ->subscribe(0, $this);

        $this->wait(ConnectionTune::class);

        $this->send(new ConnectionOpen($this->url->getVhost(), '', false))
            ->wait(ConnectionOpenOk::class);

        $this->status = self::STATUS_READY;

        return $this;
    }

    /**
     * {@inheritdoc}
     *
     * When no ID is given the next one after the highest known channel ID is used (starting at 1).
     *
     * @throws InvalidChannelNumberException if the ID is not a positive integer
     */
    public function channel($id = null)
    {
        if ($id === null) {
            $id = empty($this->channels) ? 1 : max(array_keys($this->channels)) + 1;
        }

        if (!is_int($id) || $id <= 0) {
            throw new InvalidChannelNumberException('Channel ID should be positive integer');
        }

        if (!isset($this->channels[$id])) {
            $channel = new Channel($this->wire, $id);

            if ($channel instanceof LoggerAwareInterface) {
                $channel->setLogger($this->logger);
            }

            // Register before opening so the channel is already known while it opens.
            $this->channels[$id] = $channel;

            $channel->open();
        }

        return $this->channels[$id];
    }

    /**
     * {@inheritdoc}
     */
    public function close($code = 0, $reason = '')
    {
        $this->send(new ConnectionClose($code, $reason, 0, 0))
            ->wait(ConnectionCloseOk::class);

        $this->status = self::STATUS_CLOSED;

        $this->wire->close();

        return $this;
    }

    /**
     * {@inheritdoc}
     */
    public function isSupported($capability)
    {
        // Missing, null and falsy entries all count as "not supported".
        return !empty($this->capabilities[$capability]);
    }

    /**
     * {@inheritdoc}
     */
    public function serve($blocking = true)
    {
        $this->wire->next($blocking);

        return $this;
    }

    /**
     * Sends frame to the service channel (#0).
     *
     * @param Frame $frame
     *
     * @return $this
     */
    private function send(Frame $frame)
    {
        $this->wire->send(0, $frame);

        return $this;
    }

    /**
     * Wait for a frame in the service channel (#0).
     *
     * @param string|array $type
     *
     * @return Frame
     */
    private function wait($type)
    {
        return $this->wire->wait(0, $type);
    }

    /**
     * {@inheritdoc}
     *
     * Routes connection-level frames to their handlers; any other frame is ignored.
     *
     * @throws AMQPException when the server closes the connection with a non-zero reply code
     */
    public function dispatch(Frame $frame)
    {
        if ($frame instanceof ConnectionStart) {
            $this->onConnectionStart($frame);
        }

        if ($frame instanceof ConnectionTune) {
            $this->onConnectionTune($frame);
        }

        if ($frame instanceof ConnectionClose) {
            $this->onConnectionClose($frame);
        }

        if ($frame instanceof ConnectionBlocked) {
            $this->onConnectionBlocked($frame);
        }

        if ($frame instanceof ConnectionUnblocked) {
            $this->onConnectionUnblocked($frame);
        }
    }

    /**
     * This frame is the first frame received from server.
     * It provides server details and requests client credentials.
     *
     * @param ConnectionStart $frame
     */
    private function onConnectionStart(ConnectionStart $frame)
    {
        $properties = $frame->getServerProperties();

        // Remember what the server advertises; isSupported() answers from this.
        $this->capabilities = isset($properties['capabilities']) ?
            $properties['capabilities'] : [];

        // Both lists arrive as space separated strings.
        $mechanism = $this->authenticator
            ->get(explode(' ', $frame->getMechanisms()));

        // The first locale offered by the server is used.
        list($locale) = explode(' ', $frame->getLocales());

        $this->send(new ConnectionStartOk(
            [
                'platform' => 'PHP '.PHP_VERSION,
                'product' => 'ButterAMQP',
                'version' => '0.1.0',
                'capabilities' => [
                    'publisher_confirms' => true,
                    'exchange_exchange_bindings' => true,
                    'basic.nack' => true,
                    'connection.blocked' => true,
                    'consumer_cancel_notify' => true,
                    'authentication_failure_close' => true,
                ],
            ],
            $mechanism->getName(),
            $mechanism->getResponse($this->url->getUser(), $this->url->getPass()),
            $locale
        ));
    }

    /**
     * This frame is received to setup connection preferences, like max frame size,
     * max number of channel and heartbeat delay.
     *
     * Values in the request can be lowered by client.
     *
     * @param ConnectionTune $frame
     */
    private function onConnectionTune(ConnectionTune $frame)
    {
        // A zero on either side means "no preference", so the other side's value
        // wins; otherwise the lower of the two is taken. Both sides are cast to
        // int because URL query parameters may arrive as strings.
        $negotiate = static function ($client, $server) {
            $client = (int) $client;
            $server = (int) $server;

            return ($client === 0 || $server === 0) ? max($client, $server) : min($client, $server);
        };

        $channelMax = $negotiate($this->url->getQueryParameter('channel_max', 0), $frame->getChannelMax());
        $frameMax = $negotiate($this->url->getQueryParameter('frame_max', 0), $frame->getFrameMax());
        $heartbeat = $negotiate($this->url->getQueryParameter('heartbeat', 60), $frame->getHeartbeat());

        $this->send(new ConnectionTuneOk($channelMax, $frameMax, $heartbeat));

        // Apply the agreed values to the wire as well.
        $this->wire->setHeartbeat(new TimeHeartbeat($heartbeat))
            ->setFrameMax($frameMax);
    }

    /**
     * This frame is received once the server decides to close the connection,
     * normally because an unrecoverable error occurred.
     *
     * @param ConnectionClose $frame
     *
     * @throws AMQPException when the frame carries a non-zero reply code
     */
    private function onConnectionClose(ConnectionClose $frame)
    {
        // Acknowledge and tear down first, so the state is consistent even when
        // the reply code below is turned into an exception.
        $this->send(new ConnectionCloseOk());
        $this->wire->close();

        $this->status = self::STATUS_CLOSED;

        if ($frame->getReplyCode()) {
            throw AMQPException::make($frame->getReplyText(), $frame->getReplyCode());
        }
    }

    /**
     * This frame is received once the server decides to suspend the connection, for example
     * because the server ran out of memory and can not provide service for the connection.
     * When this happens the consumer is supposed to suspend all activities until
     * connection.unblocked is received.
     *
     * @param ConnectionBlocked $frame not inspected, only its arrival matters
     */
    private function onConnectionBlocked(ConnectionBlocked $frame)
    {
        $this->status = self::STATUS_BLOCKED;
    }

    /**
     * This frame is received once the connection returns back to normal state after being suspended.
     * See onConnectionBlocked above.
     *
     * @param ConnectionUnblocked $frame not inspected, only its arrival matters
     */
    private function onConnectionUnblocked(ConnectionUnblocked $frame)
    {
        $this->status = self::STATUS_READY;
    }
}
335