1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace ButterAMQP\AMQP091; |
4
|
|
|
|
5
|
|
|
use ButterAMQP\ConnectionInterface; |
6
|
|
|
use ButterAMQP\Exception\AMQPException; |
7
|
|
|
use ButterAMQP\Exception\InvalidChannelNumberException; |
8
|
|
|
use ButterAMQP\AMQP091\Framing\Frame; |
9
|
|
|
use ButterAMQP\AMQP091\Framing\Heartbeat; |
10
|
|
|
use ButterAMQP\AMQP091\Framing\Method\ConnectionBlocked; |
11
|
|
|
use ButterAMQP\AMQP091\Framing\Method\ConnectionClose; |
12
|
|
|
use ButterAMQP\AMQP091\Framing\Method\ConnectionCloseOk; |
13
|
|
|
use ButterAMQP\AMQP091\Framing\Method\ConnectionOpen; |
14
|
|
|
use ButterAMQP\AMQP091\Framing\Method\ConnectionOpenOk; |
15
|
|
|
use ButterAMQP\AMQP091\Framing\Method\ConnectionStart; |
16
|
|
|
use ButterAMQP\AMQP091\Framing\Method\ConnectionStartOk; |
17
|
|
|
use ButterAMQP\AMQP091\Framing\Method\ConnectionTune; |
18
|
|
|
use ButterAMQP\AMQP091\Framing\Method\ConnectionTuneOk; |
19
|
|
|
use ButterAMQP\AMQP091\Framing\Method\ConnectionUnblocked; |
20
|
|
|
use ButterAMQP\Heartbeat\TimeHeartbeat; |
21
|
|
|
use ButterAMQP\Security\Authenticator; |
22
|
|
|
use ButterAMQP\Security\AuthenticatorInterface; |
23
|
|
|
use ButterAMQP\Url; |
24
|
|
|
use ButterAMQP\WireInterface; |
25
|
|
|
use ButterAMQP\WireSubscriberInterface; |
26
|
|
|
|
27
|
|
|
/**
 * AMQP 0-9-1 connection.
 *
 * Drives the connection-level handshake on the service channel (#0) of the
 * given wire, hands out channels and reacts to connection-level frames the
 * wire dispatches to it (start, tune, close, blocked, unblocked).
 */
class Connection implements ConnectionInterface, WireSubscriberInterface
{
    const STATUS_CLOSED = 'closed';
    const STATUS_READY = 'ready';
    const STATUS_BLOCKED = 'blocked';

    /**
     * @var Url
     */
    private $url;

    /**
     * @var WireInterface
     */
    private $wire;

    /**
     * @var AuthenticatorInterface
     */
    private $authenticator;

    /**
     * One of the STATUS_* constants. A connection that was never opened is closed.
     *
     * @var string
     */
    private $status = self::STATUS_CLOSED;

    /**
     * Opened channels indexed by channel id.
     *
     * @var Channel[]
     */
    private $channels = [];

    /**
     * Value of the "capabilities" entry of the server properties received in connection.start.
     *
     * @var array
     */
    private $capabilities = [];

    /**
     * @param Url                    $url
     * @param WireInterface          $wire
     * @param AuthenticatorInterface $authenticator falls back to Authenticator::build() when omitted
     */
    public function __construct(Url $url, WireInterface $wire, AuthenticatorInterface $authenticator = null)
    {
        $this->url = $url;
        $this->wire = $wire;
        $this->authenticator = $authenticator ?: Authenticator::build();
    }

    /**
     * Connection status. See STATUS_* constants for possible values.
     *
     * @return string
     */
    public function getStatus()
    {
        return $this->status;
    }

