RabbitMQRequirementListener::handle()   B
last analyzed

Complexity

Conditions 6
Paths 10

Size

Total Lines 71
Code Lines 43

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 32
CRAP Score 7.3329

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 71
ccs 32
cts 48
cp 0.6667
rs 8.5309
cc 6
eloc 43
nc 10
nop 1
crap 7.3329

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace Innmind\ProvisionerBundle\EventListener;
4
5
use Innmind\ProvisionerBundle\ProcessStatusHandler;
6
use Innmind\ProvisionerBundle\RabbitMQ\HistoryInterface;
7
use Innmind\ProvisionerBundle\RabbitMQ\Admin;
8
use Innmind\ProvisionerBundle\Event\ProvisionRequirementEvent;
9
use Innmind\ProvisionerBundle\Math;
10
use Psr\Log\LoggerInterface;
11
use RuntimeException;
12
13
/**
 * Listens to provision requirement events for the "rabbitmq:consumer"
 * command and decides how many additional consumer processes are required,
 * based on the queue depth reported by the RabbitMQ admin and on the depth
 * history stored between two runs.
 */
class RabbitMQRequirementListener
{
    /**
     * @var ProcessStatusHandler Used to count the consumers already running
     */
    protected $processStatus;

    /**
     * @var HistoryInterface Storage for the depths observed on previous runs
     */
    protected $queueHistory;

    /**
     * @var Admin Used to read the current queue depth
     */
    protected $admin;

    /**
     * @var LoggerInterface|null Optional, nothing is logged when not set
     */
    protected $logger;

    /**
     * Set process status handler
     *
     * @param ProcessStatusHandler $handler
     */
    public function setProcessStatusHandler(ProcessStatusHandler $handler)
    {
        $this->processStatus = $handler;
    }

    /**
     * Set the queue history object used to retrieve the previously
     * observed queue depths
     *
     * @param HistoryInterface $history
     */
    public function setQueueHistory(HistoryInterface $history)
    {
        $this->queueHistory = $history;
    }

    /**
     * Set the rabbitmq admin interface
     *
     * @param Admin $admin
     */
    public function setRabbitMQAdmin(Admin $admin)
    {
        $this->admin = $admin;
    }

    /**
     * Set the logger
     *
     * @param LoggerInterface $logger
     */
    public function setLogger(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    /**
     * Compute how many consumers need to be run
     *
     * Events for any command other than "rabbitmq:consumer" are ignored.
     * Otherwise the current depth is appended to the stored history, the
     * requirement is set on the event, the event propagation is stopped
     * and the updated history is stored back.
     *
     * @param ProvisionRequirementEvent $event
     *
     * @throws RuntimeException If the command input has no "messages" option
     */
    public function handle(ProvisionRequirementEvent $event)
    {
        if ($event->getCommandName() !== 'rabbitmq:consumer') {
            return;
        }

        $input = $event->getCommandInput();

        // NOTE(review): this presumably only proves the option exists on the
        // input; its value may still be 0, which computeRequirement() tolerates.
        if (!$input->hasOption('messages')) {
            throw new RuntimeException('Consumers must be run with the "messages" option');
        }

        $command = sprintf('console %s', (string) $input);
        $consumers = $this->processStatus->getProcessCount($command);
        $messages = (int) $input->getOption('messages');
        $queue = $input->getArgument('name');
        $depth = $this->admin->listQueueMessages($queue);

        $historyKey = $this->getHistoryKey($queue);
        $depthHistory = $this->queueHistory->get($historyKey);

        if ($this->logger) {
            $this->logger->info(
                'Estimating required consumers processes',
                [
                    'queue_depth' => $depth,
                    // end() returns false on an empty array, which would read
                    // as a depth in the logs; report "no previous sample" as null
                    'previous_depth' => empty($depthHistory) ? null : end($depthHistory),
                ]
            );
        }

        $depthHistory[] = $depth;
        $previousDepth = 0;

        // With at least two samples, replace the raw depths by the values
        // estimated from the whole history
        if (count($depthHistory) > 1) {
            $estimated = $this->getEstimatedDepth($depthHistory);
            $depth = $estimated['current'];
            $previousDepth = $estimated['previous'];
        }

        $event->setRequiredProcesses(
            $this->computeRequirement($previousDepth, $depth, $messages, $consumers)
        );
        $event->stopPropagation();

        $this->queueHistory->put($historyKey, $depthHistory);

        if ($this->logger) {
            $this->logger->info(sprintf(
                'Required consumers processes estimated to %s',
                $event->getRequiredProcesses()
            ));
        }
    }

    /**
     * Determine how many consumers to launch based on previous
     * messages count and the new one, and the available consumers
     *
     * Rules applied:
     * - previous depth truncates to 0: launch nothing if the current depth
     *   is below what one consumer handles and a consumer is running,
     *   otherwise launch 2;
     * - depth not growing: launch nothing under the same condition as above,
     *   2 if the depth stagnates above one consumer's capacity, otherwise 1;
     * - depth growing: launch one consumer per full batch of $messages in
     *   the growth; when $messages is not positive this can't be computed
     *   and 0 is returned.
     *
     * @param float $previousDepth
     * @param float $currentDepth
     * @param int $messages Messages consumed by one consumer
     * @param int $consumers Running consumers
     *
     * @return int
     */
    public function computeRequirement($previousDepth, $currentDepth, $messages, $consumers)
    {
        $currentDepth = (float) $currentDepth;
        $messages = (int) $messages;

        // A running consumer is able to absorb the whole queue on its own
        $absorbed = $currentDepth < $messages && (int) $consumers >= 1;

        if ((int) $previousDepth === 0) {
            return $absorbed ? 0 : 2;
        }

        $diff = $currentDepth - (float) $previousDepth;

        if ($diff > 0) {
            if ($messages < 1) {
                // Dividing by a non positive capacity is meaningless (and a
                // DivisionByZeroError on PHP 8 when it is 0); 0 is what the
                // division by zero ended up producing on older PHP versions
                return 0;
            }

            return (int) floor($diff / $messages);
        }

        if ($absorbed) {
            return 0;
        }

        // The int cast truncates toward zero, so a decrease smaller than one
        // message is still considered as a stagnation
        if ((int) $diff === 0 && $currentDepth > $messages) {
            return 2;
        }

        return 1;
    }

    /**
     * Return estimated previous and current depth once
     * a linear regression is applied to the real history
     *
     * The regression line is evaluated at the last two indexes of the
     * history (count - 2 and count - 1).
     *
     * @param array $history
     *
     * @return array As ['previous' => float, 'current' => float]
     */
    public function getEstimatedDepth(array $history)
    {
        $regression = Math::linearRegression($history);
        $slope = $regression['slope'];
        $intercept = $regression['intercept'];
        $lastIndex = count($history) - 1;

        return [
            'previous' => ($lastIndex - 1) * $slope + $intercept,
            'current' => $lastIndex * $slope + $intercept,
        ];
    }

    /**
     * Build the key under which the depth history of a queue is stored
     *
     * @param string $queue
     *
     * @return string
     */
    private function getHistoryKey($queue)
    {
        return sprintf('%s.queue_depth', $queue);
    }
}
206