Completed
Push — master ( 79150a...2528a8 )
by Sergey
03:52
created

Wire::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 6
ccs 5
cts 5
cp 1
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 4
nc 1
nop 1
crap 1
1
<?php
2
3
namespace ButterAMQP\AMQP091;
4
5
use ButterAMQP\Binary;
6
use ButterAMQP\Buffer;
7
use ButterAMQP\Exception\InvalidFrameEndingException;
8
use ButterAMQP\AMQP091\Framing\Content;
9
use ButterAMQP\AMQP091\Framing\Frame;
10
use ButterAMQP\AMQP091\Framing\Heartbeat;
11
use ButterAMQP\Heartbeat\NullHeartbeat;
12
use ButterAMQP\HeartbeatInterface;
13
use ButterAMQP\IOInterface;
14
use ButterAMQP\Url;
15
use Psr\Log\LoggerAwareInterface;
16
use Psr\Log\LoggerAwareTrait;
17
use Psr\Log\NullLogger;
18
19
/**
 * Frame-level transport for the AMQP 0-9-1 protocol.
 *
 * Reads raw bytes from an IOInterface, decodes them into Frame objects and
 * hands each frame to the subscriber registered for the frame's channel.
 * Outgoing frames are encoded and written to the same IO, with Content
 * frames split into several frames when a frame size limit is configured.
 */
class Wire implements WireInterface, LoggerAwareInterface
{
    use LoggerAwareTrait;

    /**
     * Bytes written to the IO right after the connection is opened.
     */
    const PROTOCOL_HEADER = "AMQP\x00\x00\x09\x01";

    /**
     * Byte every received frame must end with.
     */
    const FRAME_ENDING = "\xCE";

    /**
     * @var IOInterface
     */
    private $io;

    /**
     * Subscribers indexed by channel.
     *
     * @var WireSubscriberInterface[]
     */
    private $subscribers = [];

    /**
     * @var HeartbeatInterface
     */
    private $heartbeat;

    /**
     * Maximum frame size used when chopping Content frames; a falsy value
     * (the default) disables chopping.
     *
     * @var int
     */
    private $frameMax;

    /**
     * Starts with a NullLogger and a NullHeartbeat; both can be replaced
     * later via setLogger() / setHeartbeat().
     *
     * @param IOInterface $io
     */
    public function __construct(IOInterface $io)
    {
        $this->io = $io;
        $this->logger = new NullLogger();
        $this->heartbeat = new NullHeartbeat();
    }

    /**
     * Drops all registered subscribers, opens the IO for the given URL and
     * writes the protocol header.
     *
     * {@inheritdoc}
     */
    public function open(Url $url)
    {
        $this->subscribers = [];

        $this->io->open(
            $this->getProtocolForScheme($url),
            $url->getHost(),
            $url->getPort(),
            $url->getQuery()
        );

        $this->io->write(self::PROTOCOL_HEADER);

        // @todo: peek next 8 bytes and check if its a frame or "wrong protocol" reply

        return $this;
    }

    /**
     * Reads, decodes and dispatches the next frame.
     *
     * Returns null when the IO yields no data for either the header or the
     * complete frame.
     *
     * {@inheritdoc}
     *
     * @throws InvalidFrameEndingException when the last byte of the frame is not FRAME_ENDING
     */
    public function next($blocking = true)
    {
        if ($this->heartbeat->shouldSendHeartbeat()) {
            $this->send(new Heartbeat(0));
        }

        // Frame header is 7 bytes: type (1), channel (2), payload size (4).
        $peek = $this->io->peek(7, $blocking);
        if ($peek === null) {
            return null;
        }

        $header = unpack('Ctype/nchannel/Nsize', $peek);

        // Whole frame = 7 header bytes + payload + 1 frame-ending byte.
        $data = $this->io->read($header['size'] + 8, $blocking);
        if ($data === null) {
            return null;
        }

        $end = substr($data, -1);
        if ($end !== self::FRAME_ENDING) {
            throw new InvalidFrameEndingException(sprintf('Invalid frame ending (%d)', Binary::unpack('c', $end)));
        }

        $frame = Frame::decode(new Buffer($data));

        $this->dispatch($frame);

        $this->heartbeat->serverBeat();

        return $frame;
    }

    /**
     * Forwards the frame to the subscriber of its channel, if there is one.
     *
     * @param Frame $frame
     */
    private function dispatch(Frame $frame)
    {
        $subscriber = $this->getSubscriber($frame->getChannel());

        if ($subscriber) {
            $subscriber->dispatch($frame);
        }
    }