    /**
     * {@inheritdoc}
     *
     * Forgets previously created channels and server capabilities, opens the wire,
     * subscribes this connection to channel #0 and waits for the handshake to finish
     * (connection.tune, then connection.open-ok) before marking the connection ready.
     */
    public function open()
    {
        $this->channels = [];
        $this->capabilities = [];

        $this->wire->open($this->url)
            ->subscribe(0, $this);

        // connection.start and connection.tune are answered from dispatch() while waiting here.
        $this->wait(ConnectionTune::class);

        $this->send(new ConnectionOpen(0, $this->url->getVhost(), '', false))
            ->wait(ConnectionOpenOk::class);

        $this->status = self::STATUS_READY;

        return $this;
    }

    /**
     * {@inheritdoc}
     *
     * When no id is given, the next id after the highest one in use is taken (starting at 1).
     * An already opened channel with the same id is returned as is.
     *
     * @throws InvalidChannelNumberException when the id is not a positive integer
     */
    public function channel($id = null)
    {
        if ($id === null) {
            $id = empty($this->channels) ? 1 : max(array_keys($this->channels)) + 1;
        }

        if (!is_int($id) || $id <= 0) {
            throw new InvalidChannelNumberException('Channel ID should be positive integer');
        }

        if (!isset($this->channels[$id])) {
            $channel = new Channel($this->wire, $id);
            $channel->open();

            // Register the channel only once open() has returned: if it throws, no
            // half-opened channel is left behind to be returned by a later call.
            $this->channels[$id] = $channel;
        }

        return $this->channels[$id];
    }

    /**
     * {@inheritdoc}
     *
     * Sends connection.close, waits for connection.close-ok, marks the connection
     * as closed and closes the wire.
     */
    public function close($code = 0, $reason = '')
    {
        $this->send(new ConnectionClose(0, $code, $reason, 0, 0))
            ->wait(ConnectionCloseOk::class);

        $this->status = self::STATUS_CLOSED;

        $this->wire->close();

        return $this;
    }

    /**
     * {@inheritdoc}
     *
     * A capability is supported when the server listed it with a truthy value.
     */
    public function isSupported($capability)
    {
        return !empty($this->capabilities[$capability]);
    }

    /**
     * {@inheritdoc}
     *
     * Delegates to the wire, asking it to process the next frame.
     */
    public function serve($blocking = true)
    {
        $this->wire->next($blocking);

        return $this;
    }

    /**
     * Sends frame to the service channel (#0).
     *
     * @param Frame $frame
     *
     * @return $this
     */
    private function send(Frame $frame)
    {
        $this->wire->send($frame);

        return $this;
    }

    /**
     * Wait for a frame in the service channel (#0).
     *
     * @param string|array $type
     *
     * @return Frame
     */
    private function wait($type)
    {
        return $this->wire->wait(0, $type);
    }

    /**
     * {@inheritdoc}
     *
     * Routes connection-level frames to their handlers. Frames of any other type are ignored.
     */
    public function dispatch(Frame $frame)
    {
        if ($frame instanceof ConnectionStart) {
            $this->onConnectionStart($frame);
        } elseif ($frame instanceof ConnectionTune) {
            $this->onConnectionTune($frame);
        } elseif ($frame instanceof ConnectionClose) {
            $this->onConnectionClose($frame);
        } elseif ($frame instanceof ConnectionBlocked) {
            $this->onConnectionBlocked($frame);
        } elseif ($frame instanceof ConnectionUnblocked) {
            $this->onConnectionUnblocked($frame);
        }
    }

    /**
     * This frame is the first frame received from server.
     * It provides server details and requests client credentials.
     *
     * Remembers the server capabilities, lets the authenticator pick one of the offered
     * mechanisms and replies with connection.start-ok using the first offered locale.
     *
     * @param ConnectionStart $frame
     */
    private function onConnectionStart(ConnectionStart $frame)
    {
        $properties = $frame->getServerProperties();

        $this->capabilities = isset($properties['capabilities'])
            ? $properties['capabilities']
            : [];

        // Mechanisms and locales arrive as space separated lists.
        $mechanism = $this->authenticator
            ->get(explode(' ', $frame->getMechanisms()));

        list($locale) = explode(' ', $frame->getLocales());

        $clientProperties = [
            'platform' => 'PHP '.PHP_VERSION,
            'product' => 'ButterAMQP',
            'version' => '0.1.0',
            'capabilities' => [
                'publisher_confirms' => true,
                'exchange_exchange_bindings' => true,
                'basic.nack' => true,
                'connection.blocked' => true,
                'consumer_cancel_notify' => true,
                'authentication_failure_close' => true,
            ],
        ];

        $this->send(new ConnectionStartOk(
            0,
            $clientProperties,
            $mechanism->getName(),
            $mechanism->getResponse($this->url->getUser(), $this->url->getPass()),
            $locale
        ));
    }

