Completed
Push — master ( e17409...9d6c1e )
by Sergey
04:06
created

Wire::getProtocolForScheme()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 2
eloc 2
nc 2
nop 1
crap 2
1
<?php
2
3
namespace ButterAMQP\AMQP091;
4
5
use ButterAMQP\Binary;
6
use ButterAMQP\Buffer;
7
use ButterAMQP\Exception\InvalidFrameEndingException;
8
use ButterAMQP\AMQP091\Framing\Content;
9
use ButterAMQP\AMQP091\Framing\Frame;
10
use ButterAMQP\AMQP091\Framing\Heartbeat;
11
use ButterAMQP\Heartbeat\NullHeartbeat;
12
use ButterAMQP\HeartbeatInterface;
13
use ButterAMQP\IOInterface;
14
use ButterAMQP\Url;
15
use ButterAMQP\WireInterface;
16
use ButterAMQP\WireSubscriberInterface;
17
use Psr\Log\LoggerAwareInterface;
18
use Psr\Log\LoggerAwareTrait;
19
use Psr\Log\NullLogger;
20
21
/**
 * Frame-level transport for the AMQP 0-9-1 implementation.
 *
 * Opens the underlying IO, writes the protocol header, reads and decodes
 * incoming frames, hands them to the subscriber registered for the frame's
 * channel, and writes outgoing frames (splitting content frames when a
 * maximum frame size is configured).
 */
class Wire implements WireInterface, LoggerAwareInterface
{
    use LoggerAwareTrait;

    /**
     * Bytes written to the transport immediately after it is opened.
     */
    const PROTOCOL_HEADER = "AMQP\x00\x00\x09\x01";

    /**
     * Byte every incoming frame is expected to end with.
     */
    const FRAME_ENDING = "\xCE";

    /**
     * Size in bytes of the frame header: type (1) + channel (2) + payload size (4).
     */
    const FRAME_HEADER_SIZE = 7;

    /**
     * Bytes that surround a frame payload: the header plus the 1-byte frame ending.
     */
    const FRAME_OVERHEAD = 8;

    /**
     * Transport used for all reads and writes.
     *
     * @var IOInterface
     */
    private $io;

    /**
     * Frame handlers indexed by channel number.
     *
     * @var WireSubscriberInterface[]
     */
    private $subscribers = [];

    /**
     * Heartbeat strategy consulted before reading and notified on every
     * sent and received frame.
     *
     * @var HeartbeatInterface
     */
    private $heartbeat;

    /**
     * Maximum frame size in bytes; a falsy value disables chopping of content frames.
     *
     * @var int
     */
    private $frameMax;

    /**
     * Creates a wire on top of the given transport.
     *
     * Starts with a NullLogger and a NullHeartbeat; both can be replaced
     * later through their setters.
     *
     * @param IOInterface $io
     */
    public function __construct(IOInterface $io)
    {
        $this->io = $io;
        $this->logger = new NullLogger();
        $this->heartbeat = new NullHeartbeat();
    }

    /**
     * {@inheritdoc}
     *
     * Drops all previously registered subscribers, opens the transport using
     * the scheme, host, port and query of the URL, and writes the protocol header.
     */
    public function open(Url $url)
    {
        $this->subscribers = [];

        $this->io->open(
            $this->getProtocolForScheme($url),
            $url->getHost(),
            $url->getPort(),
            $url->getQuery()
        );

        $this->io->write(self::PROTOCOL_HEADER);

        // @todo: peek next 8 bytes and check if its a frame or "wrong protocol" reply

        return $this;
    }

    /**
     * {@inheritdoc}
     *
     * Sends a heartbeat first when the heartbeat strategy asks for one, then
     * reads one complete frame, dispatches it to the subscriber of its channel
     * (if any) and records the server activity.
     *
     * @return Frame|null The decoded frame, or null when the transport returned
     *                    null for either the header peek or the full frame read.
     *
     * @throws InvalidFrameEndingException When the last byte read is not FRAME_ENDING.
     */
    public function next($blocking = true)
    {
        if ($this->heartbeat->shouldSendHeartbeat()) {
            $this->send(new Heartbeat(0));
        }

        // Peek (rather than read) the header so the whole frame, header
        // included, can be fetched in a single read below.
        $peek = $this->io->peek(self::FRAME_HEADER_SIZE, $blocking);
        if ($peek === null) {
            return null;
        }

        // C = 1-byte type, n = 2-byte big-endian channel, N = 4-byte big-endian size.
        $header = unpack('Ctype/nchannel/Nsize', $peek);

        $data = $this->io->read($header['size'] + self::FRAME_OVERHEAD, $blocking);
        if ($data === null) {
            return null;
        }

        $end = $data[strlen($data) - 1];

        if ($end !== self::FRAME_ENDING) {
            throw new InvalidFrameEndingException(sprintf('Invalid frame ending (%d)', Binary::unpack('c', $end)));
        }

        $frame = Frame::decode(new Buffer($data));

        $subscriber = $this->getSubscriber($frame->getChannel());
        if ($subscriber) {
            $subscriber->dispatch($frame);
        }

        $this->heartbeat->serverBeat();

        return $frame;
    }

