MultipleConsumer   A
last analyzed

Complexity

Total Complexity 18

Size/Duplication

Total Lines 115
Duplicated Lines 0 %

Test Coverage

Coverage 61.53%

Importance

Changes 3
Bugs 0 Features 0
Metric Value
eloc 41
dl 0
loc 115
ccs 32
cts 52
cp 0.6153
rs 10
c 3
b 0
f 0
wmc 18

9 Methods

Rating   Name   Duplication   Size   Complexity  
A setQueuesProvider() 0 4 1
A setQueues() 0 3 1
A getQueueConsumerTag() 0 3 1
A setContext() 0 3 1
A setupConsumer() 0 14 3
A stopConsuming() 0 4 2
A mergeQueues() 0 6 2
A queueDeclare() 0 24 5
A processQueueMessage() 0 7 2
1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use OldSound\RabbitMqBundle\Provider\QueuesProviderInterface;
6
use OldSound\RabbitMqBundle\RabbitMq\Exception\QueueNotFoundException;
7
use PhpAmqpLib\Message\AMQPMessage;
8
9
/**
 * Consumer that listens on several queues at once, dispatching each incoming
 * message to the callback configured for the queue it arrived on.
 *
 * Queues can be set statically through setQueues() and/or supplied by a
 * QueuesProviderInterface; both sets are combined when the consumer is set up.
 */
class MultipleConsumer extends Consumer
{
    /**
     * Queue definitions keyed by queue name.
     *
     * Each entry holds the options read by queueDeclare() ('passive',
     * 'durable', 'exclusive', 'auto_delete', 'nowait', 'arguments', 'ticket'
     * and optionally 'routing_keys') plus the 'callback' used by
     * processQueueMessage().
     *
     * @var array
     */
    protected $queues = [];

    /**
     * Queues provider
     *
     * @var QueuesProviderInterface|null
     */
    protected $queuesProvider = null;

    /**
     * Context the consumer runs in
     *
     * @var string|null
     */
    protected $context = null;

    /**
     * QueuesProvider setter
     *
     * @param QueuesProviderInterface $queuesProvider
     *
     * @return self
     */
    public function setQueuesProvider(QueuesProviderInterface $queuesProvider)
    {
        $this->queuesProvider = $queuesProvider;

        return $this;
    }

    /**
     * Builds the consumer tag used for a single queue.
     *
     * @param string $queue Queue name
     *
     * @return string The base consumer tag and the queue name joined by a dash
     */
    public function getQueueConsumerTag($queue)
    {
        return sprintf('%s-%s', $this->getConsumerTag(), $queue);
    }

    /**
     * Replaces the statically configured queue definitions.
     *
     * @param array $queues Queue definitions keyed by queue name
     */
    public function setQueues(array $queues)
    {
        $this->queues = $queues;
    }

    /**
     * Stores the context the consumer runs in.
     *
     * @param string $context
     */
    public function setContext($context)
    {
        $this->context = $context;
    }

    /**
     * Combines static and provided queues, optionally sets up the fabric and
     * registers one consumer callback per queue on the channel.
     */
    protected function setupConsumer()
    {
        $this->mergeQueues();

        if ($this->autoSetupFabric) {
            $this->setupFabric();
        }

        // Only the queue names are needed here; the options are consumed by queueDeclare().
        foreach (array_keys($this->queues) as $name) {
            $this->getChannel()->basic_consume(
                $name,
                $this->getQueueConsumerTag($name),
                false,
                $this->consumerOptions['no_ack'],
                false,
                false,
                // Closures declared in a method are bound to $this, so no alias variable is required.
                function (AMQPMessage $msg) use ($name) {
                    $this->processQueueMessage($name, $msg);
                }
            );
        }
    }

    /**
     * Declares every configured queue and binds it to the exchange.
     *
     * A queue is bound once per entry in its 'routing_keys' option; when that
     * option is missing or empty, the consumer-wide routing key is used instead.
     */
    protected function queueDeclare()
    {
        foreach ($this->queues as $name => $options) {
            $declared = $this->getChannel()->queue_declare(
                $name,
                $options['passive'],
                $options['durable'],
                $options['exclusive'],
                $options['auto_delete'],
                $options['nowait'],
                $options['arguments'],
                $options['ticket']
            );

            // Use the name reported by the channel when there is one; otherwise fall back to the
            // configured name instead of silently binding with null.
            // NOTE(review): php-amqplib is understood to return no result when 'nowait' is set - confirm.
            $queueName = $declared[0] ?? $name;

            $routingKeys = $options['routing_keys'] ?? [];
            if (count($routingKeys) === 0) {
                $routingKeys = [$this->routingKey];
            }

            foreach ($routingKeys as $routingKey) {
                $this->queueBind($queueName, $this->exchangeOptions['name'], $routingKey, $options['arguments'] ?? []);
            }
        }

        $this->queueDeclared = true;
    }

    /**
     * Hands a message over to the callback configured for the given queue.
     *
     * @param string      $queueName Name of the queue the message was received on
     * @param AMQPMessage $msg       The received message
     *
     * @throws QueueNotFoundException When no queue with that name is configured
     */
    public function processQueueMessage($queueName, AMQPMessage $msg)
    {
        if (!isset($this->queues[$queueName])) {
            throw new QueueNotFoundException();
        }

        $this->processMessageQueueCallback($msg, $queueName, $this->queues[$queueName]['callback']);
    }

    /**
     * Cancels the per-queue consumer of every configured queue.
     */
    public function stopConsuming()
    {
        foreach (array_keys($this->queues) as $name) {
            $this->getChannel()->basic_cancel($this->getQueueConsumerTag($name), false, true);
        }
    }

    /**
     * Merges static and provided queues into one array
     *
     * Provided queues override static ones that share the same name.
     */
    protected function mergeQueues()
    {
        if (!$this->queuesProvider) {
            return;
        }

        // array_replace() instead of array_merge(): queue names are the array keys, and
        // array_merge() renumbers integer-like keys (a queue called "123" would become 0),
        // losing the name and appending duplicates on repeated merges.
        $this->queues = array_replace(
            $this->queues,
            $this->queuesProvider->getQueues()
        );
    }
}
128