Consumer   B
last analyzed

Complexity

Total Complexity 46

Size/Duplication

Total Lines 315
Duplicated Lines 0 %

Test Coverage

Coverage 73.48%

Importance

Changes 7
Bugs 1 Features 2
Metric Value
eloc 117
c 7
b 1
f 2
dl 0
loc 315
ccs 97
cts 132
cp 0.7348
rs 8.72
wmc 46

19 Methods

Rating   Name   Duplication   Size   Complexity  
A getMemoryLimit() 0 3 1
A setMemoryLimit() 0 3 1
A isRamAlmostOverloaded() 0 5 1
A getGracefulMaxExecutionDateTime() 0 3 1
A setGracefulMaxExecutionDateTimeFromSecondsInTheFuture() 0 3 1
A getTimeoutWait() 0 3 1
B handleProcessMessage() 0 21 8
A setLastActivityDateTime() 0 3 1
A purge() 0 3 1
A getGracefulMaxExecutionTimeoutExitCode() 0 3 1
A setGracefulMaxExecutionDateTime() 0 3 1
A getLastActivityDateTime() 0 3 1
A processMessageQueueCallback() 0 48 4
A setGracefulMaxExecutionTimeoutExitCode() 0 3 1
A setTimeoutWait() 0 3 1
A delete() 0 3 1
B chooseWaitTimeout() 0 32 7
A processMessage() 0 3 1
C consume() 0 53 12

How to fix   Complexity   

Complex Class

Complex classes like Consumer often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use Consumer, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use OldSound\RabbitMqBundle\Event\AfterProcessingMessageEvent;
6
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessageEvent;
7
use OldSound\RabbitMqBundle\Event\OnConsumeEvent;
8
use OldSound\RabbitMqBundle\Event\OnIdleEvent;
9
use OldSound\RabbitMqBundle\MemoryChecker\MemoryConsumptionChecker;
10
use OldSound\RabbitMqBundle\MemoryChecker\NativeMemoryUsageProvider;
11
use PhpAmqpLib\Exception\AMQPTimeoutException;
12
use PhpAmqpLib\Message\AMQPMessage;
13
14
class Consumer extends BaseConsumer
15
{
16
    /**
17
     * @var int|null $memoryLimit
18
     */
19
    protected $memoryLimit = null;
20
21
    /**
22
     * @var \DateTime|null DateTime after which the consumer will gracefully exit. "Gracefully" means, that
23
     *      any currently running consumption will not be interrupted.
24
     */
25
    protected $gracefulMaxExecutionDateTime;
26
27
    /**
28
     * @var int Exit code used, when consumer is closed by the Graceful Max Execution Timeout feature.
29
     */
30
    protected $gracefulMaxExecutionTimeoutExitCode = 0;
31
32
    /**
33
     * @var int|null
34
     */
35
    protected $timeoutWait;
36
37
    /**
38
     * @var \DateTime|null
39
     */
40
    protected $lastActivityDateTime;
41
42
    /**
43
     * Set the memory limit
44
     *
45
     * @param int $memoryLimit
46
     */
47
    public function setMemoryLimit($memoryLimit)
48
    {
49
        $this->memoryLimit = $memoryLimit;
50
    }
51
52
    /**
53
     * Get the memory limit
54
     *
55
     * @return int|null
56
     */
57 32
    public function getMemoryLimit()
58
    {
59 32
        return $this->memoryLimit;
60
    }
61
62
    /**
63
     * Consume the message
64
     *
65
     * @param   int     $msgAmount
66
     *
67
     * @return  int
68
     *
69
     * @throws  AMQPTimeoutException
70
     */
71 18
    public function consume($msgAmount)
72
    {
73 18
        $this->target = $msgAmount;
74
75 18
        $this->setupConsumer();
76
77 18
        $this->setLastActivityDateTime(new \DateTime());
78 18
        while ($this->getChannel()->is_consuming()) {
79 18
            $this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent($this));
80 18
            $this->maybeStopConsumer();
81
82
            /*
83
             * Be careful not to trigger ::wait() with 0 or less seconds, when
84
             * graceful max execution timeout is being used.
85
             */
86 18
87 18
            $waitTimeout = $this->chooseWaitTimeout();
88 18
            if ($this->gracefulMaxExecutionDateTime
89
                && $waitTimeout < 1
90 4
            ) {
91
                return $this->gracefulMaxExecutionTimeoutExitCode;
92
            }
93 16
94
            if (!$this->forceStop) {
95 16
                try {
96 4
                    $this->getChannel()->wait(null, false, $waitTimeout);
97 12
                    $this->setLastActivityDateTime(new \DateTime());
98 12
                } catch (AMQPTimeoutException $e) {
99
                    $now = time();
100 12
101 12
                    if ($this->gracefulMaxExecutionDateTime
102
                        && $this->gracefulMaxExecutionDateTime <= new \DateTime("@$now")
103 4
                    ) {
104 8
                        return $this->gracefulMaxExecutionTimeoutExitCode;
105 8
                    } elseif ($this->getIdleTimeout()
106
                        && ($this->getLastActivityDateTime()->getTimestamp() + $this->getIdleTimeout() <= $now)
107 6
                    ) {
108 6
                        $idleEvent = new OnIdleEvent($this);
109
                        $this->dispatchEvent(OnIdleEvent::NAME, $idleEvent);
110 6
111 6
                        if ($idleEvent->isForceStop()) {
112 4
                            if (null !== $this->getIdleTimeoutExitCode()) {
113
                                return $this->getIdleTimeoutExitCode();
114 2
                            } else {
115
                                throw $e;
116
                            }
117
                        }
118
                    }
119
                }
120
            }
121
        }
122 4
123
        return 0;
124
    }
