1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace ButterAMQP; |
4
|
|
|
|
5
|
|
|
use ButterAMQP\Exception\AMQPException; |
6
|
|
|
use ButterAMQP\Exception\InvalidChannelNumberException; |
7
|
|
|
use ButterAMQP\Framing\Frame; |
8
|
|
|
use ButterAMQP\Framing\Heartbeat; |
9
|
|
|
use ButterAMQP\Framing\Method\ConnectionBlocked; |
10
|
|
|
use ButterAMQP\Framing\Method\ConnectionClose; |
11
|
|
|
use ButterAMQP\Framing\Method\ConnectionCloseOk; |
12
|
|
|
use ButterAMQP\Framing\Method\ConnectionOpen; |
13
|
|
|
use ButterAMQP\Framing\Method\ConnectionOpenOk; |
14
|
|
|
use ButterAMQP\Framing\Method\ConnectionStart; |
15
|
|
|
use ButterAMQP\Framing\Method\ConnectionStartOk; |
16
|
|
|
use ButterAMQP\Framing\Method\ConnectionTune; |
17
|
|
|
use ButterAMQP\Framing\Method\ConnectionTuneOk; |
18
|
|
|
use ButterAMQP\Framing\Method\ConnectionUnblocked; |
19
|
|
|
use ButterAMQP\Heartbeat\TimeHeartbeat; |
20
|
|
|
use ButterAMQP\Security\Authenticator; |
21
|
|
|
use ButterAMQP\Security\AuthenticatorInterface; |
22
|
|
|
use Psr\Log\LoggerAwareInterface; |
23
|
|
|
use Psr\Log\LoggerAwareTrait; |
24
|
|
|
use Psr\Log\NullLogger; |
25
|
|
|
|
26
|
|
|
class Connection implements ConnectionInterface, WireSubscriberInterface, LoggerAwareInterface |
27
|
|
|
{ |
28
|
|
|
use LoggerAwareTrait; |
29
|
|
|
|
30
|
|
|
const STATUS_CLOSED = 0; |
31
|
|
|
const STATUS_READY = 1; |
32
|
|
|
const STATUS_BLOCKED = 2; |
33
|
|
|
|
34
|
|
|
/** |
35
|
|
|
* @var Url |
36
|
|
|
*/ |
37
|
|
|
private $url; |
38
|
|
|
|
39
|
|
|
/** |
40
|
|
|
* @var WireInterface |
41
|
|
|
*/ |
42
|
|
|
private $wire; |
43
|
|
|
|
44
|
|
|
/** |
45
|
|
|
* @var AuthenticatorInterface |
46
|
|
|
*/ |
47
|
|
|
private $authenticator; |
48
|
|
|
|
49
|
|
|
/** |
50
|
|
|
* @var string |
51
|
|
|
*/ |
52
|
|
|
private $status; |
53
|
|
|
|
54
|
|
|
/** |
55
|
|
|
* @var Channel[] |
56
|
|
|
*/ |
57
|
|
|
private $channels = []; |
58
|
|
|
|
59
|
|
|
/** |
60
|
|
|
* @var array |
61
|
|
|
*/ |
62
|
|
|
private $capabilities = []; |
63
|
|
|
|
64
|
|
|
/** |
65
|
|
|
* @param Url|string $url |
66
|
|
|
* @param WireInterface $wire |
67
|
|
|
* @param AuthenticatorInterface $authenticator |
68
|
|
|
*/ |
69
|
31 |
|
public function __construct(Url $url, WireInterface $wire, AuthenticatorInterface $authenticator = null) |
70
|
|
|
{ |
71
|
31 |
|
$this->url = $url; |
72
|
31 |
|
$this->wire = $wire; |
73
|
31 |
|
$this->authenticator = $authenticator ?: Authenticator::build(); |
74
|
31 |
|
$this->logger = new NullLogger(); |
75
|
31 |
|
} |
76
|
|
|
|
77
|
|
|
/** |
78
|
|
|
* Connection status. See STATUS_* constants for possible values. |
79
|
|
|
* |
80
|
|
|
* @return string |
81
|
|
|
*/ |
82
|
4 |
|
public function getStatus() |
83
|
|
|
{ |
84
|
4 |
|
return $this->status; |
85
|
|
|
} |
86
|
|
|
|
87
|
|
|
/** |
88
|
|
|
* {@inheritdoc} |
89
|
|
|
*/ |
90
|
19 |
|
public function open() |
91
|
|
|
{ |
92
|
19 |
|
$this->channels = []; |
93
|
19 |
|
$this->capabilities = []; |
94
|
|
|
|
95
|
19 |
|
$this->wire->open($this->url) |
96
|
19 |
|
->subscribe(0, $this); |
97
|
|
|
|
98
|
19 |
|
$this->wait(ConnectionTune::class); |
99
|
|
|
|
100
|
|
|
//$this->logger->debug(sprintf('Opening virtual host "%s"', $this->url->getVhost())); |
|
|
|
|
101
|
|
|
|
102
|
18 |
|
$this->send(new ConnectionOpen(0, $this->url->getVhost(), '', false)) |
103
|
18 |
|
->wait(ConnectionOpenOk::class); |
104
|
|
|
|
105
|
18 |
|
$this->status = self::STATUS_READY; |
|
|
|
|
106
|
|
|
|
107
|
18 |
|
return $this; |
108
|
|
|
} |
109
|
|
|
|
110
|
|
|
/** |
111
|
|
|
* {@inheritdoc} |
112
|
|
|
*/ |
113
|
19 |
|
public function channel($id = null) |
114
|
|
|
{ |
115
|
19 |
|
if ($id === null) { |
116
|
18 |
|
$id = count($this->channels) == 0 ? 1 : max(array_keys($this->channels)) + 1; |
117
|
18 |
|
} |
118
|
|
|
|
119
|
19 |
|
if (!is_integer($id) || $id <= 0) { |
120
|
1 |
|
throw new InvalidChannelNumberException('Channel ID should be positive integer'); |
121
|
|
|
} |
122
|
|
|
|
123
|
18 |
|
if (!isset($this->channels[$id])) { |
124
|
18 |
|
$channel = new Channel($this->wire, $id); |
125
|
|
|
|
126
|
18 |
|
if ($channel instanceof LoggerAwareInterface) { |
127
|
18 |
|
$channel->setLogger($this->logger); |
128
|
18 |
|
} |
129
|
|
|
|
130
|
18 |
|
$this->channels[$id] = $channel; |
131
|
|
|
|
132
|
18 |
|
$channel->open(); |
133
|
18 |
|
} |
134
|
|
|
|
135
|
18 |
|
return $this->channels[$id]; |
136
|
|
|
} |
137
|
|
|
|
138
|
|
|
/** |
139
|
|
|
* {@inheritdoc} |
140
|
|
|
*/ |
141
|
7 |
|
public function close($code = 0, $reason = '') |
142
|
|
|
{ |
143
|
7 |
|
$this->send(new ConnectionClose(0, $code, $reason, 0, 0)) |
144
|
7 |
|
->wait(ConnectionCloseOk::class); |
145
|
|
|
|
146
|
7 |
|
$this->status = self::STATUS_CLOSED; |
|
|
|
|
147
|
|
|
|
148
|
7 |
|
$this->wire->close(); |
149
|
|
|
|
150
|
7 |
|
return $this; |
151
|
|
|
} |
152
|
|
|
|
153
|
|
|
/** |
154
|
|
|
* {@inheritdoc} |
155
|
|
|
*/ |
156
|
1 |
|
public function isSupported($capability) |
157
|
|
|
{ |
158
|
1 |
|
return isset($this->capabilities[$capability]) ? |
159
|
1 |
|
(bool) $this->capabilities[$capability] : false; |
160
|
|
|
} |
161
|
|
|
|
162
|
|
|
/** |
163
|
|
|
* {@inheritdoc} |
164
|
|
|
*/ |
165
|
10 |
|
public function serve($blocking = true) |
166
|
|
|
{ |
167
|
10 |
|
$this->wire->next($blocking); |
168
|
|
|
|
169
|
8 |
|
return $this; |
170
|
|
|
} |
171
|
|
|
|
172
|
|
|
/** |
173
|
|
|
* Sends frame to the service channel (#0). |
174
|
|
|
* |
175
|
|
|
* @param Frame $frame |
176
|
|
|
* |
177
|
|
|
* @return $this |
178
|
|
|
*/ |
179
|
26 |
|
private function send(Frame $frame) |
180
|
|
|
{ |
181
|
26 |
|
$this->wire->send($frame); |
182
|
|
|
|
183
|
26 |
|
return $this; |
184
|
|
|
} |
185
|
|
|
|
186
|
|
|
/** |
187
|
|
|
* Wait for a frame in the service channel (#0). |
188
|
|
|
* |
189
|
|
|
* @param string|array $type |
190
|
|
|
* |
191
|
|
|
* @return Frame |
192
|
|
|
*/ |
193
|
20 |
|
private function wait($type) |
194
|
|
|
{ |
195
|
20 |
|
return $this->wire->wait(0, $type); |
196
|
|
|
} |
197
|
|
|
|
198
|
|
|
/** |
199
|
|
|
* {@inheritdoc} |
200
|
|
|
*/ |
201
|
26 |
|
public function dispatch(Frame $frame) |
202
|
|
|
{ |
203
|
26 |
|
if ($frame instanceof ConnectionStart) { |
|
|
|
|
204
|
20 |
|
$this->onConnectionStart($frame); |
205
|
20 |
|
} |
206
|
|
|
|
207
|
26 |
|
if ($frame instanceof ConnectionTune) { |
|
|
|
|
208
|
19 |
|
$this->onConnectionTune($frame); |
209
|
19 |
|
} |
210
|
|
|
|
211
|
26 |
|
if ($frame instanceof ConnectionClose) { |
|
|
|
|
212
|
3 |
|
$this->onConnectionClose($frame); |
213
|
1 |
|
} |
214
|
|
|
|
215
|
25 |
|
if ($frame instanceof ConnectionBlocked) { |
|
|
|
|
216
|
1 |
|
$this->onConnectionBlocked($frame); |
217
|
1 |
|
} |
218
|
|
|
|
219
|
25 |
|
if ($frame instanceof ConnectionUnblocked) { |
|
|
|
|
220
|
1 |
|
$this->onConnectionUnblocked($frame); |
221
|
1 |
|
} |
222
|
25 |
|
} |
223
|
|
|
|
224
|
|
|
/** |
225
|
|
|
* This frame is the first frame received from server. |
226
|
|
|
* It provides server details and requests client credentials. |
227
|
|
|
* |
228
|
|
|
* @param ConnectionStart $frame |
229
|
|
|
*/ |
230
|
20 |
|
private function onConnectionStart(ConnectionStart $frame) |
231
|
|
|
{ |
232
|
20 |
|
$properties = $frame->getServerProperties(); |
233
|
|
|
|
234
|
20 |
|
$this->capabilities = isset($properties['capabilities']) ? |
235
|
20 |
|
$properties['capabilities'] : []; |
236
|
|
|
|
237
|
20 |
|
$mechanism = $this->authenticator |
238
|
20 |
|
->get(explode(' ', $frame->getMechanisms())); |
239
|
|
|
|
240
|
20 |
|
list($locale) = explode(' ', $frame->getLocales()); |
241
|
|
|
|
242
|
20 |
|
$this->send(new ConnectionStartOk( |
243
|
20 |
|
0, |
244
|
|
|
[ |
245
|
20 |
|
'platform' => 'PHP '.PHP_VERSION, |
246
|
20 |
|
'product' => 'ButterAMQP', |
247
|
20 |
|
'version' => '0.1.0', |
248
|
|
|
'capabilities' => [ |
249
|
20 |
|
'publisher_confirms' => true, |
250
|
20 |
|
'exchange_exchange_bindings' => true, |
251
|
20 |
|
'basic.nack' => true, |
252
|
20 |
|
'connection.blocked' => true, |
253
|
20 |
|
'consumer_cancel_notify' => true, |
254
|
20 |
|
'authentication_failure_close' => true, |
255
|
20 |
|
], |
256
|
20 |
|
], |
257
|
20 |
|
$mechanism->getName(), |
258
|
20 |
|
$mechanism->getResponse($this->url->getUser(), $this->url->getPass()), |
259
|
|
|
$locale |
260
|
20 |
|
)); |
261
|
20 |
|
} |
262
|
|
|
|
263
|
|
|
/** |
264
|
|
|
* This frame is received to setup connection preferences, like max frame size, |
265
|
|
|
* max number of channel and heartbeat delay. |
266
|
|
|
* |
267
|
|
|
* Values in the request can be lowered by client. |
268
|
|
|
* |
269
|
|
|
* @param ConnectionTune $frame |
270
|
|
|
*/ |
271
|
|
|
private function onConnectionTune(ConnectionTune $frame) |
272
|
|
|
{ |
273
|
19 |
|
$negotiate = function ($a, $b) { |
274
|
19 |
|
return ($a * $b == 0) ? max($a, $b) : min($a, $b); |
275
|
19 |
|
}; |
276
|
|
|
|
277
|
19 |
|
$channelMax = $negotiate($this->url->getQueryParameter('channel_max', 0), $frame->getChannelMax()); |
278
|
19 |
|
$frameMax = $negotiate($this->url->getQueryParameter('frame_max', 0), $frame->getFrameMax()); |
279
|
19 |
|
$heartbeat = $negotiate($this->url->getQueryParameter('heartbeat', 60), $frame->getHeartbeat()); |
280
|
|
|
|
281
|
|
|
//$this->logger->debug(sprintf( |
|
|
|
|
282
|
|
|
// 'Tune connection: up to %d channels, %d frame size, heartbeat every %d seconds', |
283
|
|
|
// $channelMax, |
284
|
|
|
// $frameMax, |
285
|
|
|
// $heartbeat |
286
|
|
|
//)); |
287
|
|
|
|
288
|
19 |
|
$this->send(new ConnectionTuneOk(0, $channelMax, $frameMax, $heartbeat)); |
289
|
|
|
|
290
|
19 |
|
$this->wire->setHeartbeat(new TimeHeartbeat($heartbeat)) |
291
|
19 |
|
->setFrameMax($frameMax); |
292
|
19 |
|
} |
293
|
|
|
|
294
|
|
|
/** |
295
|
|
|
* This frame is received once server decide to close connection, normally because an unrecoverable error occur. |
296
|
|
|
* |
297
|
|
|
* @param ConnectionClose $frame |
298
|
|
|
* |
299
|
|
|
* @throws AMQPException |
300
|
|
|
*/ |
301
|
3 |
|
private function onConnectionClose(ConnectionClose $frame) |
302
|
|
|
{ |
303
|
3 |
|
$this->send(new ConnectionCloseOk(0)); |
304
|
3 |
|
$this->wire->close(); |
305
|
|
|
|
306
|
3 |
|
$this->status = self::STATUS_CLOSED; |
|
|
|
|
307
|
|
|
|
308
|
3 |
|
if ($frame->getReplyCode()) { |
309
|
2 |
|
throw AMQPException::make($frame->getReplyText(), $frame->getReplyCode()); |
310
|
|
|
} |
311
|
1 |
|
} |
312
|
|
|
|
313
|
|
|
/** |
314
|
|
|
* This frame is received once server decide to suspend connection, for example because server |
315
|
|
|
* run out of memory and can not provide service for the connection. When this happen consumer |
316
|
|
|
* suppose to suspend all activities until connection.unblocked is received. |
317
|
|
|
* |
318
|
|
|
* @param ConnectionBlocked $frame |
319
|
|
|
*/ |
320
|
1 |
|
private function onConnectionBlocked(ConnectionBlocked $frame) |
|
|
|
|
321
|
|
|
{ |
322
|
1 |
|
$this->status = self::STATUS_BLOCKED; |
|
|
|
|
323
|
1 |
|
} |
324
|
|
|
|
325
|
|
|
/** |
326
|
|
|
* This frame is received once connection returns back to normal state after being suspended. |
327
|
|
|
* See onConnectionBlocked above. |
328
|
|
|
* |
329
|
|
|
* @param ConnectionUnblocked $frame |
330
|
|
|
*/ |
331
|
1 |
|
private function onConnectionUnblocked(ConnectionUnblocked $frame) |
|
|
|
|
332
|
|
|
{ |
333
|
1 |
|
$this->status = self::STATUS_READY; |
|
|
|
|
334
|
1 |
|
} |
335
|
|
|
} |
336
|
|
|
|
Sometimes obsolete code just ends up commented out instead of removed. In this case it is better to remove the code once you have checked you do not need it.
The code might also have been commented out for debugging purposes. In this case it is vital that someone uncomments it again or your project may behave in very unexpected ways in production.
This check looks for comments that seem to be mostly valid code and reports them.