    /**
     * This frame is received to setup connection preferences, like max frame size,
     * max number of channel and heartbeat delay.
     *
     * Values in the request can be lowered by client. Client preferences are read from the
     * URL query parameters "channel_max", "frame_max" (both default to 0) and "heartbeat"
     * (defaults to 60).
     *
     * @param ConnectionTune $frame
     */
    private function onConnectionTune(ConnectionTune $frame)
    {
        // If either side is zero the other side's value wins, otherwise the lower one.
        // Operands are cast to int because URL query parameters presumably arrive as
        // strings (TODO confirm against Url::getQueryParameter), which would otherwise
        // leak a string into the tune-ok frame or break the arithmetic when non-numeric.
        $negotiate = function ($client, $server) {
            $client = (int) $client;
            $server = (int) $server;

            if ($client === 0 || $server === 0) {
                return max($client, $server);
            }

            return min($client, $server);
        };

        $channelMax = $negotiate($this->url->getQueryParameter('channel_max', 0), $frame->getChannelMax());
        $frameMax = $negotiate($this->url->getQueryParameter('frame_max', 0), $frame->getFrameMax());
        $heartbeat = $negotiate($this->url->getQueryParameter('heartbeat', 60), $frame->getHeartbeat());

        $this->send(new ConnectionTuneOk(0, $channelMax, $frameMax, $heartbeat));

        $this->wire->setHeartbeat(new TimeHeartbeat($heartbeat))
            ->setFrameMax($frameMax);
    }

    /**
     * This frame is received once server decide to close connection, normally because an unrecoverable error occur.
     *
     * Confirms with connection.close-ok, closes the wire and marks the connection as closed
     * before reporting a non-zero reply code as an exception.
     *
     * @param ConnectionClose $frame
     *
     * @throws AMQPException
     */
    private function onConnectionClose(ConnectionClose $frame)
    {
        $this->send(new ConnectionCloseOk(0));
        $this->wire->close();

        $this->status = self::STATUS_CLOSED;

        if ($frame->getReplyCode()) {
            throw AMQPException::make($frame->getReplyText(), $frame->getReplyCode());
        }
    }

    /**
     * This frame is received once server decide to suspend connection, for example because server
     * run out of memory and can not provide service for the connection. When this happen consumer
     * suppose to suspend all activities until connection.unblocked is received.
     *
     * @param ConnectionBlocked $frame
     */
    private function onConnectionBlocked(ConnectionBlocked $frame)
    {
        $this->status = self::STATUS_BLOCKED;
    }

    /**
     * This frame is received once connection returns back to normal state after being suspended.
     * See onConnectionBlocked above.
     *
     * @param ConnectionUnblocked $frame
     */
    private function onConnectionUnblocked(ConnectionUnblocked $frame)
    {
        $this->status = self::STATUS_READY;
    }
}
310
|
|
|
|
This error could be the result of:

1. Missing dependencies

PHP Analyzer uses your composer.json file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects the composer.json to be in the root folder of your repository.

Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the require or require-dev section?

2. Missing use statement

PHP does not complain about undefined classes in instanceof checks. For example, the following PHP code will work perfectly fine:

    if ($x instanceof DoesNotExist) {
        // Do something.
    }

If you have not tested against this specific condition, such errors might go unnoticed.