Passed
Push — master ( 2f0dcd...638ea4 )
by Mihai
02:42 queued 12s
created

MultipleConsumer   A

Complexity

Total Complexity 18

Size/Duplication

Total Lines 109
Duplicated Lines 0 %

Test Coverage

Coverage 57.45%

Importance

Changes 3
Bugs 0 Features 0
Metric Value
eloc 36
c 3
b 0
f 0
dl 0
loc 109
ccs 27
cts 47
cp 0.5745
rs 10
wmc 18

9 Methods

Rating   Name   Duplication   Size   Complexity  
A setQueuesProvider() 0 4 1
A setQueues() 0 3 1
A stopConsuming() 0 4 2
A getQueueConsumerTag() 0 3 1
A mergeQueues() 0 6 2
A queueDeclare() 0 18 5
A processQueueMessage() 0 7 2
A setContext() 0 3 1
A setupConsumer() 0 14 3
1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use OldSound\RabbitMqBundle\Provider\QueuesProviderInterface;
6
use OldSound\RabbitMqBundle\RabbitMq\Exception\QueueNotFoundException;
7
use PhpAmqpLib\Message\AMQPMessage;
8
9
/**
 * Consumer that listens on several queues at once.
 *
 * Queue definitions come from two sources: the static map injected through
 * setQueues() and, optionally, a QueuesProviderInterface whose queues are
 * merged into that map when the consumer is set up.
 */
class MultipleConsumer extends Consumer
{
    /**
     * Queue definitions keyed by queue name.
     *
     * Each value is an options array. This class reads the keys "passive",
     * "durable", "exclusive", "auto_delete", "nowait", "arguments", "ticket",
     * "callback" and the optional "routing_keys".
     *
     * @var array
     */
    protected $queues = [];

    /**
     * Queues provider
     *
     * @var QueuesProviderInterface|null
     */
    protected $queuesProvider = null;

    /**
     * Context the consumer runs in
     *
     * @var string|null
     */
    protected $context = null;

    /**
     * QueuesProvider setter
     *
     * @param QueuesProviderInterface $queuesProvider
     *
     * @return $this
     */
    public function setQueuesProvider(QueuesProviderInterface $queuesProvider)
    {
        $this->queuesProvider = $queuesProvider;

        return $this;
    }

    /**
     * Builds the consumer tag used for a single queue.
     *
     * @param string $queue Queue name
     *
     * @return string The base consumer tag and the queue name joined by a dash
     */
    public function getQueueConsumerTag($queue)
    {
        return sprintf('%s-%s', $this->getConsumerTag(), $queue);
    }

    /**
     * Replaces the static queue definitions.
     *
     * @param array $queues Queue options keyed by queue name
     */
    public function setQueues(array $queues)
    {
        $this->queues = $queues;
    }

    /**
     * Sets the context the consumer runs in.
     *
     * @param string $context
     */
    public function setContext($context)
    {
        $this->context = $context;
    }

    /**
     * Merges provided queues, optionally sets up the fabric, and registers one
     * consumer per queue on the channel.
     *
     * Every queue gets its own consumer tag (see getQueueConsumerTag()) and a
     * callback that forwards the message to processQueueMessage() together
     * with the name of the queue it arrived on.
     */
    protected function setupConsumer()
    {
        $this->mergeQueues();

        if ($this->autoSetupFabric) {
            $this->setupFabric();
        }

        // Only the queue names are needed here; the options are used by queueDeclare().
        foreach (array_keys($this->queues) as $name) {
            $this->getChannel()->basic_consume(
                $name,
                $this->getQueueConsumerTag($name),
                false,
                false,
                false,
                false,
                function (AMQPMessage $msg) use ($name) {
                    $this->processQueueMessage($name, $msg);
                }
            );
        }
    }

    /**
     * Declares every configured queue and binds it to the exchange.
     *
     * A queue is bound once per entry of its "routing_keys" option; when that
     * option is missing or empty, the consumer's default routing key is used.
     * The queue's "arguments" option (or an empty array) is passed to each bind.
     */
    protected function queueDeclare()
    {
        foreach ($this->queues as $name => $options) {
            $declared = $this->getChannel()->queue_declare(
                $name,
                $options['passive'],
                $options['durable'],
                $options['exclusive'],
                $options['auto_delete'],
                $options['nowait'],
                $options['arguments'],
                $options['ticket']
            );

            // queue_declare() may come back without a reply tuple (e.g. when
            // "nowait" is set); fall back to the configured name so the bind
            // below never receives a null queue.
            $queueName = $declared[0] ?? $name;

            $exchangeName = $this->exchangeOptions['name'];
            $bindArguments = $options['arguments'] ?? [];

            $hasOwnRoutingKeys = isset($options['routing_keys']) && count($options['routing_keys']) > 0;
            $routingKeys = $hasOwnRoutingKeys ? $options['routing_keys'] : [$this->routingKey];

            foreach ($routingKeys as $routingKey) {
                $this->queueBind($queueName, $exchangeName, $routingKey, $bindArguments);
            }
        }

        $this->queueDeclared = true;
    }

    /**
     * Dispatches a message to the callback configured for the given queue.
     *
     * @param string      $queueName Name of the queue the message arrived on
     * @param AMQPMessage $msg       The received message
     *
     * @throws QueueNotFoundException When no queue with that name is configured
     */
    public function processQueueMessage($queueName, AMQPMessage $msg)
    {
        if (!isset($this->queues[$queueName])) {
            throw new QueueNotFoundException();
        }

        $this->processMessageQueueCallback($msg, $queueName, $this->queues[$queueName]['callback']);
    }

    /**
     * Cancels the consumer registered for every configured queue.
     */
    public function stopConsuming()
    {
        foreach (array_keys($this->queues) as $name) {
            $this->getChannel()->basic_cancel($this->getQueueConsumerTag($name), false, true);
        }
    }

    /**
     * Merges static and provided queues into one array
     *
     * Provided queues override static ones that share the same name.
     */
    protected function mergeQueues()
    {
        if (!$this->queuesProvider) {
            return;
        }

        // array_replace() instead of array_merge(): PHP stores numeric queue
        // names ("123") as integer keys, and array_merge() would renumber them
        // from 0, losing the real queue names and duplicating entries on every
        // call. For string keys both functions behave identically.
        $this->queues = array_replace(
            $this->queues,
            $this->queuesProvider->getQueues()
        );
    }
}
122