ConsumerManager::getQueueReadyMessagesCount()   A
last analyzed

Complexity

Conditions 4
Paths 3

Size

Total Lines 15
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 8
CRAP Score 4

Importance

Changes 0
Metric Value
dl 0
loc 15
ccs 8
cts 8
cp 1
rs 9.2
c 0
b 0
f 0
cc 4
eloc 7
nc 3
nop 3
crap 4
1
<?php
2
namespace pmill\RabbitRabbit;
3
4
use BadMethodCallException;
5
use pmill\RabbitRabbit\Conditions\ConditionInterface;
6
use pmill\RabbitRabbit\Exceptions\RabbitResponseException;
7
use RabbitMq\ManagementApi\Client as RabbitClient;
8
9
class ConsumerManager
10
{
11
    /**
12
     * @var RabbitClient
13
     */
14
    protected $rabbitClient;
15
16
    /**
17
     * @var array
18
     */
19
    protected $rules = [];
20
21
    /**
22
     * ConsumerManager constructor.
23
     *
24
     * @param RabbitConfig $config
25
     */
26 4
    public function __construct(RabbitConfig $config = null)
27
    {
28 4
        if ($config) {
29 1
            $this->rabbitClient = new RabbitClient(
30 1
                null,
31 1
                $config->getBaseUrl(),
32 1
                $config->getUsername(),
33 1
                $config->getPassword()
34
            );
35
        }
36 4
    }
37
38
    /**
39
     * @param RuleInterface $rule
40
     * @param ConditionInterface $condition
41
     */
42 2
    public function addRule(RuleInterface $rule, ConditionInterface $condition): void
43
    {
44 2
        $this->rules[] = [
45 2
            'rule' => $rule,
46 2
            'condition' => $condition,
47
        ];
48 2
    }
49
50
    /**
51
     * @throws RabbitResponseException
52
     */
53 4
    public function run(): void
54
    {
55 4
        if (!$this->rabbitClient) {
56 1
            throw new BadMethodCallException('A rabbit client is required before running this method');
57
        }
58
59 3
        $arrayCache = [];
60
61 3
        foreach ($this->rules as $ruleData) {
62
            /** @var RuleInterface $rule */
63 2
            $rule = $ruleData['rule'];
64
            /** @var ConditionInterface $condition */
65 2
            $condition = $ruleData['condition'];
66
67 2
            $readyMessageCount = $this->getQueueReadyMessagesCount(
68 2
                $rule->getVHostName(),
69 2
                $rule->getQueueName(),
70 2
                $arrayCache
71
            );
72
73 1
            if ($condition->shouldRun($readyMessageCount)) {
74 1
                $rule->run($readyMessageCount);
75
            }
76
        }
77 2
    }
78
79
    /**
80
     * @param RabbitClient $rabbitClient
81
     */
82 2
    public function setRabbitClient(RabbitClient $rabbitClient): void
83
    {
84 2
        $this->rabbitClient = $rabbitClient;
85 2
    }
86
87
    /**
88
     * @param string $vhostName
89
     * @param string $queueName
90
     * @param array $arrayCache
91
     *
92
     * @return int
93
     * @throws RabbitResponseException
94
     */
95 2
    protected function getQueueReadyMessagesCount($vhostName, $queueName, array &$arrayCache): int
96
    {
97 2
        $cacheKey = $vhostName . '/' . $queueName;
98
99 2
        if (!isset($arrayCache[$cacheKey])) {
100 2
            $queueData = $this->rabbitClient->queues()->get($vhostName, $queueName);
101
            
102 2
            if (isset($queueData['error']) && $queueData['error']) {
103 1
                throw new RabbitResponseException($queueData);
104
            }
105
106 1
            $arrayCache[$cacheKey] = (int)$queueData['messages_ready'];
107
        }
108
109 1
        return $arrayCache[$cacheKey];
110
    }
111
}
112