Completed
Push — master ( 73f6e1...42bf17 )
by Sergey
04:22
created

Wire   A

Complexity

Total Complexity 29

Size/Duplication

Total Lines 245
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 13

Test Coverage

Coverage 98.84%

Importance

Changes 5
Bugs 0 Features 2
Metric Value
wmc 29
c 5
b 0
f 2
lcom 1
cbo 13
dl 0
loc 245
ccs 85
cts 86
cp 0.9884
rs 10

12 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 6 1
A open() 0 17 1
A subscribe() 0 6 1
A close() 0 6 1
A setHeartbeat() 0 6 1
A setFrameMax() 0 6 1
A getSubscriber() 0 4 2
A getProtocolForScheme() 0 4 2
B next() 0 38 6
A send() 0 23 2
A chop() 0 17 4
C wait() 0 27 7
1
<?php
2
3
namespace ButterAMQP;
4
5
use ButterAMQP\Exception\InvalidFrameEndingException;
6
use ButterAMQP\Framing\Content;
7
use ButterAMQP\Framing\Frame;
8
use ButterAMQP\Framing\Heartbeat;
9
use ButterAMQP\Heartbeat\NullHeartbeat;
10
use ButterAMQP\Value\LongValue;
11
use ButterAMQP\Value\ShortValue;
12
use Psr\Log\LoggerAwareInterface;
13
use Psr\Log\LoggerAwareTrait;
14
use Psr\Log\NullLogger;
15
16
class Wire implements WireInterface, LoggerAwareInterface
17
{
18
    /**
     * Bytes written to the transport right after it is opened:
     * the literal "AMQP" followed by the octets 0, 0, 9, 1.
     */
    const PROTOCOL_HEADER = "AMQP\x00\x00\x09\x01";

    /**
     * Octet that must be the last byte of every frame read from or written to the transport.
     */
    const FRAME_ENDING = "\xCE";

    use LoggerAwareTrait;

    /**
     * Transport used for all reads and writes.
     *
     * @var IOInterface
     */
    private $io;

    /**
     * Frame subscribers, indexed by channel number.
     *
     * @var WireSubscriberInterface[]
     */
    private $subscribers = [];

    /**
     * Heartbeat strategy consulted on every read and notified on every read/write.
     *
     * @var HeartbeatInterface
     */
    private $heartbeat;

    /**
     * Maximum frame size used to split content frames; a falsy value disables splitting.
     *
     * @var int
     */
    private $frameMax;
42
43
    /**
     * Creates a wire on top of the given transport.
     *
     * The logger and the heartbeat are initialised with no-op implementations
     * (NullLogger and NullHeartbeat).
     *
     * @param IOInterface $io
     */
    public function __construct(IOInterface $io)
    {
        $this->heartbeat = new NullHeartbeat();
        $this->logger = new NullLogger();
        $this->io = $io;
    }
52
53
    /**
     * Opens the transport for the given URL and writes the protocol header.
     *
     * {@inheritdoc}
     */
    public function open(Url $url)
    {
        // Start with an empty subscriber list.
        $this->subscribers = [];

        $protocol = $this->getProtocolForScheme($url);
        $host = $url->getHost();
        $port = $url->getPort();
        $parameters = $url->getQuery();

        $this->io->open($protocol, $host, $port, $parameters);
        $this->io->write(self::PROTOCOL_HEADER);

        // @todo: peek the next 8 bytes and check whether it is a frame or a "wrong protocol" reply

        return $this;
    }
73
74
    /**
     * Reads the next frame from the transport, dispatches it to the subscriber
     * of its channel (if any) and returns it.
     *
     * Returns null when the transport has no complete frame to offer.
     *
     * {@inheritdoc}
     */
    public function next($blocking = true)
    {
        if ($this->heartbeat->shouldSendHeartbeat()) {
            $this->send(0, new Heartbeat());
        }

        // Frame header: 1-byte type, 2-byte channel, 4-byte payload size.
        $prefix = $this->io->peek(7, $blocking);
        if ($prefix === null) {
            return null;
        }

        $header = unpack('Ctype/nchannel/Nsize', $prefix);

        // A whole frame is the 7-byte header, the payload and the 1-byte frame end.
        $raw = $this->io->read($header['size'] + 8, $blocking);
        if ($raw === null) {
            return null;
        }

        $length = strlen($raw);
        $payload = substr($raw, 7, $length - 8);
        $end = $raw[$length - 1];

        if ($end !== self::FRAME_ENDING) {
            throw new InvalidFrameEndingException(sprintf('Invalid frame ending (%d)', Binary::unpack('c', $end)));
        }

        $frame = Frame::create($header['type'], $header['channel'], $payload);

        $subscriber = $this->getSubscriber($frame->getChannel());
        if ($subscriber) {
            $subscriber->dispatch($frame);
        }

        $this->heartbeat->serverBeat();

        return $frame;
    }
115
116
    /**
     * Writes a frame to the given channel, split into several frames
     * when chop() decides the content has to be divided.
     *
     * {@inheritdoc}
     */
    public function send($channel, Frame $frame)
    {
        $this->heartbeat->clientBeat();

        foreach ($this->chop($frame) as $part) {
            $body = $part->encode();

            // Layout: frame type, channel, payload size, payload, frame end.
            $packet = $part->getFrameType()
                .ShortValue::encode($channel)
                .LongValue::encode(strlen($body))
                .$body
                .self::FRAME_ENDING;

            $this->io->write($packet);
        }

        return $this;
    }
142
143
    /**
     * Splits a content frame into pieces that fit into the configured frame max.
     *
     * Frames other than Content, and any frame while no frame max is set,
     * are returned untouched as a single-element list. A Content frame with
     * an empty body yields an empty list.
     *
     * @param Frame $frame
     *
     * @return Frame[]
     *
     * @throws \LogicException when the frame max leaves no room for any payload
     */
    private function chop(Frame $frame)
    {
        if (!$this->frameMax || !$frame instanceof Content) {
            return [$frame];
        }

        // Every frame carries 8 bytes of overhead: the 7-byte header and the 1-byte frame end.
        $size = $this->frameMax - 8;

        // A non-positive payload size would divide by zero or silently drop the body.
        if ($size < 1) {
            throw new \LogicException(sprintf(
                'Frame max of %d bytes leaves no room for content payload',
                $this->frameMax
            ));
        }

        $data = $frame->getData();
        $length = strlen($data);
        $frames = [];

        for ($offset = 0; $offset < $length; $offset += $size) {
            $frames[] = new Content(substr($data, $offset, $size));
        }

        return $frames;
    }
165
166
    /**
     * Keeps reading frames until one arrives on the given channel that is
     * an instance of one of the requested types, then returns it.
     *
     * Frames for other channels or of other types are still read (and therefore
     * dispatched by next()), but they do not end the wait.
     *
     * {@inheritdoc}
     */
    public function wait($channel, $types)
    {
        if (!is_array($types)) {
            $types = [$types];
        }

        while (true) {
            $frame = $this->next(true);

            // Loose comparison is kept on purpose: the channel may be passed as int or numeric string.
            if (!$frame || $frame->getChannel() != $channel) {
                continue;
            }

            foreach ($types as $type) {
                if (is_a($frame, $type)) {
                    return $frame;
                }
            }
        }
    }
196
197
    /**
     * Registers the handler for the given channel, replacing any handler
     * previously registered for it.
     *
     * {@inheritdoc}
     */
    public function subscribe($channel, WireSubscriberInterface $handler)
    {
        $this->subscribers[$channel] = $handler;

        return $this;
    }
206
207
    /**
     * Closes the underlying transport.
     *
     * {@inheritdoc}
     */
    public function close()
    {
        $this->io->close();

        return $this;
    }
216
217
    /**
     * Replaces the heartbeat strategy used by next() and send().
     *
     * @param HeartbeatInterface $heartbeat
     *
     * @return $this
     */
    public function setHeartbeat(HeartbeatInterface $heartbeat)
    {
        $this->heartbeat = $heartbeat;

        return $this;
    }
228
229
    /**
     * Sets the maximum frame size used when splitting content frames.
     *
     * @param int $frameMax maximum size of a whole frame; a falsy value disables splitting
     *
     * @return $this
     */
    public function setFrameMax($frameMax)
    {
        $this->frameMax = $frameMax;

        return $this;
    }
240
241
    /**
     * Looks up the subscriber registered for a channel.
     *
     * @param int $channel
     *
     * @return WireSubscriberInterface|null null when nothing is registered for the channel
     */
    private function getSubscriber($channel)
    {
        if (!isset($this->subscribers[$channel])) {
            return null;
        }

        return $this->subscribers[$channel];
    }
250
251
    /**
     * Maps the URL scheme to the transport protocol name.
     *
     * @param Url $url
     *
     * @return string "ssl" for the "amqps" scheme (case-insensitive), "tcp" for anything else
     */
    private function getProtocolForScheme(Url $url)
    {
        if (strcasecmp($url->getScheme(), 'amqps') == 0) {
            return 'ssl';
        }

        return 'tcp';
    }
260
}
261