Completed
Push — master ( 2528a8...9eba40 )
by Sergey
03:58
created

Channel::readHeader()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 0
crap 1
1
<?php
2
3
namespace ButterAMQP\AMQP091;
4
5
use ButterAMQP\ChannelInterface;
6
use ButterAMQP\Confirm;
7
use ButterAMQP\Delivery;
8
use ButterAMQP\Exception\AMQPException;
9
use ButterAMQP\Exception\NoReturnException;
10
use ButterAMQP\Exception\TransactionNotSelectedException;
11
use ButterAMQP\Exception\UnknownConsumerTagException;
12
use ButterAMQP\AMQP091\Framing\Content;
13
use ButterAMQP\AMQP091\Framing\Frame;
14
use ButterAMQP\AMQP091\Framing\Header;
15
use ButterAMQP\AMQP091\Framing\Method\BasicAck;
16
use ButterAMQP\AMQP091\Framing\Method\BasicCancel;
17
use ButterAMQP\AMQP091\Framing\Method\BasicCancelOk;
18
use ButterAMQP\AMQP091\Framing\Method\BasicConsume;
19
use ButterAMQP\AMQP091\Framing\Method\BasicConsumeOk;
20
use ButterAMQP\AMQP091\Framing\Method\BasicDeliver;
21
use ButterAMQP\AMQP091\Framing\Method\BasicGet;
22
use ButterAMQP\AMQP091\Framing\Method\BasicGetEmpty;
23
use ButterAMQP\AMQP091\Framing\Method\BasicGetOk;
24
use ButterAMQP\AMQP091\Framing\Method\BasicNack;
25
use ButterAMQP\AMQP091\Framing\Method\BasicPublish;
26
use ButterAMQP\AMQP091\Framing\Method\BasicQos;
27
use ButterAMQP\AMQP091\Framing\Method\BasicQosOk;
28
use ButterAMQP\AMQP091\Framing\Method\BasicRecover;
29
use ButterAMQP\AMQP091\Framing\Method\BasicRecoverOk;
30
use ButterAMQP\AMQP091\Framing\Method\BasicReject;
31
use ButterAMQP\AMQP091\Framing\Method\BasicReturn;
32
use ButterAMQP\AMQP091\Framing\Method\ChannelClose;
33
use ButterAMQP\AMQP091\Framing\Method\ChannelCloseOk;
34
use ButterAMQP\AMQP091\Framing\Method\ChannelFlow;
35
use ButterAMQP\AMQP091\Framing\Method\ChannelFlowOk;
36
use ButterAMQP\AMQP091\Framing\Method\ChannelOpen;
37
use ButterAMQP\AMQP091\Framing\Method\ChannelOpenOk;
38
use ButterAMQP\AMQP091\Framing\Method\ConfirmSelect;
39
use ButterAMQP\AMQP091\Framing\Method\ConfirmSelectOk;
40
use ButterAMQP\AMQP091\Framing\Method\TxCommit;
41
use ButterAMQP\AMQP091\Framing\Method\TxCommitOk;
42
use ButterAMQP\AMQP091\Framing\Method\TxRollback;
43
use ButterAMQP\AMQP091\Framing\Method\TxRollbackOk;
44
use ButterAMQP\AMQP091\Framing\Method\TxSelect;
45
use ButterAMQP\AMQP091\Framing\Method\TxSelectOk;
46
use ButterAMQP\Message;
47
use ButterAMQP\Returned;
48
49
class Channel implements ChannelInterface, WireSubscriberInterface
50
{
51
    const STATUS_CLOSED = 0;
52
    const STATUS_READY = 1;
53
    const STATUS_INACTIVE = 2;
54
55
    const MODE_NORMAL = 0;
56
    const MODE_CONFIRM = 1;
57
    const MODE_TX = 2;
58
59
    /**
60
     * @var int
61
     */
62
    private $id;
63
64
    /**
65
     * @var WireInterface
66
     */
67
    private $wire;
68
69
    /**
70
     * @var int
71
     */
72
    private $status = self::STATUS_CLOSED;
73
74
    /**
75
     * @var int
76
     */
77
    private $mode = self::MODE_NORMAL;
78
79
    /**
80
     * @var callable[]
81
     */
82
    private $consumers = [];
83
84
    /**
85
     * @var callable
86
     */
87
    private $returnCallable;
88
89
    /**
90
     * @var callable
91
     */
92
    private $confirmCallable;
93
94
    /**
     * Binds the channel to its transport and to its channel number.
     *
     * @param WireInterface $wire Wire used to send frames and to wait for replies
     * @param int           $id   Channel number passed to every frame this channel builds
     */
    public function __construct(WireInterface $wire, $id)
    {
        $this->wire = $wire;
        $this->id = $id;
    }
103
104
    /**
     * Opens the channel; does nothing when the channel is not closed.
     *
     * Subscribes this object to the wire under its channel id, sends
     * ChannelOpen, waits for ChannelOpenOk, then marks the channel ready
     * and resets it to normal mode.
     *
     * {@inheritdoc}
     */
    public function open()
    {
        if ($this->status == self::STATUS_CLOSED) {
            $this->wire->subscribe($this->id, $this);

            $this->send(new ChannelOpen($this->id, ''));
            $this->wait(ChannelOpenOk::class);

            $this->status = self::STATUS_READY;
            $this->mode = self::MODE_NORMAL;
        }

        return $this;
    }
123
124
    /**
     * Sends ChannelFlow and records the state the reply reports.
     *
     * The status is taken from the ChannelFlowOk reply, not from $active.
     *
     * {@inheritdoc}
     */
    public function flow($active)
    {
        $this->send(new ChannelFlow($this->id, $active));

        /** @var ChannelFlowOk $reply */
        $reply = $this->wait(ChannelFlowOk::class);

        if ($reply->isActive()) {
            $this->status = self::STATUS_READY;
        } else {
            $this->status = self::STATUS_INACTIVE;
        }

        return $this;
    }
138
139
    /**
     * Asks the wire to process its next unit of work.
     *
     * The $blocking argument is forwarded unchanged to the wire's next().
     *
     * {@inheritdoc}
     */
    public function serve($blocking = true)
    {
        $wire = $this->wire;
        $wire->next($blocking);

        return $this;
    }
148
149
    /**
     * Sends ChannelClose, waits for ChannelCloseOk and marks the channel closed.
     *
     * {@inheritdoc}
     */
    public function close()
    {
        // NOTE(review): the trailing arguments are presumably reply code, reply
        // text, class id and method id; confirm against ChannelClose.
        $request = new ChannelClose($this->id, 0, '', 0, 0);

        $this->send($request);
        $this->wait(ChannelCloseOk::class);

        $this->status = self::STATUS_CLOSED;

        return $this;
    }
161
162
    /**
     * Sends BasicQos with the given prefetch settings and waits for BasicQosOk.
     *
     * {@inheritdoc}
     */
    public function qos($prefetchSize, $prefetchCount, $globally = false)
    {
        $request = new BasicQos($this->id, $prefetchSize, $prefetchCount, $globally);

        $this->send($request);
        $this->wait(BasicQosOk::class);

        return $this;
    }
172
173
    /**
     * Creates an Exchange object for the given name on this channel's wire and id.
     *
     * Nothing is sent here; only the object is constructed.
     *
     * {@inheritdoc}
     */
    public function exchange($name)
    {
        $exchange = new Exchange($this->wire, $this->id, $name);

        return $exchange;
    }
180
181
    /**
     * Creates a Queue object for the given name on this channel's wire and id.
     *
     * Nothing is sent here; only the object is constructed.
     *
     * {@inheritdoc}
     */
    public function queue($name = '')
    {
        $queue = new Queue($this->wire, $this->id, $name);

        return $queue;
    }
188
189
    /**
     * Registers a callback as a consumer of a queue.
     *
     * Sends BasicConsume and, unless Consumer::FLAG_NO_WAIT is set, waits for
     * BasicConsumeOk and adopts the consumer tag carried by that reply. With
     * FLAG_NO_WAIT and an empty tag, a tag is generated locally with uniqid()
     * because no reply will supply one.
     *
     * Every flag bit is cast to a boolean before it is handed to the frame,
     * the same way publish() does, so the frame never receives a raw mask value.
     *
     * {@inheritdoc}
     */
    public function consume($queue, callable $callback, $flags = 0, $tag = '', array $arguments = [])
    {
        $noWait = (bool) ($flags & Consumer::FLAG_NO_WAIT);

        if ($noWait && empty($tag)) {
            $tag = uniqid('php-consumer-');
        }

        $this->send(new BasicConsume(
            $this->id,
            0,
            $queue,
            $tag,
            (bool) ($flags & Consumer::FLAG_NO_LOCAL),
            (bool) ($flags & Consumer::FLAG_NO_ACK),
            (bool) ($flags & Consumer::FLAG_EXCLUSIVE),
            $noWait,
            $arguments
        ));

        if (!$noWait) {
            // The tag in the reply is authoritative; it replaces the requested one.
            $tag = $this->wait(BasicConsumeOk::class)
                ->getConsumerTag();
        }

        $this->consumers[$tag] = $callback;

        return new Consumer($this, $tag);
    }
219
220
    /**
     * Fetches a single message from a queue with BasicGet.
     *
     * Returns null when the reply is BasicGetEmpty. Otherwise reads the
     * header and body frames that follow BasicGetOk and wraps them in a
     * Delivery with an empty consumer tag. The frame's no-ack argument is
     * the negation of $withAck.
     *
     * {@inheritdoc}
     */
    public function get($queue, $withAck = true)
    {
        $this->send(new BasicGet($this->id, 0, $queue, !$withAck));

        /** @var BasicGetOk|BasicGetEmpty $reply */
        $reply = $this->wait([BasicGetOk::class, BasicGetEmpty::class]);

        if ($reply instanceof BasicGetEmpty) {
            return null;
        }

        $header = $this->readHeader();
        $body = $this->readContent($header->getSize());

        return new Delivery(
            $this,
            '',
            $reply->getDeliveryTag(),
            $reply->isRedelivered(),
            $reply->getExchange(),
            $reply->getRoutingKey(),
            $body,
            $header->getProperties()
        );
    }
247
248
    /**
     * Sends BasicRecover with the given requeue flag and waits for BasicRecoverOk.
     *
     * {@inheritdoc}
     */
    public function recover($requeue = true)
    {
        $request = new BasicRecover($this->id, $requeue);

        $this->send($request);
        $this->wait(BasicRecoverOk::class);

        return $this;
    }
258
259
    /**
     * Cancels a consumer by tag.
     *
     * Sends BasicCancel, forgets the local callback for the tag, and waits
     * for BasicCancelOk unless Consumer::FLAG_NO_WAIT is set in $flags.
     *
     * The no-wait bit is cast to a boolean before it is handed to the frame,
     * so the frame never receives a raw mask value.
     *
     * {@inheritdoc}
     */
    public function cancel($tag, $flags = 0)
    {
        $noWait = (bool) ($flags & Consumer::FLAG_NO_WAIT);

        $this->send(new BasicCancel($this->id, $tag, $noWait));

        // The callback is dropped before the reply arrives.
        unset($this->consumers[$tag]);

        if (!$noWait) {
            $this->wait(BasicCancelOk::class);
        }

        return $this;
    }
276
277
    /**
     * Publishes a message as three frames: BasicPublish, Header, Content.
     *
     * The mandatory and immediate bits of $flags are cast to booleans for the
     * BasicPublish frame. The header carries the body's byte length
     * (strlen) and the message properties. No reply is awaited.
     *
     * {@inheritdoc}
     */
    public function publish(Message $message, $exchange = '', $routingKey = '', $flags = 0)
    {
        $mandatory = (bool) ($flags & Message::FLAG_MANDATORY);
        $immediate = (bool) ($flags & Message::FLAG_IMMEDIATE);

        $this->send(new BasicPublish($this->id, 0, $exchange, $routingKey, $mandatory, $immediate));

        $body = $message->getBody();

        // NOTE(review): 60 is presumably the AMQP "basic" class id; confirm against Header.
        $header = new Header($this->id, 60, 0, strlen($body), $message->getProperties());

        $this->send($header);
        $this->send(new Content($this->id, $body));

        return $this;
    }
298
299
    /**
     * Sends BasicAck for the given delivery tag; no reply is awaited.
     *
     * {@inheritdoc}
     */
    public function ack($deliveryTag, $multiple = false)
    {
        $frame = new BasicAck($this->id, $deliveryTag, $multiple);

        $this->send($frame);

        return $this;
    }
308
309
    /**
     * Rejects one or several deliveries; no reply is awaited.
     *
     * Sends BasicNack when $multiple is truthy and BasicReject otherwise
     * (the BasicReject frame built here takes no "multiple" argument).
     *
     * {@inheritdoc}
     */
    public function reject($deliveryTag, $requeue = true, $multiple = false)
    {
        if ($multiple) {
            $this->send(new BasicNack($this->id, $deliveryTag, $multiple, $requeue));
        } else {
            $this->send(new BasicReject($this->id, $deliveryTag, $requeue));
        }

        return $this;
    }
319
320
    /**
     * Stores the handler invoked when a BasicReturn frame is dispatched.
     *
     * A later call replaces the previously stored handler.
     *
     * {@inheritdoc}
     */
    public function onReturn(callable $callable)
    {
        $handler = $callable;
        $this->returnCallable = $handler;

        return $this;
    }
329
330
    /**
     * Switches the channel to confirm mode.
     *
     * Stores the confirm handler first, sends ConfirmSelect, waits for
     * ConfirmSelectOk unless $noWait is truthy, then records MODE_CONFIRM.
     *
     * {@inheritdoc}
     */
    public function selectConfirm(callable $callable, $noWait = false)
    {
        $this->confirmCallable = $callable;

        $request = new ConfirmSelect($this->id, $noWait);
        $this->send($request);

        if (!$noWait) {
            $this->wait(ConfirmSelectOk::class);
        }

        $this->mode = self::MODE_CONFIRM;

        return $this;
    }
347
348
    /**
     * Switches the channel to transaction mode.
     *
     * Sends TxSelect, waits for TxSelectOk, then records MODE_TX.
     *
     * {@inheritdoc}
     */
    public function selectTx()
    {
        $request = new TxSelect($this->id);

        $this->send($request);
        $this->wait(TxSelectOk::class);

        $this->mode = self::MODE_TX;

        return $this;
    }
360
361
    /**
     * Commits the current transaction.
     *
     * Sends TxCommit and waits for TxCommitOk. Returns the channel itself,
     * as the other commands on this class do, so calls can be chained.
     *
     * {@inheritdoc}
     *
     * @return $this
     *
     * @throws TransactionNotSelectedException when the channel is not in MODE_TX
     */
    public function txCommit()
    {
        if ($this->mode != self::MODE_TX) {
            throw new TransactionNotSelectedException('Channel is not in transaction mode. Use Channel::selectTx() to select transaction mode on this channel.');
        }

        $this->send(new TxCommit($this->id))
            ->wait(TxCommitOk::class);

        return $this;
    }
375
376
    /**
     * Rolls back the current transaction.
     *
     * Sends TxRollback and waits for TxRollbackOk. Returns the channel itself,
     * as the other commands on this class do, so calls can be chained.
     *
     * {@inheritdoc}
     *
     * @return $this
     *
     * @throws TransactionNotSelectedException when the channel is not in MODE_TX
     */
    public function txRollback()
    {
        if ($this->mode != self::MODE_TX) {
            throw new TransactionNotSelectedException('Channel is not in transaction mode. Use Channel::selectTx() to select transaction mode on this channel.');
        }

        $this->send(new TxRollback($this->id))
            ->wait(TxRollbackOk::class);

        return $this;
    }
390
391
    /**
     * Tells whether a callback is registered locally under the given tag.
     *
     * The tag is cast to string before the lookup.
     *
     * {@inheritdoc}
     */
    public function hasConsumer($tag)
    {
        $key = (string) $tag;

        return isset($this->consumers[$key]);
    }
398
399
    /**
     * Lists the tags of all locally registered consumers.
     *
     * The tags are the keys of the consumer map, in insertion order.
     *
     * {@inheritdoc}
     */
    public function getConsumerTags()
    {
        $tags = array_keys($this->consumers);

        return $tags;
    }
406
407
    /**
     * Returns the current channel status.
     *
     * @return int One of self::STATUS_CLOSED, self::STATUS_READY or self::STATUS_INACTIVE
     */
    public function getStatus()
    {
        return $this->status;
    }
414
415
    /**
     * Hands a frame to the wire for sending.
     *
     * @param Frame $frame Frame to send
     *
     * @return $this
     */
    private function send(Frame $frame)
    {
        $wire = $this->wire;
        $wire->send($frame);

        return $this;
    }
428
429
    /**
     * Asks the wire to wait on this channel's id for a frame of the given type(s).
     *
     * @param string|array $type Frame class name, or a list of class names
     *
     * @return Frame Whatever the wire's wait() returns
     */
    private function wait($type)
    {
        $frame = $this->wire->wait($this->id, $type);

        return $frame;
    }
438
439
    /**
     * Routes an incoming frame to its handler.
     *
     * Handles ChannelClose, ChannelFlow, BasicDeliver, BasicReturn, BasicAck,
     * BasicNack and BasicCancel, checked in that order. Any other frame is
     * ignored.
     *
     * @param Frame $frame Frame received for this channel
     */
    public function dispatch(Frame $frame)
    {
        if ($frame instanceof ChannelClose) {
            $this->onChannelClose($frame);

            return;
        }

        if ($frame instanceof ChannelFlow) {
            $this->onChannelFlow($frame);

            return;
        }

        if ($frame instanceof BasicDeliver) {
            $this->onBasicDeliver($frame);

            return;
        }

        if ($frame instanceof BasicReturn) {
            $this->onBasicReturn($frame);

            return;
        }

        if ($frame instanceof BasicAck) {
            $this->onBasicAck($frame);

            return;
        }

        if ($frame instanceof BasicNack) {
            $this->onBasicNack($frame);

            return;
        }

        if ($frame instanceof BasicCancel) {
            $this->onBasicCancel($frame);
        }
    }
460
461
    /**
     * Handles a delivery pushed to one of this channel's consumers.
     *
     * The header and body frames are read before the consumer lookup, so
     * they are consumed from the wire even when the tag is unknown.
     *
     * @param BasicDeliver $frame Delivery method frame
     *
     * @throws UnknownConsumerTagException when no callback is registered for the frame's consumer tag
     */
    private function onBasicDeliver(BasicDeliver $frame)
    {
        $header = $this->readHeader();
        $body = $this->readContent($header->getSize());

        $tag = $frame->getConsumerTag();

        if (!isset($this->consumers[$tag])) {
            throw new UnknownConsumerTagException(sprintf(
                'Consumer with tag "%s" does not exist',
                $tag
            ));
        }

        $delivery = new Delivery(
            $this,
            $tag,
            $frame->getDeliveryTag(),
            $frame->isRedelivered(),
            $frame->getExchange(),
            $frame->getRoutingKey(),
            $body,
            $header->getProperties()
        );

        call_user_func($this->consumers[$tag], $delivery);
    }
491
492
    /**
     * Handles a message returned by the server.
     *
     * The header and body frames are read before the handler check, so they
     * are consumed from the wire even when no return handler is set.
     *
     * @param BasicReturn $frame Return method frame
     *
     * @throws NoReturnException when no handler was registered through onReturn()
     */
    private function onBasicReturn(BasicReturn $frame)
    {
        $header = $this->readHeader();
        $content = $this->readContent($header->getSize());

        if (!$this->returnCallable) {
            // The original message rendered as "method, , or remove"; the stray comma is removed.
            throw new NoReturnException(
                'A message was returned but there is no return handler. '.
                'Make sure you setup a handler for returned messages using Channel::onReturn method, '.
                'or remove MANDATORY and IMMEDIATE flags when publishing messages.'
            );
        }

        $returned = new Returned(
            $frame->getReplyCode(),
            $frame->getReplyText(),
            $frame->getExchange(),
            $frame->getRoutingKey(),
            $content,
            $header->getProperties()
        );

        call_user_func($this->returnCallable, $returned);
    }
521
522
    /**
     * Reports a BasicAck frame to the confirm handler as a positive confirmation.
     *
     * @param BasicAck $frame Acknowledgement frame
     */
    private function onBasicAck(BasicAck $frame)
    {
        $tag = $frame->getDeliveryTag();
        $multiple = $frame->isMultiple();

        $this->confirmPublishing(true, $tag, $multiple);
    }
529
530
    /**
     * Reports a BasicNack frame to the confirm handler as a negative confirmation.
     *
     * @param BasicNack $frame Negative acknowledgement frame
     */
    private function onBasicNack(BasicNack $frame)
    {
        $tag = $frame->getDeliveryTag();
        $multiple = $frame->isMultiple();

        $this->confirmPublishing(false, $tag, $multiple);
    }
537
538
    /**
     * Invokes the confirm handler with a Confirm built from the arguments.
     *
     * @param bool   $ok       True for an ack, false for a nack
     * @param string $tag      Delivery tag taken from the ack/nack frame
     * @param bool   $multiple Value of the frame's "multiple" flag
     *
     * @throws \RuntimeException when no confirm handler has been stored
     */
    private function confirmPublishing($ok, $tag, $multiple)
    {
        if ($this->confirmCallable) {
            $confirm = new Confirm($ok, $tag, $multiple);

            call_user_func($this->confirmCallable, $confirm);

            return;
        }

        throw new \RuntimeException(
            'Something is wrong: channel is in confirm mode, but confirm callable is not set'
        );
    }
553
554
    /**
     * Handles a BasicCancel frame received on this channel.
     *
     * Forgets the local callback for the frame's consumer tag and replies
     * with BasicCancelOk unless the frame is flagged no-wait.
     *
     * @param BasicCancel $frame Cancel frame
     */
    private function onBasicCancel(BasicCancel $frame)
    {
        unset($this->consumers[$frame->getConsumerTag()]);

        if ($frame->isNoWait()) {
            return;
        }

        $this->send(new BasicCancelOk($this->id, $frame->getConsumerTag()));
    }
565
566
    /**
     * Handles a ChannelFlow frame received on this channel.
     *
     * Replies with ChannelFlowOk echoing the frame's active flag, then
     * updates the status to match it.
     *
     * @param ChannelFlow $frame Flow frame
     */
    private function onChannelFlow(ChannelFlow $frame)
    {
        $reply = new ChannelFlowOk($this->id, $frame->isActive());
        $this->send($reply);

        if ($frame->isActive()) {
            $this->status = self::STATUS_READY;
        } else {
            $this->status = self::STATUS_INACTIVE;
        }
    }
575
576
    /**
     * Handles a ChannelClose frame received on this channel.
     *
     * Replies with ChannelCloseOk, marks the channel closed, and always
     * throws an exception built from the frame's reply text and code.
     *
     * @param ChannelClose $frame Close frame
     *
     * @throws AMQPException always
     */
    private function onChannelClose(ChannelClose $frame)
    {
        $reply = new ChannelCloseOk($this->id);
        $this->send($reply);

        $this->status = self::STATUS_CLOSED;

        throw AMQPException::make($frame->getReplyText(), $frame->getReplyCode());
    }
589
590
    /**
     * Waits on this channel for a Header frame.
     *
     * @return Header
     */
    private function readHeader()
    {
        $header = $this->wait(Header::class);

        return $header;
    }
597
598
    /**
     * Collects Content frames until at least $size bytes have been gathered.
     *
     * Waits for no frame at all when $size is zero. The result is not
     * truncated, so it may be longer than $size if the last frame overshoots.
     *
     * @param int $size Expected body length in bytes
     *
     * @return string Concatenated data of the Content frames read
     */
    private function readContent($size)
    {
        $buffer = '';

        while (strlen($buffer) < $size) {
            $chunk = $this->wait(Content::class);
            $buffer .= $chunk->getData();
        }

        return $buffer;
    }
614
}
615