Wire   A
last analyzed

Complexity

Total Complexity 34

Size/Duplication

Total Lines 272
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 9

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
wmc 34
lcom 1
cbo 9
dl 0
loc 272
ccs 94
cts 94
cp 1
rs 9.2
c 0
b 0
f 0

16 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 6 1
A open() 0 18 1
A next() 0 15 3
A read() 0 4 2
B tryNext() 0 31 5
A dispatch() 0 6 2
A send() 0 10 2
A chop() 0 17 4
A wait() 0 12 3
A isFrameMatch() 0 4 3
A subscribe() 0 6 1
A close() 0 6 1
A setHeartbeat() 0 6 1
A setFrameMax() 0 6 1
A getSubscriber() 0 4 2
A getProtocolForScheme() 0 4 2
1
<?php
2
3
namespace ButterAMQP\AMQP091;
4
5
use ButterAMQP\Buffer;
6
use ButterAMQP\Exception\InvalidFrameEndingException;
7
use ButterAMQP\AMQP091\Framing\Content;
8
use ButterAMQP\AMQP091\Framing\Frame;
9
use ButterAMQP\AMQP091\Framing\Heartbeat;
10
use ButterAMQP\Heartbeat\NullHeartbeat;
11
use ButterAMQP\HeartbeatInterface;
12
use ButterAMQP\IOInterface;
13
use ButterAMQP\Url;
14
use Psr\Log\LoggerAwareInterface;
15
use Psr\Log\LoggerAwareTrait;
16
use Psr\Log\NullLogger;
17
18
class Wire implements WireInterface, LoggerAwareInterface
19
{
20
    use LoggerAwareTrait;

    /**
     * Bytes written to the IO right after the connection is opened.
     */
    const PROTOCOL_HEADER = "AMQP\x00\x00\x09\x01";

    /**
     * Byte expected in the last position of every incoming frame.
     */
    const FRAME_ENDING = "\xCE";

    /**
     * Transport the wire reads raw bytes from and writes encoded frames to.
     *
     * @var IOInterface
     */
    private $io;

    /**
     * Frame handlers, keyed by channel.
     *
     * @var WireSubscriberInterface[]
     */
    private $subscribers = [];

    /**
     * Notified about client and server traffic and asked when a heartbeat is due.
     *
     * @var HeartbeatInterface
     */
    private $heartbeat;

    /**
     * Read chunk size and upper bound used when splitting content frames.
     * While not set, reads fall back to 8192 bytes and frames are not split.
     *
     * @var int
     */
    private $frameMax;

    /**
     * Bytes received from the IO that have not been decoded into frames yet.
     *
     * @var string
     */
    private $buffer;
49
50
    /**
     * Binds the wire to an IO and installs the default NullLogger and
     * NullHeartbeat collaborators.
     *
     * @param IOInterface $io
     */
    public function __construct(IOInterface $io)
    {
        $this->heartbeat = new NullHeartbeat();
        $this->logger = new NullLogger();
        $this->io = $io;
    }
59
60
    /**
     * {@inheritdoc}
     *
     * Forgets all subscribers, empties the read buffer, opens the IO using
     * the URL's host, port and query, and then writes the protocol header.
     */
    public function open(Url $url)
    {
        $this->buffer = '';
        $this->subscribers = [];

        $protocol = $this->getProtocolForScheme($url);

        $this->io->open($protocol, $url->getHost(), $url->getPort(), $url->getQuery());
        $this->io->write(self::PROTOCOL_HEADER);

        // @todo: peek next 8 bytes and check if its a frame or "wrong protocol" reply

        return $this;
    }
81
82
    /**
     * {@inheritdoc}
     *
     * Sends a heartbeat frame first when the heartbeat asks for one. If a
     * complete frame is already buffered it is dispatched and returned;
     * otherwise more data is read from the IO and null is returned.
     */
    public function next($blocking = true)
    {
        if ($this->heartbeat->shouldSendHeartbeat()) {
            $this->send(new Heartbeat(0));
        }

        $frame = $this->tryNext();

        if (!$frame) {
            // No complete frame buffered: fetch more bytes for a later call.
            $this->read($blocking);

            return $frame;
        }

        $this->heartbeat->serverBeat();
        $this->dispatch($frame);

        return $frame;
    }
100
101
    /**
     * Reads the next chunk of data from the IO and appends it to the buffer.
     *
     * The requested length is the configured frame max, or 8192 bytes while
     * no frame max is set.
     *
     * @param bool $blocking
     */
    private function read($blocking)
    {
        $length = $this->frameMax ? $this->frameMax : 8192;

        $this->buffer .= $this->io->read($length, $blocking);
    }
110
111
    /**
     * Attempts to decode one frame from the front of the read buffer.
     *
     * Returns null while the buffer holds less than a complete frame.
     * A complete frame is removed from the buffer before its ending byte
     * is verified.
     *
     * @return Frame|null
     *
     * @throws InvalidFrameEndingException when the last byte of the frame is not FRAME_ENDING
     */
    private function tryNext()
    {
        $available = strlen($this->buffer);

        // The header unpacked below takes 7 bytes: type (1), channel (2), size (4).
        if ($available < 7) {
            return null;
        }

        $header = unpack('Ctype/nchannel/Nsize', $this->buffer);

        // 7 header bytes + payload + 1 ending byte.
        $frameLength = $header['size'] + 8;

        if ($available < $frameLength) {
            return null;
        }

        $raw = substr($this->buffer, 0, $frameLength);

        // Keep the explicit empty string for an exact fit instead of relying on substr().
        $this->buffer = $available == $frameLength
            ? ''
            : substr($this->buffer, $frameLength, $available - $frameLength);

        $ending = $raw[$frameLength - 1];

        if ($ending !== self::FRAME_ENDING) {
            throw new InvalidFrameEndingException(sprintf('Invalid frame ending (%d)', unpack('c', $ending)[1]));
        }

        return Frame::decode($header, new Buffer($raw, 7));
    }
150
151
    /**
     * Hands the frame to the subscriber registered for the frame's channel.
     * Frames for channels without a subscriber are ignored.
     *
     * @param Frame $frame
     */
    private function dispatch(Frame $frame)
    {
        $subscriber = $this->getSubscriber($frame->getChannel());

        if (!$subscriber) {
            return;
        }

        $subscriber->dispatch($frame);
    }
160
161
    /**
     * {@inheritdoc}
     *
     * Records a client beat, splits the frame into pieces where required
     * and writes every encoded piece to the IO in order.
     */
    public function send(Frame $frame)
    {
        $this->heartbeat->clientBeat();

        $pieces = $this->chop($frame);

        foreach ($pieces as $piece) {
            $encoded = $piece->encode();
            $this->io->write($encoded);
        }

        return $this;
    }
174
175
    /**
     * Splits a content frame into pieces carrying at most (frame max - 8)
     * bytes of data each.
     *
     * Frames that are not Content, and all frames while no frame max is set,
     * are returned untouched as a single-element array. A Content frame with
     * empty data produces an empty array when a frame max is set.
     *
     * NOTE(review): a frame max of 8 or less yields a non-positive chunk
     * size here - confirm callers never configure such a value.
     *
     * @param Frame $frame
     *
     * @return array
     */
    private function chop(Frame $frame)
    {
        if (!($this->frameMax && $frame instanceof Content)) {
            return [$frame];
        }

        $payload = $frame->getData();
        $limit = $this->frameMax - 8;
        $total = ceil(strlen($payload) / $limit);

        $pieces = [];
        $index = 0;

        while ($index < $total) {
            $pieces[] = new Content($frame->getChannel(), substr($payload, $index * $limit, $limit));
            ++$index;
        }

        return $pieces;
    }
197
198
    /**
     * {@inheritdoc}
     *
     * Keeps fetching frames in blocking mode until one arrives on the given
     * channel whose class is one of the requested types. A single type may
     * be passed instead of an array.
     */
    public function wait($channel, $types)
    {
        $expected = is_array($types) ? $types : [$types];

        while (true) {
            $frame = $this->next(true);

            if ($this->isFrameMatch($frame, $channel, $expected)) {
                return $frame;
            }
        }
    }
213
214
    /**
     * Tells whether a frame was received on the given channel and is of one
     * of the given classes.
     *
     * A missing frame (null) never matches. Class names are compared
     * strictly, so a non-string entry in $types (for example 0 or true)
     * can no longer match an arbitrary frame class through type juggling.
     *
     * @param Frame $frame   frame to test, or null when none was received
     * @param int   $channel channel the frame must belong to
     * @param array $types   fully qualified frame class names
     *
     * @return bool
     */
    private function isFrameMatch(Frame $frame = null, $channel = 0, array $types = [])
    {
        if (!$frame || $frame->getChannel() != $channel) {
            return false;
        }

        return in_array(get_class($frame), $types, true);
    }
225
226
    /**
     * {@inheritdoc}
     *
     * Registers the handler for the channel, replacing any handler that was
     * registered for that channel before.
     */
    public function subscribe($channel, WireSubscriberInterface $handler)
    {
        $subscribers = $this->subscribers;
        $subscribers[$channel] = $handler;
        $this->subscribers = $subscribers;

        return $this;
    }
235
236
    /**
     * {@inheritdoc}
     *
     * Closes the underlying IO.
     */
    public function close()
    {
        $io = $this->io;
        $io->close();

        return $this;
    }
245
246
    /**
     * Replaces the heartbeat used by next() and send().
     *
     * @param HeartbeatInterface $heartbeat
     *
     * @return $this
     */
    public function setHeartbeat(HeartbeatInterface $heartbeat)
    {
        $wire = $this;
        $wire->heartbeat = $heartbeat;

        return $wire;
    }
257
258
    /**
     * Sets the frame max used as the read chunk size and as the limit for
     * splitting content frames.
     *
     * @param int $frameMax
     *
     * @return $this
     */
    public function setFrameMax($frameMax)
    {
        $wire = $this;
        $wire->frameMax = $frameMax;

        return $wire;
    }
269
270
    /**
     * Looks up the subscriber registered for a channel.
     *
     * @param int $channel
     *
     * @return WireSubscriberInterface|null null when nothing is registered for the channel
     */
    private function getSubscriber($channel)
    {
        if (!isset($this->subscribers[$channel])) {
            return null;
        }

        return $this->subscribers[$channel];
    }
279
280
    /**
     * Picks the protocol name passed to the IO for the URL's scheme.
     *
     * @param Url $url
     *
     * @return string 'ssl' when the scheme is "amqps" (case-insensitive), 'tcp' otherwise
     */
    private function getProtocolForScheme(Url $url)
    {
        $isSecure = strcasecmp($url->getScheme(), 'amqps') == 0;

        if ($isSecure) {
            return 'ssl';
        }

        return 'tcp';
    }
289
}
290