Consumer::handleProcessMessage()   B
last analyzed

Complexity

Conditions 5
Paths 4

Size

Total Lines 20
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 5

Importance

Changes 3
Bugs 1 Features 0
Metric Value
c 3
b 1
f 0
dl 0
loc 20
ccs 13
cts 13
cp 1
rs 8.8571
cc 5
eloc 12
nc 4
nop 2
crap 5
1
<?php
2
3
namespace RabbitMqModule;
4
5
use PhpAmqpLib\Message\AMQPMessage;
6
7
/**
8
 * Class Consumer
9
 * @package RabbitMqModule
10
 */
11
class Consumer extends BaseConsumer
12
{
13
    /**
14
     * Purge the queue.
15
     *
16
     * @return $this
17
     */
18 1
    public function purgeQueue()
19
    {
20 1
        $this->getChannel()->queue_purge($this->getQueueOptions()->getName(), true);
21
22 1
        return $this;
23
    }
24
25
    /**
26
     * Consume the message.
27
     */
28 2
    public function consume()
29
    {
30 2
        $this->setupConsumer();
31 2
        while (count($this->getChannel()->callbacks)) {
32 2
            $this->maybeStopConsumer();
33 2
            $this->getChannel()->wait(null, false, $this->getIdleTimeout());
34 2
        }
35 2
    }
36
37
    /**
38
     * @param AMQPMessage $msg
39
     */
40 4
    public function processMessage(AMQPMessage $msg)
41
    {
42 4
        $this->getEventManager()->trigger(__FUNCTION__.'.pre', $this, compact('message'));
43
44 4
        $processFlag = call_user_func($this->getCallback(), $msg, $this);
45 4
        $this->handleProcessMessage($msg, $processFlag);
46
47 4
        $this->getEventManager()->trigger(__FUNCTION__.'.post', $this, compact('message'));
48 4
    }
49
50
    /**
51
     *
52
     * @param AMQPMessage $msg
53
     * @param $processFlag
54
     */
55 4
    protected function handleProcessMessage(AMQPMessage $msg, $processFlag)
56
    {
57 4
        $channel = $msg->delivery_info['channel'];
58
        /** @var string $deliveryTag */
59 4
        $deliveryTag = $msg->delivery_info['delivery_tag'];
60 4
        if ($processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $processFlag) {
61
            // Reject and requeue message to RabbitMQ
62 1
            $channel->basic_reject($deliveryTag, true);
63 4
        } elseif ($processFlag === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE) {
64
            // NACK and requeue message to RabbitMQ
65 1
            $channel->basic_nack($deliveryTag, false, true);
66 3
        } elseif ($processFlag === ConsumerInterface::MSG_REJECT) {
67
            // Reject and drop
68 1
            $channel->basic_reject($deliveryTag, false);
69 1
        } else {
70
            // Remove message from queue only if callback return not false
71 1
            $channel->basic_ack($deliveryTag);
72
        }
73 4
        $this->maybeStopConsumer();
74 4
    }
75
}
76