Completed
Push — master ( b2e4f3...d16f6a )
by Sergey
08:59
created

Wire::read()   A

Complexity

Conditions 2
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 3
cts 3
cp 1
rs 10
c 0
b 0
f 0
cc 2
eloc 2
nc 1
nop 1
crap 2
1
<?php
2
3
namespace ButterAMQP\AMQP091;
4
5
use ButterAMQP\Binary;
6
use ButterAMQP\Buffer;
7
use ButterAMQP\Exception\InvalidFrameEndingException;
8
use ButterAMQP\AMQP091\Framing\Content;
9
use ButterAMQP\AMQP091\Framing\Frame;
10
use ButterAMQP\AMQP091\Framing\Heartbeat;
11
use ButterAMQP\Heartbeat\NullHeartbeat;
12
use ButterAMQP\HeartbeatInterface;
13
use ButterAMQP\IOInterface;
14
use ButterAMQP\Url;
15
use Psr\Log\LoggerAwareInterface;
16
use Psr\Log\LoggerAwareTrait;
17
use Psr\Log\NullLogger;
18
19
class Wire implements WireInterface, LoggerAwareInterface
20
{
21
    const PROTOCOL_HEADER = "AMQP\x00\x00\x09\x01";
22
    const FRAME_ENDING = "\xCE";
23
24
    use LoggerAwareTrait;
25
26
    /**
27
     * @var IOInterface
28
     */
29
    private $io;
30
31
    /**
32
     * @var WireSubscriberInterface[]
33
     */
34
    private $subscribers = [];
35
36
    /**
37
     * @var HeartbeatInterface
38
     */
39
    private $heartbeat;
40
41
    /**
42
     * @var int
43
     */
44
    private $frameMax;
45
46
    /**
47
     * @var string
48
     */
49
    private $buffer;
50
51
    /**
52
     * @param IOInterface $io
53
     */
54 33
    public function __construct(IOInterface $io)
55
    {
56 33
        $this->io = $io;
57 33
        $this->logger = new NullLogger();
58 33
        $this->heartbeat = new NullHeartbeat();
59 33
    }
60
61
    /**
62
     * {@inheritdoc}
63
     */
64 19
    public function open(Url $url)
65
    {
66 19
        $this->subscribers = [];
67 19
        $this->buffer = '';
68
69 19
        $this->io->open(
70 19
            $this->getProtocolForScheme($url),
71 19
            $url->getHost(),
72 19
            $url->getPort(),
73 19
            $url->getQuery()
74 19
        );
75
76 19
        $this->io->write(self::PROTOCOL_HEADER);
77
78
        // @todo: peek next 8 bytes and check if its a frame or "wrong protocol" reply
79
80 19
        return $this;
81
    }
82
83
    /**
84
     * {@inheritdoc}
85
     */
86 28
    public function next($blocking = true)
87
    {
88 28
        if ($this->heartbeat->shouldSendHeartbeat()) {
89 1
            $this->send(new Heartbeat(0));
90 1
        }
91
92 28
        if ($frame = $this->tryNext()) {
93 23
            $this->heartbeat->serverBeat();
94 23
            $this->dispatch($frame);
95 23
        } else {
96 28
            $this->read($blocking);
97
        }
98
99 28
        return $frame;
100
    }
101
102
    /**
103
     * Read next chunk of the data from the buffer.
104
     *
105
     * @param bool $blocking
106
     */
107 28
    private function read($blocking)
108
    {
109 28
        $this->buffer .= $this->io->read($this->frameMax ?: 8192, $blocking);
110 28
    }
111
112
    /**
113
     * Tries to fetch next frame from reading buffer.
114
     * Will return a Frame or null if there is not enough data in the buffer.
115
     *
116
     * @return Frame|null
117
     *
118
     * @throws InvalidFrameEndingException
119
     */
120 28
    private function tryNext()
121
    {
122 28
        $length = strlen($this->buffer);
123
124 28
        if ($length < 7) {
125 28
            return null;
126
        }
127
128 24
        $header = unpack('Ctype/nchannel/Nsize', $this->buffer);
129 24
        $size = $header['size'] + 8;
130
131 24
        if ($length < $size) {
132 1
            return null;
133
        }
134
135 24
        if ($length == $size) {
136 23
            $buffer = $this->buffer;
137 23
            $this->buffer = '';
138 23
        } else {
139 14
            $buffer = substr($this->buffer, 0, $size);
140 14
            $this->buffer = substr($this->buffer, $size, $length - $size);
141
        }
142
143 24
        $end = $buffer[$size - 1];
144
145 24
        if ($end != self::FRAME_ENDING) {
146 1
            throw new InvalidFrameEndingException(sprintf('Invalid frame ending (%d)', Binary::unpack('c', $end)));
147
        }
148
149 23
        return Frame::decode(new Buffer($buffer));
150
    }
151
152
    /**
153
     * @param Frame $frame
154
     */
155 23
    private function dispatch(Frame $frame)
156
    {
157 23
        if ($subscriber = $this->getSubscriber($frame->getChannel())) {
158 19
            $subscriber->dispatch($frame);
159 19
        }
160 23
    }
161
162
    /**
163
     * {@inheritdoc}
164
     */
165 22
    public function send(Frame $frame)
166
    {
167 22
        $this->heartbeat->clientBeat();
168
169 22
        foreach ($this->chop($frame) as $piece) {
170 22
            $this->io->write($piece->encode());
171 22
        }
172
173 22
        return $this;
174
    }
175
176
    /**
177
     * @param Frame $frame
178
     *
179
     * @return array
180
     */
181 22
    private function chop(Frame $frame)
182
    {
183 22
        if (!$this->frameMax || !$frame instanceof Content) {
184 21
            return [$frame];
185
        }
186
187 15
        $frames = [];
188 15
        $data = $frame->getData();
189 15
        $size = $this->frameMax - 8;
190 15
        $chunks = ceil(strlen($data) / $size);
191
192 15
        for ($c = 0; $c < $chunks; ++$c) {
193 8
            $frames[] = new Content($frame->getChannel(), substr($data, $c * $size, $size));
194 8
        }
195
196 15
        return $frames;
197
    }
198
199
    /**
200
     * {@inheritdoc}
201
     */
202 20
    public function wait($channel, $types)
203
    {
204 20
        if (!is_array($types)) {
205 19
            $types = [$types];
206 19
        }
207
208
        do {
209 20
            $frame = $this->next(true);
210 20
        } while (!$this->isFrameMatch($frame, $channel, $types));
211
212 19
        return $frame;
213
    }
214
215
    /**
216
     * @param Frame $frame
217
     * @param int   $channel
218
     * @param array $types
219
     *
220
     * @return bool
221
     */
222 20
    private function isFrameMatch(Frame $frame = null, $channel = 0, array $types = [])
223
    {
224 20
        return $frame && $frame->getChannel() == $channel && in_array(get_class($frame), $types);
225
    }
226
227
    /**
228
     * {@inheritdoc}
229
     */
230 19
    public function subscribe($channel, WireSubscriberInterface $handler)
231
    {
232 19
        $this->subscribers[$channel] = $handler;
233
234 19
        return $this;
235
    }
236
237
    /**
238
     * {@inheritdoc}
239
     */
240 19
    public function close()
241
    {
242 19
        $this->io->close();
243
244 19
        return $this;
245
    }
246
247
    /**
248
     * @param HeartbeatInterface $heartbeat
249
     *
250
     * @return $this
251
     */
252 20
    public function setHeartbeat(HeartbeatInterface $heartbeat)
253
    {
254 20
        $this->heartbeat = $heartbeat;
255
256 20
        return $this;
257
    }
258
259
    /**
260
     * @param int $frameMax
261
     *
262
     * @return $this
263
     */
264 18
    public function setFrameMax($frameMax)
265
    {
266 18
        $this->frameMax = $frameMax;
267
268 18
        return $this;
269
    }
270
271
    /**
272
     * @param int $channel
273
     *
274
     * @return WireSubscriberInterface|null
275
     */
276 23
    private function getSubscriber($channel)
277
    {
278 23
        return isset($this->subscribers[$channel]) ? $this->subscribers[$channel] : null;
279
    }
280
281
    /**
282
     * @param Url $url
283
     *
284
     * @return string
285
     */
286 19
    private function getProtocolForScheme(Url $url)
287
    {
288 19
        return strcasecmp($url->getScheme(), 'amqps') == 0 ? 'ssl' : 'tcp';
289
    }
290
}
291