1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace ButterAMQP; |
4
|
|
|
|
5
|
|
|
use ButterAMQP\Exception\InvalidFrameEndingException; |
6
|
|
|
use ButterAMQP\Framing\Content; |
7
|
|
|
use ButterAMQP\Framing\Frame; |
8
|
|
|
use ButterAMQP\Framing\Heartbeat; |
9
|
|
|
use ButterAMQP\Heartbeat\NullHeartbeat; |
10
|
|
|
use Psr\Log\LoggerAwareInterface; |
11
|
|
|
use Psr\Log\LoggerAwareTrait; |
12
|
|
|
use Psr\Log\NullLogger; |
13
|
|
|
|
14
|
|
|
class Wire implements WireInterface, LoggerAwareInterface |
15
|
|
|
{ |
16
|
|
|
const PROTOCOL_HEADER = "AMQP\x00\x00\x09\x01"; |
17
|
|
|
const FRAME_ENDING = "\xCE"; |
18
|
|
|
|
19
|
|
|
use LoggerAwareTrait; |
20
|
|
|
|
21
|
|
|
/** |
22
|
|
|
* @var IOInterface |
23
|
|
|
*/ |
24
|
|
|
private $io; |
25
|
|
|
|
26
|
|
|
/** |
27
|
|
|
* @var WireSubscriberInterface[] |
28
|
|
|
*/ |
29
|
|
|
private $subscribers = []; |
30
|
|
|
|
31
|
|
|
/** |
32
|
|
|
* @var HeartbeatInterface |
33
|
|
|
*/ |
34
|
|
|
private $heartbeat; |
35
|
|
|
|
36
|
|
|
/** |
37
|
|
|
* @var int |
38
|
|
|
*/ |
39
|
|
|
private $frameMax; |
40
|
|
|
|
41
|
|
|
/** |
42
|
|
|
* @param IOInterface $io |
43
|
|
|
*/ |
44
|
33 |
|
public function __construct(IOInterface $io) |
45
|
|
|
{ |
46
|
33 |
|
$this->io = $io; |
47
|
33 |
|
$this->logger = new NullLogger(); |
48
|
33 |
|
$this->heartbeat = new NullHeartbeat(); |
49
|
33 |
|
} |
50
|
|
|
|
51
|
|
|
/** |
52
|
|
|
* {@inheritdoc} |
53
|
|
|
*/ |
54
|
19 |
|
public function open(Url $url) |
55
|
|
|
{ |
56
|
19 |
|
$this->subscribers = []; |
57
|
|
|
|
58
|
19 |
|
$this->io->open( |
59
|
19 |
|
$this->getProtocolForScheme($url), |
60
|
19 |
|
$url->getHost(), |
61
|
19 |
|
$url->getPort(), |
62
|
19 |
|
$url->getQuery() |
63
|
19 |
|
); |
64
|
|
|
|
65
|
19 |
|
$this->io->write(self::PROTOCOL_HEADER); |
66
|
|
|
|
67
|
|
|
// @todo: peek next 8 bytes and check if its a frame or "wrong protocol" reply |
68
|
|
|
|
69
|
19 |
|
return $this; |
70
|
|
|
} |
71
|
|
|
|
72
|
|
|
/** |
73
|
|
|
* {@inheritdoc} |
74
|
|
|
*/ |
75
|
28 |
|
public function next($blocking = true) |
76
|
|
|
{ |
77
|
28 |
|
if ($this->heartbeat->shouldSendHeartbeat()) { |
78
|
1 |
|
$this->send(new Heartbeat(0)); |
79
|
1 |
|
} |
80
|
|
|
|
81
|
28 |
|
if (($peek = $this->io->peek(7, $blocking)) === null) { |
82
|
2 |
|
return null; |
83
|
|
|
} |
84
|
|
|
|
85
|
26 |
|
$header = unpack('Ctype/nchannel/Nsize', $peek); |
86
|
|
|
|
87
|
26 |
|
if (($data = $this->io->read($header['size'] + 8, $blocking)) === null) { |
88
|
3 |
|
return null; |
89
|
|
|
} |
90
|
|
|
|
91
|
24 |
|
$end = $data[strlen($data) - 1]; |
92
|
|
|
|
93
|
24 |
|
if ($end != self::FRAME_ENDING) { |
94
|
1 |
|
throw new InvalidFrameEndingException(sprintf('Invalid frame ending (%d)', Binary::unpack('c', $end))); |
95
|
|
|
} |
96
|
|
|
|
97
|
23 |
|
$frame = Frame::decode(new Buffer($data)); |
98
|
|
|
|
99
|
|
|
//$this->logger->debug(sprintf('Receive "%s" at channel #%d', get_class($frame), $frame->getChannel()), [ |
|
|
|
|
100
|
|
|
// 'channel' => $frame->getChannel(), |
|
|
|
|
101
|
|
|
// 'frame' => get_class($frame), |
|
|
|
|
102
|
|
|
//]); |
103
|
|
|
|
104
|
23 |
|
if ($subscriber = $this->getSubscriber($frame->getChannel())) { |
105
|
19 |
|
$subscriber->dispatch($frame); |
106
|
19 |
|
} |
107
|
|
|
|
108
|
23 |
|
$this->heartbeat->serverBeat(); |
109
|
|
|
|
110
|
23 |
|
return $frame; |
111
|
|
|
} |
112
|
|
|
|
113
|
|
|
/** |
114
|
|
|
* {@inheritdoc} |
115
|
|
|
*/ |
116
|
22 |
|
public function send(Frame $frame) |
117
|
|
|
{ |
118
|
|
|
//$this->logger->debug(sprintf('Sending "%s" to channel #%d', get_class($frame), $channel), [ |
|
|
|
|
119
|
|
|
// 'channel' => $channel, |
|
|
|
|
120
|
|
|
// 'frame' => get_class($frame), |
|
|
|
|
121
|
|
|
//]); |
122
|
|
|
|
123
|
22 |
|
$this->heartbeat->clientBeat(); |
124
|
|
|
|
125
|
22 |
|
foreach ($this->chop($frame) as $piece) { |
126
|
22 |
|
$this->io->write($piece->encode()); |
127
|
22 |
|
} |
128
|
|
|
|
129
|
22 |
|
return $this; |
130
|
|
|
} |
131
|
|
|
|
132
|
|
|
/** |
133
|
|
|
* @param Frame $frame |
134
|
|
|
* |
135
|
|
|
* @return array |
136
|
|
|
*/ |
137
|
22 |
|
private function chop(Frame $frame) |
138
|
|
|
{ |
139
|
22 |
|
if (!$this->frameMax || !$frame instanceof Content) { |
140
|
21 |
|
return [$frame]; |
141
|
|
|
} |
142
|
|
|
|
143
|
15 |
|
$frames = []; |
144
|
15 |
|
$data = $frame->getData(); |
145
|
15 |
|
$size = $this->frameMax - 8; |
146
|
15 |
|
$chunks = ceil(strlen($data) / $size); |
147
|
|
|
|
148
|
15 |
|
for ($c = 0; $c < $chunks; ++$c) { |
149
|
8 |
|
$frames[] = new Content($frame->getChannel(), substr($data, $c * $size, $size)); |
150
|
8 |
|
} |
151
|
|
|
|
152
|
15 |
|
return $frames; |
153
|
1 |
|
} |
154
|
|
|
|
155
|
|
|
/** |
156
|
|
|
* {@inheritdoc} |
157
|
|
|
*/ |
158
|
20 |
|
public function wait($channel, $types) |
159
|
|
|
{ |
160
|
20 |
|
if (!is_array($types)) { |
161
|
19 |
|
$types = [$types]; |
162
|
19 |
|
} |
163
|
|
|
|
164
|
|
|
//$this->logger->debug(sprintf('Waiting "%s" at channel #%d', implode('", "', $types), $channel), [ |
|
|
|
|
165
|
|
|
// 'channel' => $channel, |
|
|
|
|
166
|
|
|
// 'frame' => $types, |
|
|
|
|
167
|
|
|
//]); |
168
|
|
|
|
169
|
|
|
do { |
170
|
20 |
|
$frame = $this->next(true); |
171
|
|
|
|
172
|
20 |
|
if (!$frame || $frame->getChannel() != $channel) { |
173
|
3 |
|
continue; |
174
|
|
|
} |
175
|
|
|
|
176
|
20 |
|
foreach ($types as $type) { |
177
|
20 |
|
if (is_a($frame, $type)) { |
178
|
19 |
|
return $frame; |
179
|
|
|
} |
180
|
18 |
|
} |
181
|
20 |
|
} while (true); |
182
|
|
|
|
183
|
|
|
return $frame; |
184
|
|
|
} |
185
|
|
|
|
186
|
|
|
/** |
187
|
|
|
* {@inheritdoc} |
188
|
|
|
*/ |
189
|
19 |
|
public function subscribe($channel, WireSubscriberInterface $handler) |
190
|
|
|
{ |
191
|
19 |
|
$this->subscribers[$channel] = $handler; |
192
|
|
|
|
193
|
19 |
|
return $this; |
194
|
|
|
} |
195
|
|
|
|
196
|
|
|
/** |
197
|
|
|
* {@inheritdoc} |
198
|
|
|
*/ |
199
|
8 |
|
public function close() |
200
|
|
|
{ |
201
|
8 |
|
$this->io->close(); |
202
|
|
|
|
203
|
8 |
|
return $this; |
204
|
|
|
} |
205
|
|
|
|
206
|
|
|
/** |
207
|
|
|
* @param HeartbeatInterface $heartbeat |
208
|
|
|
* |
209
|
|
|
* @return $this |
210
|
|
|
*/ |
211
|
20 |
|
public function setHeartbeat(HeartbeatInterface $heartbeat) |
212
|
|
|
{ |
213
|
20 |
|
$this->heartbeat = $heartbeat; |
214
|
|
|
|
215
|
20 |
|
return $this; |
216
|
1 |
|
} |
217
|
|
|
|
218
|
|
|
/** |
219
|
|
|
* @param int $frameMax |
220
|
|
|
* |
221
|
|
|
* @return $this |
222
|
|
|
*/ |
223
|
18 |
|
public function setFrameMax($frameMax) |
224
|
|
|
{ |
225
|
18 |
|
$this->frameMax = $frameMax; |
226
|
|
|
|
227
|
18 |
|
return $this; |
228
|
|
|
} |
229
|
|
|
|
230
|
|
|
/** |
231
|
|
|
* @param int $channel |
232
|
|
|
* |
233
|
|
|
* @return WireSubscriberInterface|null |
234
|
|
|
*/ |
235
|
23 |
|
private function getSubscriber($channel) |
236
|
|
|
{ |
237
|
23 |
|
return isset($this->subscribers[$channel]) ? $this->subscribers[$channel] : null; |
238
|
|
|
} |
239
|
|
|
|
240
|
|
|
/** |
241
|
|
|
* @param Url $url |
242
|
|
|
* |
243
|
|
|
* @return string |
244
|
|
|
*/ |
245
|
19 |
|
private function getProtocolForScheme(Url $url) |
246
|
|
|
{ |
247
|
19 |
|
return strcasecmp($url->getScheme(), 'amqps') == 0 ? 'ssl' : 'tcp'; |
248
|
|
|
} |
249
|
|
|
} |
250
|
|
|
|
Sometimes obsolete code just ends up commented out instead of removed. In this case it is better to remove the code once you have checked you do not need it.
The code might also have been commented out for debugging purposes. In this case it is vital that someone uncomments it again or your project may behave in very unexpected ways in production.
This check looks for comments that seem to be mostly valid code and reports them.