Completed
Push — master ( 9eba40...22ad49 )
by Sergey
03:54
created

Wire::setFrameMax()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 6
ccs 3
cts 3
cp 1
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 3
nc 1
nop 1
crap 1
1
<?php
2
3
namespace ButterAMQP\AMQP091;
4
5
use ButterAMQP\Binary;
6
use ButterAMQP\Buffer;
7
use ButterAMQP\Exception\InvalidFrameEndingException;
8
use ButterAMQP\AMQP091\Framing\Content;
9
use ButterAMQP\AMQP091\Framing\Frame;
10
use ButterAMQP\AMQP091\Framing\Heartbeat;
11
use ButterAMQP\Heartbeat\NullHeartbeat;
12
use ButterAMQP\HeartbeatInterface;
13
use ButterAMQP\IOInterface;
14
use ButterAMQP\Url;
15
use Psr\Log\LoggerAwareInterface;
16
use Psr\Log\LoggerAwareTrait;
17
use Psr\Log\NullLogger;
18
19
/**
 * Frame-level transport for AMQP 0.9.1 on top of an IOInterface.
 *
 * Reads raw bytes from the IO layer, decodes them into Frame objects,
 * hands each decoded frame to the subscriber registered for its channel,
 * and encodes outgoing frames (splitting Content frames when a frame size
 * limit is configured).
 */
class Wire implements WireInterface, LoggerAwareInterface
{
    use LoggerAwareTrait;

    const PROTOCOL_HEADER = "AMQP\x00\x00\x09\x01";
    const FRAME_ENDING = "\xCE";

    /**
     * @var IOInterface
     */
    private $io;

    /**
     * Subscribers indexed by channel number.
     *
     * @var WireSubscriberInterface[]
     */
    private $subscribers = [];

    /**
     * @var HeartbeatInterface
     */
    private $heartbeat;

    /**
     * Maximum size of an outgoing frame in bytes; a falsy value disables splitting.
     *
     * @var int
     */
    private $frameMax;

    /**
     * Starts with a NullLogger and a NullHeartbeat until real ones are injected.
     *
     * @param IOInterface $io
     */
    public function __construct(IOInterface $io)
    {
        $this->io = $io;
        $this->logger = new NullLogger();
        $this->heartbeat = new NullHeartbeat();
    }

    /**
     * Drops all registered subscribers, opens the IO transport for the given
     * URL and writes the protocol header.
     *
     * {@inheritdoc}
     */
    public function open(Url $url)
    {
        $this->subscribers = [];

        $this->io->open(
            $this->getProtocolForScheme($url),
            $url->getHost(),
            $url->getPort(),
            $url->getQuery()
        );

        $this->io->write(self::PROTOCOL_HEADER);

        // @todo: peek next 8 bytes and check if it's a frame or "wrong protocol" reply

        return $this;
    }

    /**
     * Reads, validates, decodes and dispatches the next frame.
     *
     * Sends a heartbeat frame first when the heartbeat strategy asks for one.
     * Returns null when the IO layer has no complete header or frame to give.
     *
     * {@inheritdoc}
     *
     * @throws InvalidFrameEndingException when the last byte of the frame is not FRAME_ENDING
     */
    public function next($blocking = true)
    {
        if ($this->heartbeat->shouldSendHeartbeat()) {
            $this->send(new Heartbeat(0));
        }

        // Frame header is 7 bytes: type (1) + channel (2) + payload size (4).
        if (($peek = $this->io->peek(7, $blocking)) === null) {
            return null;
        }

        $header = unpack('Ctype/nchannel/Nsize', $peek);

        // Whole frame = 7 header bytes + payload + 1 frame-end byte.
        if (($data = $this->io->read($header['size'] + 8, $blocking)) === null) {
            return null;
        }

        $end = $data[strlen($data) - 1];

        // Both operands are strings, so compare strictly and avoid any type juggling.
        if ($end !== self::FRAME_ENDING) {
            throw new InvalidFrameEndingException(sprintf('Invalid frame ending (%d)', Binary::unpack('c', $end)));
        }

        $frame = Frame::decode(new Buffer($data));

        $this->dispatch($frame);

        $this->heartbeat->serverBeat();

        return $frame;
    }