125
126
    /**
127
     * Purge the queue
128
     */
129
    public function purge()
130
    {
131
        $this->getChannel()->queue_purge($this->queueOptions['name'], true);
132
    }
133
134
    /**
135
     * Delete the queue
136
     */
137
    public function delete()
138
    {
139
        $this->getChannel()->queue_delete($this->queueOptions['name'], true);
140
    }
141 32
142
    protected function processMessageQueueCallback(AMQPMessage $msg, $queueName, $callback)
143 32
    {
144 32
        $this->dispatchEvent(
145 32
            BeforeProcessingMessageEvent::NAME,
146
            new BeforeProcessingMessageEvent($this, $msg)
147
        );
148 32
        try {
149 32
            $processFlag = call_user_func($callback, $msg);
150 32
            $this->handleProcessMessage($msg, $processFlag);
151 32
            $this->dispatchEvent(
152 32
                AfterProcessingMessageEvent::NAME,
153
                new AfterProcessingMessageEvent($this, $msg)
154 32
            );
155
            $this->logger->debug('Queue message processed', [
156 32
                'amqp' => [
157 32
                    'queue' => $queueName,
158 32
                    'message' => $msg,
159
                    'return_code' => $processFlag,
160
                ],
161
            ]);
162
        } catch (Exception\StopConsumerException $e) {
163
            $this->logger->info('Consumer requested stop', [
164
                'amqp' => [
165
                    'queue' => $queueName,
166
                    'message' => $msg,
167
                    'stacktrace' => $e->getTraceAsString(),
168
                ],
169
            ]);
170
            $this->handleProcessMessage($msg, $e->getHandleCode());
171
            $this->stopConsuming();
172
        } catch (\Exception $e) {
173
            $this->logger->error($e->getMessage(), [
174
                'amqp' => [
175
                    'queue' => $queueName,
176
                    'message' => $msg,
177
                    'stacktrace' => $e->getTraceAsString(),
178
                ],
179
            ]);
180
            throw $e;
181
        } catch (\Error $e) {
182
            $this->logger->error($e->getMessage(), [
183
                'amqp' => [
184
                    'queue' => $queueName,
185
                    'message' => $msg,
186
                    'stacktrace' => $e->getTraceAsString(),
187
                ],
188
            ]);
189
            throw $e;
190 32
        }
191
    }
192 14
193
    public function processMessage(AMQPMessage $msg)
194 14
    {
195 14
        $this->processMessageQueueCallback($msg, $this->queueOptions['name'], $this->callback);
196
    }
197 32
198
    protected function handleProcessMessage(AMQPMessage $msg, $processFlag)
199 32
    {
200
        if ($processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $processFlag) {
201 10
            // Reject and requeue message to RabbitMQ
202 22
            $msg->reject();
203
        } elseif ($processFlag === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE) {
204
            // NACK and requeue message to RabbitMQ
205 22
            $msg->nack(true);
206
        } elseif ($processFlag === ConsumerInterface::MSG_REJECT) {
207 5
            // Reject and drop
208 17
            $msg->reject(false);
209
        } elseif ($processFlag !== ConsumerInterface::MSG_ACK_SENT) {
210 15
            // Remove message from queue only if callback return not false
211
            $msg->ack();
212
        }
213 32
214 32
        $this->consumed++;
215
        $this->maybeStopConsumer();
216 32
217
        if (!is_null($this->getMemoryLimit()) && $this->isRamAlmostOverloaded()) {
218
            $this->stopConsuming();
219 32
        }
220
    }
