1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace ButterAMQP; |
4
|
|
|
|
5
|
|
|
use ButterAMQP\Exception\AMQPException; |
6
|
|
|
use ButterAMQP\Exception\NoReturnException; |
7
|
|
|
use ButterAMQP\Exception\TransactionNotSelectedException; |
8
|
|
|
use ButterAMQP\Exception\UnknownConsumerTagException; |
9
|
|
|
use ButterAMQP\Framing\Content; |
10
|
|
|
use ButterAMQP\Framing\Frame; |
11
|
|
|
use ButterAMQP\Framing\Header; |
12
|
|
|
use ButterAMQP\Framing\Method\BasicAck; |
13
|
|
|
use ButterAMQP\Framing\Method\BasicCancel; |
14
|
|
|
use ButterAMQP\Framing\Method\BasicCancelOk; |
15
|
|
|
use ButterAMQP\Framing\Method\BasicConsume; |
16
|
|
|
use ButterAMQP\Framing\Method\BasicConsumeOk; |
17
|
|
|
use ButterAMQP\Framing\Method\BasicDeliver; |
18
|
|
|
use ButterAMQP\Framing\Method\BasicGet; |
19
|
|
|
use ButterAMQP\Framing\Method\BasicGetEmpty; |
20
|
|
|
use ButterAMQP\Framing\Method\BasicGetOk; |
21
|
|
|
use ButterAMQP\Framing\Method\BasicNack; |
22
|
|
|
use ButterAMQP\Framing\Method\BasicPublish; |
23
|
|
|
use ButterAMQP\Framing\Method\BasicQos; |
24
|
|
|
use ButterAMQP\Framing\Method\BasicQosOk; |
25
|
|
|
use ButterAMQP\Framing\Method\BasicRecover; |
26
|
|
|
use ButterAMQP\Framing\Method\BasicRecoverOk; |
27
|
|
|
use ButterAMQP\Framing\Method\BasicReject; |
28
|
|
|
use ButterAMQP\Framing\Method\BasicReturn; |
29
|
|
|
use ButterAMQP\Framing\Method\ChannelClose; |
30
|
|
|
use ButterAMQP\Framing\Method\ChannelCloseOk; |
31
|
|
|
use ButterAMQP\Framing\Method\ChannelFlow; |
32
|
|
|
use ButterAMQP\Framing\Method\ChannelFlowOk; |
33
|
|
|
use ButterAMQP\Framing\Method\ChannelOpen; |
34
|
|
|
use ButterAMQP\Framing\Method\ChannelOpenOk; |
35
|
|
|
use ButterAMQP\Framing\Method\ConfirmSelect; |
36
|
|
|
use ButterAMQP\Framing\Method\ConfirmSelectOk; |
37
|
|
|
use ButterAMQP\Framing\Method\TxCommit; |
38
|
|
|
use ButterAMQP\Framing\Method\TxCommitOk; |
39
|
|
|
use ButterAMQP\Framing\Method\TxRollback; |
40
|
|
|
use ButterAMQP\Framing\Method\TxRollbackOk; |
41
|
|
|
use ButterAMQP\Framing\Method\TxSelect; |
42
|
|
|
use ButterAMQP\Framing\Method\TxSelectOk; |
43
|
|
|
use Psr\Log\LoggerAwareInterface; |
44
|
|
|
use Psr\Log\LoggerAwareTrait; |
45
|
|
|
use Psr\Log\NullLogger; |
46
|
|
|
|
47
|
|
|
class Channel implements ChannelInterface, WireSubscriberInterface, LoggerAwareInterface |
48
|
|
|
{ |
49
|
|
|
use LoggerAwareTrait; |
50
|
|
|
|
51
|
|
|
/**
 * Channel status: not open. This is the initial value of $status and the
 * value assigned by close() and when the server closes the channel.
 */
const STATUS_CLOSED = 0;

/**
 * Channel status: open, with flow active.
 */
const STATUS_READY = 1;

/**
 * Channel status: open, but flow has been switched off.
 */
const STATUS_INACTIVE = 2;

/**
 * Channel mode: the default mode, assigned every time the channel is opened.
 */
const MODE_NORMAL = 0;

/**
 * Channel mode: assigned by selectConfirm().
 */
const MODE_CONFIRM = 1;

/**
 * Channel mode: assigned by selectTx(); required by txCommit() and txRollback().
 */
const MODE_TX = 2;

/**
 * Channel id. Passed to every frame this channel builds and used when
 * subscribing to / waiting on the wire.
 *
 * @var int
 */
private $id;

/**
 * Wire used to send frames and to wait for replies.
 *
 * @var WireInterface
 */
private $wire;

/**
 * Current status, one of the STATUS_* constants.
 *
 * @var int
 */
private $status = self::STATUS_CLOSED;

/**
 * Current mode, one of the MODE_* constants.
 *
 * @var int
 */
private $mode = self::MODE_NORMAL;

/**
 * Consumer callbacks indexed by consumer tag.
 *
 * @var callable[]
 */
private $consumers = [];

/**
 * Handler for returned messages; unset until onReturn() is called.
 *
 * @var callable
 */
private $returnCallable;

/**
 * Handler for publish confirmations; unset until selectConfirm() is called.
 *
 * @var callable
 */
private $confirmCallable;
93
|
|
|
|
94
|
|
|
/**
 * Creates a channel bound to the given wire under the given channel id.
 *
 * The logger is initialised with a NullLogger.
 *
 * @param WireInterface $wire
 * @param int           $id
 */
public function __construct(WireInterface $wire, $id)
{
    $this->logger = new NullLogger();
    $this->wire = $wire;
    $this->id = $id;
}
104
|
|
|
|
105
|
|
|
/**
 * Opens the channel unless it is already open.
 *
 * Subscribes this channel to the wire, sends ChannelOpen and waits for
 * ChannelOpenOk. Afterwards the status is STATUS_READY and the mode is reset
 * to MODE_NORMAL. When the status is anything other than STATUS_CLOSED the
 * call is a no-op.
 *
 * {@inheritdoc}
 */
public function open()
{
    if ($this->status == self::STATUS_CLOSED) {
        $this->wire->subscribe($this->id, $this);

        $this->send(new ChannelOpen($this->id, ''));
        $this->wait(ChannelOpenOk::class);

        $this->mode = self::MODE_NORMAL;
        $this->status = self::STATUS_READY;
    }

    return $this;
}
124
|
|
|
|
125
|
|
|
/**
 * Sends ChannelFlow and updates the status from the ChannelFlowOk reply:
 * STATUS_READY when the reply reports active, STATUS_INACTIVE otherwise.
 *
 * {@inheritdoc}
 */
public function flow($active)
{
    $this->send(new ChannelFlow($this->id, $active));

    /** @var ChannelFlowOk $reply */
    $reply = $this->wait(ChannelFlowOk::class);

    if ($reply->isActive()) {
        $this->status = self::STATUS_READY;
    } else {
        $this->status = self::STATUS_INACTIVE;
    }

    return $this;
}
139
|
|
|
|
140
|
|
|
/**
 * Sends ChannelClose, waits for ChannelCloseOk and marks the channel as
 * STATUS_CLOSED.
 *
 * {@inheritdoc}
 */
public function close()
{
    // NOTE(review): the trailing arguments (0, '', 0, 0) are presumably reply
    // code, reply text, class id and method id - confirm against ChannelClose.
    $this->send(new ChannelClose($this->id, 0, '', 0, 0));
    $this->wait(ChannelCloseOk::class);

    $this->status = self::STATUS_CLOSED;

    return $this;
}
152
|
|
|
|
153
|
|
|
/**
 * Sends BasicQos with the given prefetch settings and waits for BasicQosOk.
 *
 * {@inheritdoc}
 */
public function qos($prefetchSize, $prefetchCount, $globally = false)
{
    $request = new BasicQos($this->id, $prefetchSize, $prefetchCount, $globally);

    $this->send($request);
    $this->wait(BasicQosOk::class);

    return $this;
}
163
|
|
|
|
164
|
|
|
/**
 * Builds an Exchange object for the given name, sharing this channel's wire
 * and channel id. No frame is sent here.
 *
 * {@inheritdoc}
 */
public function exchange($name)
{
    $exchange = new Exchange($this->wire, $this->id, $name);

    return $exchange;
}
171
|
|
|
|
172
|
|
|
/**
 * Builds a Queue object for the given name (empty by default), sharing this
 * channel's wire and channel id. No frame is sent here.
 *
 * {@inheritdoc}
 */
public function queue($name = '')
{
    $queue = new Queue($this->wire, $this->id, $name);

    return $queue;
}
179
|
|
|
|
180
|
|
|
/**
 * Starts a consumer on the given queue and registers its callback.
 *
 * Sends BasicConsume. Unless Consumer::FLAG_NO_WAIT is set, waits for
 * BasicConsumeOk and uses the consumer tag from that reply. With
 * FLAG_NO_WAIT there is no reply to read a tag from, so a tag is generated
 * locally when the caller did not supply one.
 *
 * {@inheritdoc}
 *
 * @param string   $queue
 * @param callable $callback  invoked with a Delivery for every message delivered to this consumer
 * @param int      $flags     bit mask of Consumer::FLAG_* constants
 * @param string   $tag       consumer tag; an empty string means "not supplied"
 * @param array    $arguments
 *
 * @return Consumer
 */
public function consume($queue, callable $callback, $flags = 0, $tag = '', array $arguments = [])
{
    $noWait = (bool) ($flags & Consumer::FLAG_NO_WAIT);

    // Only a missing tag is replaced. empty() would also discard the valid
    // tag "0", leaving the returned Consumer with a tag the caller never chose.
    if ($noWait && (null === $tag || '' === $tag)) {
        $tag = uniqid('php-consumer-');
    }

    // Flags are passed as real booleans, the same way publish() passes its flags.
    $this->send(new BasicConsume(
        $this->id,
        0,
        $queue,
        $tag,
        (bool) ($flags & Consumer::FLAG_NO_LOCAL),
        (bool) ($flags & Consumer::FLAG_NO_ACK),
        (bool) ($flags & Consumer::FLAG_EXCLUSIVE),
        $noWait,
        $arguments
    ));

    if (!$noWait) {
        /** @var BasicConsumeOk $reply */
        $reply = $this->wait(BasicConsumeOk::class);
        $tag = $reply->getConsumerTag();
    }

    $this->consumers[$tag] = $callback;

    return new Consumer($this, $tag);
}
210
|
|
|
|
211
|
|
|
/**
 * Fetches a single message from the queue.
 *
 * Sends BasicGet and waits for either BasicGetOk or BasicGetEmpty. On
 * BasicGetEmpty null is returned. Otherwise the content header and as many
 * content frames as needed to reach the announced size are read, and a
 * Delivery with an empty consumer tag is returned.
 *
 * {@inheritdoc}
 */
public function get($queue, $withAck = true)
{
    $this->send(new BasicGet($this->id, 0, $queue, !$withAck));

    /** @var BasicGetOk|BasicGetEmpty $reply */
    $reply = $this->wait([BasicGetOk::class, BasicGetEmpty::class]);

    if ($reply instanceof BasicGetEmpty) {
        return null;
    }

    /** @var Header $header */
    $header = $this->wait(Header::class);

    // Keep reading content frames until the body has the size the header announced.
    for ($body = ''; strlen($body) < $header->getSize();) {
        $body .= $this->wait(Content::class)->getData();
    }

    return new Delivery(
        $this,
        '',
        $reply->getDeliveryTag(),
        $reply->isRedelivered(),
        $reply->getExchange(),
        $reply->getRoutingKey(),
        $body,
        $header->getProperties()
    );
}
243
|
|
|
|
244
|
|
|
/**
 * Sends BasicRecover with the given requeue flag and waits for BasicRecoverOk.
 *
 * {@inheritdoc}
 */
public function recover($requeue = true)
{
    $this->send(new BasicRecover($this->id, $requeue));
    $this->wait(BasicRecoverOk::class);

    return $this;
}
254
|
|
|
|
255
|
|
|
/**
 * Cancels the consumer with the given tag and forgets its callback.
 *
 * Sends BasicCancel. Unless Consumer::FLAG_NO_WAIT is set, waits for
 * BasicCancelOk before the callback is removed.
 *
 * {@inheritdoc}
 *
 * @param string $tag   consumer tag to cancel
 * @param int    $flags bit mask of Consumer::FLAG_* constants; only FLAG_NO_WAIT is read here
 *
 * @return $this
 */
public function cancel($tag, $flags = 0)
{
    // Passed as a real boolean, the same way publish() passes its flags.
    $noWait = (bool) ($flags & Consumer::FLAG_NO_WAIT);

    $this->send(new BasicCancel($this->id, $tag, $noWait));

    if (!$noWait) {
        // AMQP lets the server deliver further messages for this consumer until
        // cancel-ok is sent. The callback stays registered while waiting so such
        // a delivery does not hit onBasicDeliver() with an unknown consumer tag.
        // NOTE(review): assumes the wire dispatches frames to this channel while
        // wait() is blocking - confirm against the WireInterface implementation.
        $this->wait(BasicCancelOk::class);
    }

    unset($this->consumers[$tag]);

    return $this;
}
272
|
|
|
|
273
|
|
|
/**
 * Publishes a message as three frames: BasicPublish, a content Header
 * carrying the body length and the message properties, and a Content frame
 * with the body. Nothing is waited for.
 *
 * {@inheritdoc}
 */
public function publish(Message $message, $exchange = '', $routingKey = '', $flags = 0)
{
    $mandatory = (bool) ($flags & Message::FLAG_MANDATORY);
    $immediate = (bool) ($flags & Message::FLAG_IMMEDIATE);

    $this->send(new BasicPublish($this->id, 0, $exchange, $routingKey, $mandatory, $immediate));

    $payload = $message->getBody();

    // NOTE(review): 60 is presumably the AMQP "basic" class id and 0 the weight - confirm against Header.
    $header = new Header($this->id, 60, 0, strlen($payload), $message->getProperties());

    $this->send($header);
    $this->send(new Content($this->id, $payload));

    return $this;
}
294
|
|
|
|
295
|
|
|
/**
 * Sends BasicAck for the given delivery tag. No reply is waited for.
 *
 * {@inheritdoc}
 */
public function ack($deliveryTag, $multiple = false)
{
    $frame = new BasicAck($this->id, $deliveryTag, $multiple);

    // send() returns this channel, which keeps the call fluent.
    return $this->send($frame);
}
304
|
|
|
|
305
|
|
|
/**
 * Rejects one or several deliveries. No reply is waited for.
 *
 * BasicReject is sent for a single delivery. When $multiple is truthy
 * BasicNack is sent instead, carrying the multiple flag.
 *
 * {@inheritdoc}
 */
public function reject($deliveryTag, $requeue = true, $multiple = false)
{
    if ($multiple) {
        $frame = new BasicNack($this->id, $deliveryTag, $multiple, $requeue);
    } else {
        $frame = new BasicReject($this->id, $deliveryTag, $requeue);
    }

    $this->send($frame);

    return $this;
}
315
|
|
|
|
316
|
|
|
/**
 * Registers the handler that receives a Returned object whenever a
 * BasicReturn frame is dispatched to this channel. Replaces any handler
 * registered earlier.
 *
 * {@inheritdoc}
 */
public function onReturn(callable $callable)
{
    $this->returnCallable = $callable;

    return $this;
}
325
|
|
|
|
326
|
|
|
/**
 * Switches the channel to confirm mode.
 *
 * Stores the confirmation handler, sends ConfirmSelect and, unless $noWait
 * is truthy, waits for ConfirmSelectOk. The mode is then MODE_CONFIRM.
 *
 * {@inheritdoc}
 */
public function selectConfirm(callable $callable, $noWait = false)
{
    // The handler is stored before the request goes out.
    $this->confirmCallable = $callable;

    $request = new ConfirmSelect($this->id, $noWait);
    $this->send($request);

    if (!$noWait) {
        $this->wait(ConfirmSelectOk::class);
    }

    $this->mode = self::MODE_CONFIRM;

    return $this;
}
343
|
|
|
|
344
|
|
|
/**
 * Switches the channel to transaction mode: sends TxSelect, waits for
 * TxSelectOk and sets the mode to MODE_TX.
 *
 * {@inheritdoc}
 */
public function selectTx()
{
    $this->send(new TxSelect($this->id));
    $this->wait(TxSelectOk::class);

    $this->mode = self::MODE_TX;

    return $this;
}
356
|
|
|
|
357
|
|
|
/**
 * Commits the current transaction: sends TxCommit and waits for TxCommitOk.
 * Returns nothing.
 *
 * {@inheritdoc}
 *
 * @throws TransactionNotSelectedException when the channel mode is not MODE_TX
 */
public function txCommit()
{
    if (self::MODE_TX != $this->mode) {
        throw new TransactionNotSelectedException('Channel is not in transaction mode. Use Channel::selectTx() to select transaction mode on this channel.');
    }

    $this->send(new TxCommit($this->id));
    $this->wait(TxCommitOk::class);
}
371
|
|
|
|
372
|
|
|
/**
 * Rolls back the current transaction: sends TxRollback and waits for
 * TxRollbackOk. Returns nothing.
 *
 * {@inheritdoc}
 *
 * @throws TransactionNotSelectedException when the channel mode is not MODE_TX
 */
public function txRollback()
{
    if (self::MODE_TX != $this->mode) {
        throw new TransactionNotSelectedException('Channel is not in transaction mode. Use Channel::selectTx() to select transaction mode on this channel.');
    }

    $this->send(new TxRollback($this->id));
    $this->wait(TxRollbackOk::class);
}
386
|
|
|
|
387
|
|
|
/**
 * Tells whether a callback is registered for the given consumer tag.
 * The tag is cast to string before the lookup.
 *
 * {@inheritdoc}
 */
public function hasConsumer($tag)
{
    $key = (string) $tag;

    return isset($this->consumers[$key]);
}
394
|
|
|
|
395
|
|
|
/**
 * Lists the tags of all consumers currently registered on this channel
 * (the keys of the internal tag => callback map).
 *
 * {@inheritdoc}
 */
public function getConsumerTags()
{
    $tags = array_keys($this->consumers);

    return $tags;
}
402
|
|
|
|
403
|
|
|
/**
 * Returns the current channel status.
 *
 * @return int one of self::STATUS_CLOSED, self::STATUS_READY or self::STATUS_INACTIVE
 */
public function getStatus()
{
    return $this->status;
}
410
|
|
|
|
411
|
|
|
/**
 * Returns the current channel mode.
 *
 * @return int one of self::MODE_NORMAL, self::MODE_CONFIRM or self::MODE_TX
 */
public function getMode()
{
    $mode = $this->mode;

    return $mode;
}
418
|
|
|
|
419
|
|
|
/**
 * Hands a frame to the wire for sending.
 *
 * Returns this channel so a wait() call can be chained directly.
 *
 * @param Frame $frame
 *
 * @return $this
 */
private function send(Frame $frame)
{
    $wire = $this->wire;
    $wire->send($frame);

    return $this;
}
432
|
|
|
|
433
|
|
|
/**
 * Asks the wire to wait, on this channel's id, for a frame of the given
 * type (or one of several types) and returns what the wire returns.
 *
 * @param string|array $type frame class name, or a list of class names
 *
 * @return Frame
 */
private function wait($type)
{
    $frame = $this->wire->wait($this->id, $type);

    return $frame;
}
442
|
|
|
|
443
|
|
|
/**
 * Routes a frame received for this channel to the matching handler.
 *
 * Every frame type is tested independently, in the order listed below.
 * Frames of any other type are ignored. The ChannelClose handler always
 * throws, so nothing after it runs for that frame.
 *
 * @param Frame $frame
 */
public function dispatch(Frame $frame)
{
    // Channel-level frames.
    if ($frame instanceof ChannelClose) {
        $this->onChannelClose($frame);
    }

    if ($frame instanceof ChannelFlow) {
        $this->onChannelFlow($frame);
    }

    // Incoming content: deliveries and returned messages.
    if ($frame instanceof BasicDeliver) {
        $this->onBasicDeliver($frame);
    }

    if ($frame instanceof BasicReturn) {
        $this->onBasicReturn($frame);
    }

    // Publish confirmations.
    if ($frame instanceof BasicAck) {
        $this->onBasicAck($frame);
    }

    if ($frame instanceof BasicNack) {
        $this->onBasicNack($frame);
    }

    // Consumer cancellation received from the other side.
    if ($frame instanceof BasicCancel) {
        $this->onBasicCancel($frame);
    }
}
476
|
|
|
|
477
|
|
|
/**
 * Handles a delivery: reads the content header and body, then passes a
 * Delivery to the callback registered for the frame's consumer tag.
 *
 * The header and body are read before the consumer tag is checked, so the
 * content frames are consumed even when the tag turns out to be unknown.
 *
 * @param BasicDeliver $frame
 *
 * @throws UnknownConsumerTagException when no callback is registered for the consumer tag
 * @throws \Exception
 */
private function onBasicDeliver(BasicDeliver $frame)
{
    /** @var Header $header */
    $header = $this->wait(Header::class);

    // Keep reading content frames until the body has the size the header announced.
    for ($body = ''; strlen($body) < $header->getSize();) {
        $body .= $this->wait(Content::class)->getData();
    }

    if (!isset($this->consumers[$frame->getConsumerTag()])) {
        $message = sprintf(
            'Consumer with tag "%s" does not exist',
            $frame->getConsumerTag()
        );

        throw new UnknownConsumerTagException($message);
    }

    $delivery = new Delivery(
        $this,
        $frame->getConsumerTag(),
        $frame->getDeliveryTag(),
        $frame->isRedelivered(),
        $frame->getExchange(),
        $frame->getRoutingKey(),
        $body,
        $header->getProperties()
    );

    $consumer = $this->consumers[$frame->getConsumerTag()];

    call_user_func($consumer, $delivery);
}
512
|
|
|
|
513
|
|
|
/**
 * Handles a returned message: reads the content header and body, then
 * passes a Returned object to the handler registered through onReturn().
 *
 * The header and body are read before the handler is checked, so the content
 * frames are consumed even when no handler is registered.
 *
 * @param BasicReturn $frame
 *
 * @throws NoReturnException when no return handler has been registered
 * @throws \Exception
 */
private function onBasicReturn(BasicReturn $frame)
{
    /** @var Header $header */
    $header = $this->wait(Header::class);
    $content = '';

    // Keep reading content frames until the body has the size the header announced.
    while ($header->getSize() > strlen($content)) {
        $content .= $this->wait(Content::class)->getData();
    }

    if (!$this->returnCallable) {
        // The concatenated message used to contain a stray ", , " between its last two parts.
        throw new NoReturnException(
            'A message was returned but there is no return handler. '.
            'Make sure you setup a handler for returned messages using Channel::onReturn method, '.
            'or remove MANDATORY and IMMEDIATE flags when publishing messages.'
        );
    }

    $returned = new Returned(
        $frame->getReplyCode(),
        $frame->getReplyText(),
        $frame->getExchange(),
        $frame->getRoutingKey(),
        $content,
        $header->getProperties()
    );

    call_user_func($this->returnCallable, $returned);
}
547
|
|
|
|
548
|
|
|
/**
 * Handles a positive publish confirmation by passing a Confirm object
 * (first argument true) to the handler stored by selectConfirm().
 *
 * @param BasicAck $frame
 *
 * @throws \RuntimeException when no confirm handler is set
 */
private function onBasicAck(BasicAck $frame)
{
    $handler = $this->confirmCallable;

    if (!$handler) {
        throw new \RuntimeException(
            'Something is wrong: channel is in confirm mode, but confirm callable is not set'
        );
    }

    $confirm = new Confirm(true, $frame->getDeliveryTag(), $frame->isMultiple());

    call_user_func($handler, $confirm);
}
561
|
|
|
|
562
|
|
|
/**
 * Handles a negative publish confirmation by passing a Confirm object
 * (first argument false) to the handler stored by selectConfirm().
 *
 * @param BasicNack $frame
 *
 * @throws \RuntimeException when no confirm handler is set
 */
private function onBasicNack(BasicNack $frame)
{
    $handler = $this->confirmCallable;

    if (!$handler) {
        throw new \RuntimeException(
            'Something is wrong: channel is in confirm mode, but confirm callable is not set'
        );
    }

    $confirm = new Confirm(false, $frame->getDeliveryTag(), $frame->isMultiple());

    call_user_func($handler, $confirm);
}
575
|
|
|
|
576
|
|
|
/**
 * Handles a BasicCancel frame received from the other side: forgets the
 * consumer callback and, unless the frame carries the no-wait flag, replies
 * with BasicCancelOk for the same tag.
 *
 * @param BasicCancel $frame
 */
private function onBasicCancel(BasicCancel $frame)
{
    unset($this->consumers[$frame->getConsumerTag()]);

    if ($frame->isNoWait()) {
        return;
    }

    $reply = new BasicCancelOk($this->id, $frame->getConsumerTag());
    $this->send($reply);
}
587
|
|
|
|
588
|
|
|
/**
 * Handles a ChannelFlow frame received from the other side: replies with
 * ChannelFlowOk echoing the requested state, then sets the status to
 * STATUS_READY (active) or STATUS_INACTIVE (not active).
 *
 * @param ChannelFlow $frame
 */
private function onChannelFlow(ChannelFlow $frame)
{
    $this->send(new ChannelFlowOk($this->id, $frame->isActive()));

    if ($frame->isActive()) {
        $this->status = self::STATUS_READY;
    } else {
        $this->status = self::STATUS_INACTIVE;
    }
}
597
|
|
|
|
598
|
|
|
/**
 * Handles a ChannelClose frame received from the other side: replies with
 * ChannelCloseOk, marks the channel as STATUS_CLOSED and then raises the
 * exception built from the frame's reply text and reply code.
 *
 * This method always throws.
 *
 * @param ChannelClose $frame
 *
 * @throws AMQPException
 */
private function onChannelClose(ChannelClose $frame)
{
    $reply = new ChannelCloseOk($this->id);
    $this->send($reply);

    $this->status = self::STATUS_CLOSED;

    throw AMQPException::make($frame->getReplyText(), $frame->getReplyCode());
}
611
|
|
|
} |
612
|
|
|
|
This error could be the result of:

1. Missing dependencies

PHP Analyzer uses your composer.json file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects the composer.json to be in the root folder of your repository.

Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the require or require-dev section?

2. Missing use statement

PHP does not complain about undefined classes in instanceof checks. For example, the following PHP code will work perfectly fine:

If you have not tested against this specific condition, such errors might go unnoticed.