    /**
     * Passes the frame to the subscriber of its channel, if one is registered.
     *
     * @param Frame $frame
     */
    private function dispatch(Frame $frame)
    {
        if ($subscriber = $this->getSubscriber($frame->getChannel())) {
            $subscriber->dispatch($frame);
        }
    }

    /**
     * Notifies the heartbeat strategy, then writes the frame to the IO layer,
     * split into several frames when chop() decides so.
     *
     * {@inheritdoc}
     */
    public function send(Frame $frame)
    {
        $this->heartbeat->clientBeat();

        foreach ($this->chop($frame) as $piece) {
            $this->io->write($piece->encode());
        }

        return $this;
    }

    /**
     * Splits a Content frame into pieces that fit the configured frame size.
     *
     * Frames other than Content, or any frame while no limit is set, are
     * returned as a single-element array. A Content frame with an empty
     * payload yields an empty array, so nothing is written for it.
     *
     * @param Frame $frame
     *
     * @return Frame[]
     */
    private function chop(Frame $frame)
    {
        if (!$this->frameMax || !$frame instanceof Content) {
            return [$frame];
        }

        // Every piece spends 8 bytes on framing (7-byte header + 1 end byte).
        $size = $this->frameMax - 8;

        // A limit that leaves no room for payload cannot be honoured. Splitting
        // with a zero or negative chunk size would divide by zero or produce no
        // frames at all, silently losing the payload, so pass the frame through
        // unchanged instead.
        if ($size < 1) {
            return [$frame];
        }

        $data = $frame->getData();
        $length = strlen($data);
        $channel = $frame->getChannel();
        $frames = [];

        for ($offset = 0; $offset < $length; $offset += $size) {
            $frames[] = new Content($channel, substr($data, $offset, $size));
        }

        return $frames;
    }

    /**
     * Keeps reading frames in blocking mode until one arrives on the given
     * channel whose class is one of $types. Every frame read on the way is
     * still dispatched to its subscriber by next().
     *
     * {@inheritdoc}
     */
    public function wait($channel, $types)
    {
        if (!is_array($types)) {
            $types = [$types];
        }

        do {
            $frame = $this->next(true);
        } while (!$this->isFrameMatch($frame, $channel, $types));

        return $frame;
    }

    /**
     * Tells whether a frame was received, belongs to the channel and its
     * exact class name is listed in $types.
     *
     * @param Frame $frame
     * @param int   $channel
     * @param array $types
     *
     * @return bool
     */
    private function isFrameMatch(Frame $frame = null, $channel = 0, array $types = [])
    {
        return $frame && $frame->getChannel() == $channel && in_array(get_class($frame), $types, true);
    }

    /**
     * Registers the handler for a channel, replacing any previous one.
     *
     * {@inheritdoc}
     */
    public function subscribe($channel, WireSubscriberInterface $handler)
    {
        $this->subscribers[$channel] = $handler;

        return $this;
    }

    /**
     * Closes the underlying IO transport.
     *
     * {@inheritdoc}
     */
    public function close()
    {
        $this->io->close();

        return $this;
    }

    /**
     * @param HeartbeatInterface $heartbeat
     *
     * @return $this
     */
    public function setHeartbeat(HeartbeatInterface $heartbeat)
    {
        $this->heartbeat = $heartbeat;

        return $this;
    }

    /**
     * Sets the frame size limit used when splitting outgoing Content frames.
     * A falsy value turns splitting off.
     *
     * @param int $frameMax
     *
     * @return $this
     */
    public function setFrameMax($frameMax)
    {
        $this->frameMax = $frameMax;

        return $this;
    }

    /**
     * @param int $channel
     *
     * @return WireSubscriberInterface|null
     */
    private function getSubscriber($channel)
    {
        return isset($this->subscribers[$channel]) ? $this->subscribers[$channel] : null;
    }

    /**
     * Maps the URL scheme to a transport name: "ssl" for amqps
     * (case-insensitive), "tcp" for anything else.
     *
     * @param Url $url
     *
     * @return string
     */
    private function getProtocolForScheme(Url $url)
    {
        return strcasecmp($url->getScheme(), 'amqps') == 0 ? 'ssl' : 'tcp';
    }
}
250