    /**
     * {@inheritdoc}
     *
     * Records client activity with the heartbeat strategy, then writes the
     * frame to the transport, piece by piece if it had to be chopped.
     *
     * @throws \LogicException When the configured frame max leaves no room for a payload.
     */
    public function send(Frame $frame)
    {
        $this->heartbeat->clientBeat();

        foreach ($this->chop($frame) as $piece) {
            $this->io->write($piece->encode());
        }

        return $this;
    }

    /**
     * Splits a content frame into frames that fit the configured frame max.
     *
     * Frames other than Content, and any frame while no frame max is set,
     * are returned untouched as a single-element list. A Content frame with
     * an empty payload yields an empty list when a frame max is set.
     *
     * NOTE(review): a static analyzer reported ButterAMQP\AMQP091\Framing\Content
     * as unresolvable although it is imported at the top of this file - confirm
     * the class is available to the analyzer.
     *
     * @param Frame $frame
     *
     * @return Frame[]
     *
     * @throws \LogicException When the frame max is not larger than the frame overhead.
     */
    private function chop(Frame $frame)
    {
        if (!$this->frameMax || !$frame instanceof Content) {
            return [$frame];
        }

        $size = $this->frameMax - self::FRAME_OVERHEAD;

        // A non-positive chunk size used to divide by zero (frame max equal to
        // the overhead) or silently drop the whole body (frame max below it).
        if ($size < 1) {
            throw new \LogicException(sprintf(
                'Frame max of %d bytes leaves no room for a payload, it must exceed %d bytes',
                $this->frameMax,
                self::FRAME_OVERHEAD
            ));
        }

        $data = $frame->getData();
        $channel = $frame->getChannel();
        $frames = [];

        for ($offset = 0, $length = strlen($data); $offset < $length; $offset += $size) {
            $frames[] = new Content($channel, substr($data, $offset, $size));
        }

        return $frames;
    }

    /**
     * {@inheritdoc}
     *
     * Blocks, reading frame after frame, until one arrives on the given
     * channel that is an instance of one of the requested types. Every frame
     * read meanwhile is still dispatched to its subscriber by next().
     *
     * @param int             $channel
     * @param string|string[] $types   Class name or list of class names to wait for.
     *
     * @return Frame The first matching frame.
     */
    public function wait($channel, $types)
    {
        if (!is_array($types)) {
            $types = [$types];
        }

        while (true) {
            $frame = $this->next(true);

            // Channel comparison is kept loose so that a channel passed as a
            // numeric string still matches, exactly as before.
            if (!$frame || $frame->getChannel() != $channel) {
                continue;
            }

            foreach ($types as $type) {
                if (is_a($frame, $type)) {
                    return $frame;
                }
            }
        }
    }

    /**
     * {@inheritdoc}
     *
     * Registers the handler for the channel, replacing any handler already
     * registered for it.
     */
    public function subscribe($channel, WireSubscriberInterface $handler)
    {
        $this->subscribers[$channel] = $handler;

        return $this;
    }

    /**
     * {@inheritdoc}
     *
     * Closes the underlying transport.
     */
    public function close()
    {
        $this->io->close();

        return $this;
    }

    /**
     * Replaces the heartbeat strategy.
     *
     * @param HeartbeatInterface $heartbeat
     *
     * @return $this
     */
    public function setHeartbeat(HeartbeatInterface $heartbeat)
    {
        $this->heartbeat = $heartbeat;

        return $this;
    }

    /**
     * Sets the maximum size of an outgoing frame, in bytes.
     *
     * A falsy value (such as 0) turns chopping of content frames off. Any
     * other value must be larger than FRAME_OVERHEAD, otherwise sending a
     * content frame throws.
     *
     * @param int $frameMax
     *
     * @return $this
     */
    public function setFrameMax($frameMax)
    {
        $this->frameMax = $frameMax;

        return $this;
    }

    /**
     * Looks up the handler registered for a channel.
     *
     * @param int $channel
     *
     * @return WireSubscriberInterface|null Null when nothing is subscribed to the channel.
     */
    private function getSubscriber($channel)
    {
        return isset($this->subscribers[$channel]) ? $this->subscribers[$channel] : null;
    }

    /**
     * Maps the URL scheme to the transport protocol passed to the IO layer.
     *
     * @param Url $url
     *
     * @return string "ssl" for the "amqps" scheme (case-insensitive), "tcp" for anything else.
     */
    private function getProtocolForScheme(Url $url)
    {
        return strcasecmp($url->getScheme(), 'amqps') === 0 ? 'ssl' : 'tcp';
    }
}
242