221
222
    /**
223
     * Checks if memory in use is greater or equal than memory allowed for this process
224
     *
225
     * @return boolean
226
     */
227
    protected function isRamAlmostOverloaded()
228
    {
229
        $memoryManager = new MemoryConsumptionChecker(new NativeMemoryUsageProvider());
230
231
        return $memoryManager->isRamAlmostOverloaded($this->getMemoryLimit().'M', '5M');
232
    }
233
234
    /**
235
     * @param \DateTime|null $dateTime
236 8
     */
237
    public function setGracefulMaxExecutionDateTime(?\DateTime $dateTime = null)
238 8
    {
239 8
        $this->gracefulMaxExecutionDateTime = $dateTime;
240
    }
241
242
    /**
243
     * @param int $secondsInTheFuture
244 8
     */
245
    public function setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($secondsInTheFuture)
246 8
    {
247 8
        $this->setGracefulMaxExecutionDateTime(new \DateTime("+{$secondsInTheFuture} seconds"));
248
    }
249
250
    /**
251
     * @param int $exitCode
252 2
     */
253
    public function setGracefulMaxExecutionTimeoutExitCode($exitCode)
254 2
    {
255 2
        $this->gracefulMaxExecutionTimeoutExitCode = $exitCode;
256
    }
257 6
258
    public function setTimeoutWait(int $timeoutWait): void
259 6
    {
260 6
        $this->timeoutWait = $timeoutWait;
261
    }
262
263
    /**
264
     * @return \DateTime|null
265 4
     */
266
    public function getGracefulMaxExecutionDateTime()
267 4
    {
268
        return $this->gracefulMaxExecutionDateTime;
269
    }
270
271
    /**
272
     * @return int
273
     */
274
    public function getGracefulMaxExecutionTimeoutExitCode()
275
    {
276
        return $this->gracefulMaxExecutionTimeoutExitCode;
277
    }
278 18
279
    public function getTimeoutWait(): ?int
280 18
    {
281
        return $this->timeoutWait;
282
    }
283
284
    /**
285
     * Choose the timeout wait (in seconds) to use for the $this->getChannel()->wait() method.
286 18
     */
287
    private function chooseWaitTimeout(): int
288 18
    {
289 8
        if ($this->gracefulMaxExecutionDateTime) {
290 8
            $allowedExecutionDateInterval = $this->gracefulMaxExecutionDateTime->diff(new \DateTime());
291 8
            $allowedExecutionSeconds = $allowedExecutionDateInterval->days * 86400
292 8
                + $allowedExecutionDateInterval->h * 3600
293 8
                + $allowedExecutionDateInterval->i * 60
294
                + $allowedExecutionDateInterval->s;
295 8
296 2
            if (!$allowedExecutionDateInterval->invert) {
297
                $allowedExecutionSeconds *= -1;
298
            }
299
300
            /*
301
             * Respect the idle timeout if it's set and if it's less than
302
             * the remaining allowed execution.
303 8
             */
304 8
            if ($this->getIdleTimeout()
305
                && $this->getIdleTimeout() < $allowedExecutionSeconds
306 2
            ) {
307
                $waitTimeout = $this->getIdleTimeout();
308 8
            } else {
309
                $waitTimeout = $allowedExecutionSeconds;
310
            }
311 10
        } else {
312
            $waitTimeout = $this->getIdleTimeout();
313
        }
314 18
315 6
        if (!is_null($this->getTimeoutWait()) && $this->getTimeoutWait() > 0) {
316
            $waitTimeout = min($waitTimeout, $this->getTimeoutWait());
317 18
        }
318
        return $waitTimeout;
319
    }
320 18
321
    public function setLastActivityDateTime(\DateTime $dateTime)
322 18
    {
323 18
        $this->lastActivityDateTime = $dateTime;
324
    }
325 8
326
    protected function getLastActivityDateTime(): ?\DateTime
327 8
    {
328
        return $this->lastActivityDateTime;
329
    }
330
}
331