    /**
     * Encodes the frame (or the pieces it was chopped into) and writes it to
     * the IO, notifying the heartbeat first.
     *
     * {@inheritdoc}
     *
     * @throws \LogicException when the configured frame max leaves no room for content payload
     */
    public function send(Frame $frame)
    {
        $this->heartbeat->clientBeat();

        foreach ($this->chop($frame) as $piece) {
            $this->io->write($piece->encode());
        }

        return $this;
    }

    /**
     * Splits a Content frame into frames that respect the frame size limit.
     *
     * Any other frame, or any frame when no limit is set, is returned as the
     * single element of the list. A Content frame with empty data yields an
     * empty list.
     *
     * @param Frame $frame
     *
     * @return Frame[]
     *
     * @throws \LogicException when the frame max is too small to carry any payload
     */
    private function chop(Frame $frame)
    {
        // NOTE(review): a static analyzer reported ButterAMQP\AMQP091\Framing\Content
        // as a non-existent class. The file imports it with a use statement, so
        // confirm the class exists and autoloads; instanceof silently evaluates to
        // false for an unknown class.
        if (!$this->frameMax || !($frame instanceof Content)) {
            return [$frame];
        }

        // 8 bytes of every frame are taken by the header (7) and the ending byte (1).
        $size = $this->frameMax - 8;
        if ($size < 1) {
            // Previously this divided by zero or silently dropped the body.
            throw new \LogicException(sprintf(
                'Frame max (%d) leaves no room for content payload',
                $this->frameMax
            ));
        }

        $data = $frame->getData();
        $length = strlen($data);
        $frames = [];

        for ($offset = 0; $offset < $length; $offset += $size) {
            $frames[] = new Content($frame->getChannel(), substr($data, $offset, $size));
        }

        return $frames;
    }

    /**
     * Blocks, reading frames, until one arrives on the given channel whose
     * class is one of the requested types, and returns that frame.
     *
     * {@inheritdoc}
     */
    public function wait($channel, $types)
    {
        if (!is_array($types)) {
            $types = [$types];
        }

        while (true) {
            $frame = $this->next(true);

            // Loose comparison is intentional: callers may pass the channel
            // as a numeric string.
            if (!$frame || $frame->getChannel() != $channel) {
                continue;
            }

            if ($this->isFrameOneOf($frame, $types)) {
                return $frame;
            }
        }
    }

    /**
     * Tells whether the exact class of the frame is listed in $types.
     *
     * The check is strict so that non-string entries (0, true, ...) cannot
     * match a class name through type juggling.
     *
     * @param Frame $frame
     * @param array $types list of fully qualified class names
     *
     * @return bool
     */
    private function isFrameOneOf(Frame $frame, array $types)
    {
        return in_array(get_class($frame), $types, true);
    }

    /**
     * Registers the handler for a channel, replacing any previous one.
     *
     * {@inheritdoc}
     */
    public function subscribe($channel, WireSubscriberInterface $handler)
    {
        $this->subscribers[$channel] = $handler;

        return $this;
    }

    /**
     * Closes the underlying IO.
     *
     * {@inheritdoc}
     */
    public function close()
    {
        $this->io->close();

        return $this;
    }

    /**
     * @param HeartbeatInterface $heartbeat
     *
     * @return $this
     */
    public function setHeartbeat(HeartbeatInterface $heartbeat)
    {
        $this->heartbeat = $heartbeat;

        return $this;
    }

    /**
     * Sets the frame size limit used to chop outgoing Content frames.
     *
     * @param int $frameMax a falsy value disables chopping
     *
     * @return $this
     */
    public function setFrameMax($frameMax)
    {
        $this->frameMax = $frameMax;

        return $this;
    }

    /**
     * @param int $channel
     *
     * @return WireSubscriberInterface|null
     */
    private function getSubscriber($channel)
    {
        if (isset($this->subscribers[$channel])) {
            return $this->subscribers[$channel];
        }

        return null;
    }

    /**
     * Maps the URL scheme to the protocol passed to the IO: "ssl" for
     * "amqps" (case-insensitive), "tcp" for anything else.
     *
     * @param Url $url
     *
     * @return string
     */
    private function getProtocolForScheme(Url $url)
    {
        return strcasecmp($url->getScheme(), 'amqps') === 0 ? 'ssl' : 'tcp';
    }
}
257