Completed
Push — master ( 42bf17...fdb7de )
by Sergey
04:25
created

Wire   A

Complexity

Total Complexity 29

Size/Duplication

Total Lines 236
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 12

Test Coverage

Coverage 98.77%

Importance

Changes 6
Bugs 0 Features 2
Metric Value
wmc 29
c 6
b 0
f 2
lcom 1
cbo 12
dl 0
loc 236
ccs 80
cts 81
cp 0.9877
rs 10

12 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 6 1
A open() 0 17 1
B next() 0 37 6
A send() 0 15 2
A chop() 0 17 4
C wait() 0 27 7
A subscribe() 0 6 1
A close() 0 6 1
A setHeartbeat() 0 6 1
A setFrameMax() 0 6 1
A getSubscriber() 0 4 2
A getProtocolForScheme() 0 4 2
1
<?php
2
3
namespace ButterAMQP;
4
5
use ButterAMQP\Exception\InvalidFrameEndingException;
6
use ButterAMQP\Framing\Content;
7
use ButterAMQP\Framing\Frame;
8
use ButterAMQP\Framing\Heartbeat;
9
use ButterAMQP\Heartbeat\NullHeartbeat;
10
use Psr\Log\LoggerAwareInterface;
11
use Psr\Log\LoggerAwareTrait;
12
use Psr\Log\NullLogger;
13
14
class Wire implements WireInterface, LoggerAwareInterface
15
{
16
    const PROTOCOL_HEADER = "AMQP\x00\x00\x09\x01";
17
    const FRAME_ENDING = "\xCE";
18
19
    use LoggerAwareTrait;
20
21
    /**
22
     * @var IOInterface
23
     */
24
    private $io;
25
26
    /**
27
     * @var WireSubscriberInterface[]
28
     */
29
    private $subscribers = [];
30
31
    /**
32
     * @var HeartbeatInterface
33
     */
34
    private $heartbeat;
35
36
    /**
37
     * @var int
38
     */
39
    private $frameMax;
40
41
    /**
42
     * @param IOInterface $io
43
     */
44 33
    public function __construct(IOInterface $io)
45
    {
46 33
        $this->io = $io;
47 33
        $this->logger = new NullLogger();
48 33
        $this->heartbeat = new NullHeartbeat();
49 33
    }
50
51
    /**
52
     * {@inheritdoc}
53
     */
54 19
    public function open(Url $url)
55
    {
56 19
        $this->subscribers = [];
57
58 19
        $this->io->open(
59 19
            $this->getProtocolForScheme($url),
60 19
            $url->getHost(),
61 19
            $url->getPort(),
62 19
            $url->getQuery()
63 19
        );
64
65 19
        $this->io->write(self::PROTOCOL_HEADER);
66
67
        // @todo: peek next 8 bytes and check if its a frame or "wrong protocol" reply
68
69 19
        return $this;
70
    }
71
72
    /**
73
     * {@inheritdoc}
74
     */
75 28
    public function next($blocking = true)
76
    {
77 28
        if ($this->heartbeat->shouldSendHeartbeat()) {
78 1
            $this->send(new Heartbeat(0));
79 1
        }
80
81 28
        if (($peek = $this->io->peek(7, $blocking)) === null) {
82 2
            return null;
83
        }
84
85 26
        $header = unpack('Ctype/nchannel/Nsize', $peek);
86
87 26
        if (($data = $this->io->read($header['size'] + 8, $blocking)) === null) {
88 3
            return null;
89
        }
90
91 24
        $end = $data[strlen($data) - 1];
92
93 24
        if ($end != self::FRAME_ENDING) {
94 1
            throw new InvalidFrameEndingException(sprintf('Invalid frame ending (%d)', Binary::unpack('c', $end)));
95
        }
96
97 23
        $frame = Frame::decode(new Buffer($data));
98
99
        //$this->logger->debug(sprintf('Receive "%s" at channel #%d', get_class($frame), $frame->getChannel()), [
0 ignored issues
show
Unused Code Comprehensibility introduced by
70% of this comment could be valid code. Did you maybe forget this after debugging?

Sometimes obsolete code just ends up commented out instead of removed. In this case it is better to remove the code once you have checked you do not need it.

The code might also have been commented out for debugging purposes. In this case it is vital that someone uncomments it again or your project may behave in very unexpected ways in production.

This check looks for comments that seem to be mostly valid code and reports them.

Loading history...
100
        //    'channel' => $frame->getChannel(),
0 ignored issues
show
Unused Code Comprehensibility introduced by
64% of this comment could be valid code. Did you maybe forget this after debugging?

Sometimes obsolete code just ends up commented out instead of removed. In this case it is better to remove the code once you have checked you do not need it.

The code might also have been commented out for debugging purposes. In this case it is vital that someone uncomments it again or your project may behave in very unexpected ways in production.

This check looks for comments that seem to be mostly valid code and reports them.

Loading history...
101
        //    'frame' => get_class($frame),
0 ignored issues
show
Unused Code Comprehensibility introduced by
60% of this comment could be valid code. Did you maybe forget this after debugging?

Sometimes obsolete code just ends up commented out instead of removed. In this case it is better to remove the code once you have checked you do not need it.

The code might also have been commented out for debugging purposes. In this case it is vital that someone uncomments it again or your project may behave in very unexpected ways in production.

This check looks for comments that seem to be mostly valid code and reports them.

Loading history...
102
        //]);
103
104 23
        if ($subscriber = $this->getSubscriber($frame->getChannel())) {
105 19
            $subscriber->dispatch($frame);
106 19
        }
107
108 23
        $this->heartbeat->serverBeat();
109
110 23
        return $frame;
111
    }
112
113
    /**
114
     * {@inheritdoc}
115
     */
116 22
    public function send(Frame $frame)
117
    {
118
        //$this->logger->debug(sprintf('Sending "%s" to channel #%d', get_class($frame), $channel), [
0 ignored issues
show
Unused Code Comprehensibility introduced by
69% of this comment could be valid code. Did you maybe forget this after debugging?

Sometimes obsolete code just ends up commented out instead of removed. In this case it is better to remove the code once you have checked you do not need it.

The code might also have been commented out for debugging purposes. In this case it is vital that someone uncomments it again or your project may behave in very unexpected ways in production.

This check looks for comments that seem to be mostly valid code and reports them.

Loading history...
119
        //    'channel' => $channel,
0 ignored issues
show
Unused Code Comprehensibility introduced by
58% of this comment could be valid code. Did you maybe forget this after debugging?

Sometimes obsolete code just ends up commented out instead of removed. In this case it is better to remove the code once you have checked you do not need it.

The code might also have been commented out for debugging purposes. In this case it is vital that someone uncomments it again or your project may behave in very unexpected ways in production.

This check looks for comments that seem to be mostly valid code and reports them.

Loading history...
120
        //    'frame' => get_class($frame),
0 ignored issues
show
Unused Code Comprehensibility introduced by
60% of this comment could be valid code. Did you maybe forget this after debugging?

Sometimes obsolete code just ends up commented out instead of removed. In this case it is better to remove the code once you have checked you do not need it.

The code might also have been commented out for debugging purposes. In this case it is vital that someone uncomments it again or your project may behave in very unexpected ways in production.

This check looks for comments that seem to be mostly valid code and reports them.

Loading history...
121
        //]);
122
123 22
        $this->heartbeat->clientBeat();
124
125 22
        foreach ($this->chop($frame) as $piece) {
126 22
            $this->io->write($piece->encode());
127 22
        }
128
129 22
        return $this;
130
    }
131
132
    /**
133
     * @param Frame $frame
134
     *
135
     * @return array
136
     */
137 22
    private function chop(Frame $frame)
138
    {
139 22
        if (!$this->frameMax || !$frame instanceof Content) {
140 21
            return [$frame];
141
        }
142
143 15
        $frames = [];
144 15
        $data = $frame->getData();
145 15
        $size = $this->frameMax - 8;
146 15
        $chunks = ceil(strlen($data) / $size);
147
148 15
        for ($c = 0; $c < $chunks; ++$c) {
149 8
            $frames[] = new Content($frame->getChannel(), substr($data, $c * $size, $size));
150 8
        }
151
152 15
        return $frames;
153 1
    }
154
155
    /**
156
     * {@inheritdoc}
157
     */
158 20
    public function wait($channel, $types)
159
    {
160 20
        if (!is_array($types)) {
161 19
            $types = [$types];
162 19
        }
163
164
        //$this->logger->debug(sprintf('Waiting "%s" at channel #%d', implode('", "', $types), $channel), [
0 ignored issues
show
Unused Code Comprehensibility introduced by
68% of this comment could be valid code. Did you maybe forget this after debugging?

Sometimes obsolete code just ends up commented out instead of removed. In this case it is better to remove the code once you have checked you do not need it.

The code might also have been commented out for debugging purposes. In this case it is vital that someone uncomments it again or your project may behave in very unexpected ways in production.

This check looks for comments that seem to be mostly valid code and reports them.

Loading history...
165
        //    'channel' => $channel,
0 ignored issues
show
Unused Code Comprehensibility introduced by
58% of this comment could be valid code. Did you maybe forget this after debugging?

Sometimes obsolete code just ends up commented out instead of removed. In this case it is better to remove the code once you have checked you do not need it.

The code might also have been commented out for debugging purposes. In this case it is vital that someone uncomments it again or your project may behave in very unexpected ways in production.

This check looks for comments that seem to be mostly valid code and reports them.

Loading history...
166
        //    'frame' => $types,
0 ignored issues
show
Unused Code Comprehensibility introduced by
58% of this comment could be valid code. Did you maybe forget this after debugging?

Sometimes obsolete code just ends up commented out instead of removed. In this case it is better to remove the code once you have checked you do not need it.

The code might also have been commented out for debugging purposes. In this case it is vital that someone uncomments it again or your project may behave in very unexpected ways in production.

This check looks for comments that seem to be mostly valid code and reports them.

Loading history...
167
        //]);
168
169
        do {
170 20
            $frame = $this->next(true);
171
172 20
            if (!$frame || $frame->getChannel() != $channel) {
173 3
                continue;
174
            }
175
176 20
            foreach ($types as $type) {
177 20
                if (is_a($frame, $type)) {
178 19
                    return $frame;
179
                }
180 18
            }
181 20
        } while (true);
182
183
        return $frame;
184
    }
185
186
    /**
187
     * {@inheritdoc}
188
     */
189 19
    public function subscribe($channel, WireSubscriberInterface $handler)
190
    {
191 19
        $this->subscribers[$channel] = $handler;
192
193 19
        return $this;
194
    }
195
196
    /**
197
     * {@inheritdoc}
198
     */
199 8
    public function close()
200
    {
201 8
        $this->io->close();
202
203 8
        return $this;
204
    }
205
206
    /**
207
     * @param HeartbeatInterface $heartbeat
208
     *
209
     * @return $this
210
     */
211 20
    public function setHeartbeat(HeartbeatInterface $heartbeat)
212
    {
213 20
        $this->heartbeat = $heartbeat;
214
215 20
        return $this;
216 1
    }
217
218
    /**
219
     * @param int $frameMax
220
     *
221
     * @return $this
222
     */
223 18
    public function setFrameMax($frameMax)
224
    {
225 18
        $this->frameMax = $frameMax;
226
227 18
        return $this;
228
    }
229
230
    /**
231
     * @param int $channel
232
     *
233
     * @return WireSubscriberInterface|null
234
     */
235 23
    private function getSubscriber($channel)
236
    {
237 23
        return isset($this->subscribers[$channel]) ? $this->subscribers[$channel] : null;
238
    }
239
240
    /**
241
     * @param Url $url
242
     *
243
     * @return string
244
     */
245 19
    private function getProtocolForScheme(Url $url)
246
    {
247 19
        return strcasecmp($url->getScheme(), 'amqps') == 0 ? 'ssl' : 'tcp';
248
    }
249
}
250