Consumer   B
last analyzed

Complexity

Total Complexity 46

Size/Duplication

Total Lines 313
Duplicated Lines 0 %

Test Coverage

Coverage 73.28%

Importance

Changes 17
Bugs 0 Features 2
Metric Value
eloc 116
dl 0
loc 313
ccs 96
cts 131
cp 0.7328
rs 8.72
c 17
b 0
f 2
wmc 46

19 Methods

Rating   Name   Duplication   Size   Complexity  
A isRamAlmostOverloaded() 0 5 1
A processMessage() 0 3 1
A getMemoryLimit() 0 3 1
C consume() 0 52 12
A setGracefulMaxExecutionDateTimeFromSecondsInTheFuture() 0 3 1
B handleProcessMessage() 0 21 8
A purge() 0 3 1
A setGracefulMaxExecutionDateTime() 0 3 1
A processMessageQueueCallback() 0 47 4
A setMemoryLimit() 0 3 1
A setGracefulMaxExecutionTimeoutExitCode() 0 3 1
A delete() 0 3 1
A getGracefulMaxExecutionDateTime() 0 3 1
A getTimeoutWait() 0 3 1
A setLastActivityDateTime() 0 3 1
A getGracefulMaxExecutionTimeoutExitCode() 0 3 1
A getLastActivityDateTime() 0 3 1
A setTimeoutWait() 0 3 1
B chooseWaitTimeout() 0 32 7

How to fix   Complexity   

Complex Class

Complex classes like Consumer often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use Consumer, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use OldSound\RabbitMqBundle\Event\AfterProcessingMessageEvent;
6
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessageEvent;
7
use OldSound\RabbitMqBundle\Event\OnConsumeEvent;
8
use OldSound\RabbitMqBundle\Event\OnIdleEvent;
9
use OldSound\RabbitMqBundle\MemoryChecker\MemoryConsumptionChecker;
10
use OldSound\RabbitMqBundle\MemoryChecker\NativeMemoryUsageProvider;
11
use PhpAmqpLib\Exception\AMQPTimeoutException;
12
use PhpAmqpLib\Message\AMQPMessage;
13
14
class Consumer extends BaseConsumer
15
{
16
    /**
17
     * @var int|null $memoryLimit
18
     */
19
    protected $memoryLimit = null;
20
21
    /**
22
     * @var \DateTime|null DateTime after which the consumer will gracefully exit. "Gracefully" means, that
23
     *      any currently running consumption will not be interrupted.
24
     */
25
    protected $gracefulMaxExecutionDateTime;
26
27
    /**
28
     * @var int Exit code used, when consumer is closed by the Graceful Max Execution Timeout feature.
29
     */
30
    protected $gracefulMaxExecutionTimeoutExitCode = 0;
31
32
    /**
33
     * @var int|null
34
     */
35
    protected $timeoutWait;
36
37
    /**
38
     * @var \DateTime|null
39
     */
40
    protected $lastActivityDateTime;
41
42
    /**
43
     * Set the memory limit
44
     *
45
     * @param int $memoryLimit
46
     */
47
    public function setMemoryLimit($memoryLimit)
48
    {
49
        $this->memoryLimit = $memoryLimit;
50
    }
51
52
    /**
53
     * Get the memory limit
54
     *
55
     * @return int|null
56
     */
57 32
    public function getMemoryLimit()
58
    {
59 32
        return $this->memoryLimit;
60
    }
61
62
    /**
63
     * Consume the message
64
     *
65
     * @param   int     $msgAmount
66
     *
67
     * @return  int
68
     *
69
     * @throws  AMQPTimeoutException
70
     */
71 18
    public function consume($msgAmount)
72
    {
73 18
        $this->target = $msgAmount;
74
75 18
        $this->setupConsumer();
76
77 18
        $this->setLastActivityDateTime(new \DateTime());
78 18
        while (count($this->getChannel()->callbacks)) {
79 16
            $this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent($this));
80 16
            $this->maybeStopConsumer();
81
82
            /*
83
             * Be careful not to trigger ::wait() with 0 or less seconds, when
84
             * graceful max execution timeout is being used.
85
             */
86 16
            $waitTimeout = $this->chooseWaitTimeout();
87 16
            if ($this->gracefulMaxExecutionDateTime
88 16
                && $waitTimeout < 1
89
            ) {
90 4
                return $this->gracefulMaxExecutionTimeoutExitCode;
91
            }
92
93 14
            if (!$this->forceStop) {
94
                try {
95 14
                    $this->getChannel()->wait(null, false, $waitTimeout);
96 2
                    $this->setLastActivityDateTime(new \DateTime());
97 12
                } catch (AMQPTimeoutException $e) {
98 12
                    $now = time();
99
100 12
                    if ($this->gracefulMaxExecutionDateTime
101 12
                        && $this->gracefulMaxExecutionDateTime <= new \DateTime("@$now")
102
                    ) {
103 4
                        return $this->gracefulMaxExecutionTimeoutExitCode;
104 8
                    } elseif ($this->getIdleTimeout()
105 8
                        && ($this->getLastActivityDateTime()->getTimestamp() + $this->getIdleTimeout() <= $now)
106
                    ) {
107 6
                        $idleEvent = new OnIdleEvent($this);
108 6
                        $this->dispatchEvent(OnIdleEvent::NAME, $idleEvent);
109
110 6
                        if ($idleEvent->isForceStop()) {
111 6
                            if (null !== $this->getIdleTimeoutExitCode()) {
112 4
                                return $this->getIdleTimeoutExitCode();
113
                            } else {
114 2
                                throw $e;
115
                            }
116
                        }
117
                    }
118
                }
119
            }
120
        }
121
122 4
        return 0;
123
    }
124
125
    /**
126
     * Purge the queue
127
     */
128
    public function purge()
129
    {
130
        $this->getChannel()->queue_purge($this->queueOptions['name'], true);
131
    }
132
133
    /**
134
     * Delete the queue
135
     */
136
    public function delete()
137
    {
138
        $this->getChannel()->queue_delete($this->queueOptions['name'], true);
139
    }
140
141 32
    protected function processMessageQueueCallback(AMQPMessage $msg, $queueName, $callback)
142
    {
143 32
        $this->dispatchEvent(BeforeProcessingMessageEvent::NAME,
144 32
            new BeforeProcessingMessageEvent($this, $msg)
145
        );
146
        try {
147 32
            $processFlag = call_user_func($callback, $msg);
148 32
            $this->handleProcessMessage($msg, $processFlag);
149 32
            $this->dispatchEvent(
150 32
                AfterProcessingMessageEvent::NAME,
151 32
                new AfterProcessingMessageEvent($this, $msg)
152
            );
153 32
            $this->logger->debug('Queue message processed', array(
154
                'amqp' => array(
155 32
                    'queue' => $queueName,
156 32
                    'message' => $msg,
157 32
                    'return_code' => $processFlag
158
                )
159
            ));
160
        } catch (Exception\StopConsumerException $e) {
161
            $this->logger->info('Consumer requested restart', array(
162
                'amqp' => array(
163
                    'queue' => $queueName,
164
                    'message' => $msg,
165
                    'stacktrace' => $e->getTraceAsString()
166
                )
167
            ));
168
            $this->handleProcessMessage($msg, $e->getHandleCode());
169
            $this->stopConsuming();
170
        } catch (\Exception $e) {
171
            $this->logger->error($e->getMessage(), array(
172
                'amqp' => array(
173
                    'queue' => $queueName,
174
                    'message' => $msg,
175
                    'stacktrace' => $e->getTraceAsString()
176
                )
177
            ));
178
            throw $e;
179
        } catch (\Error $e) {
180
            $this->logger->error($e->getMessage(), array(
181
                'amqp' => array(
182
                    'queue' => $queueName,
183
                    'message' => $msg,
184
                    'stacktrace' => $e->getTraceAsString()
185
                )
186
            ));
187
            throw $e;
188
        }
189 32
    }
190
191 14
    public function processMessage(AMQPMessage $msg)
192
    {
193 14
        $this->processMessageQueueCallback($msg, $this->queueOptions['name'], $this->callback);
194 14
    }
195
196 32
    protected function handleProcessMessage(AMQPMessage $msg, $processFlag)
197
    {
198 32
        if ($processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $processFlag) {
199
            // Reject and requeue message to RabbitMQ
200 10
            $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], true);
0 ignored issues
show
Deprecated Code introduced by
The property PhpAmqpLib\Message\AMQPMessage::$delivery_info has been deprecated. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

200
            /** @scrutinizer ignore-deprecated */ $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], true);
Loading history...
201 22
        } else if ($processFlag === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE) {
202
            // NACK and requeue message to RabbitMQ
203
            $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], false, true);
0 ignored issues
show
Deprecated Code introduced by
The property PhpAmqpLib\Message\AMQPMessage::$delivery_info has been deprecated. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

203
            /** @scrutinizer ignore-deprecated */ $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], false, true);
Loading history...
204 22
        } else if ($processFlag === ConsumerInterface::MSG_REJECT) {
205
            // Reject and drop
206 5
            $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], false);
0 ignored issues
show
Deprecated Code introduced by
The property PhpAmqpLib\Message\AMQPMessage::$delivery_info has been deprecated. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

206
            /** @scrutinizer ignore-deprecated */ $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], false);
Loading history...
207 17
        } else if ($processFlag !== ConsumerInterface::MSG_ACK_SENT) {
208
            // Remove message from queue only if callback return not false
209 15
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
0 ignored issues
show
Deprecated Code introduced by
The property PhpAmqpLib\Message\AMQPMessage::$delivery_info has been deprecated. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-deprecated  annotation

209
            $msg->delivery_info['channel']->basic_ack(/** @scrutinizer ignore-deprecated */ $msg->delivery_info['delivery_tag']);
Loading history...
210
        }
211
212 32
        $this->consumed++;
213 32
        $this->maybeStopConsumer();
214
215 32
        if (!is_null($this->getMemoryLimit()) && $this->isRamAlmostOverloaded()) {
216
            $this->stopConsuming();
217
        }
218 32
    }
219
220
    /**
221
     * Checks if memory in use is greater or equal than memory allowed for this process
222
     *
223
     * @return boolean
224
     */
225
    protected function isRamAlmostOverloaded()
226
    {
227
        $memoryManager = new MemoryConsumptionChecker(new NativeMemoryUsageProvider());
228
229
        return $memoryManager->isRamAlmostOverloaded($this->getMemoryLimit().'M', '5M');
230
    }
231
232
    /**
233
     * @param \DateTime|null $dateTime
234
     */
235 8
    public function setGracefulMaxExecutionDateTime(\DateTime $dateTime = null)
236
    {
237 8
        $this->gracefulMaxExecutionDateTime = $dateTime;
238 8
    }
239
240
    /**
241
     * @param int $secondsInTheFuture
242
     */
243 8
    public function setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($secondsInTheFuture)
244
    {
245 8
        $this->setGracefulMaxExecutionDateTime(new \DateTime("+{$secondsInTheFuture} seconds"));
246 8
    }
247
248
    /**
249
     * @param int $exitCode
250
     */
251 2
    public function setGracefulMaxExecutionTimeoutExitCode($exitCode)
252
    {
253 2
        $this->gracefulMaxExecutionTimeoutExitCode = $exitCode;
254 2
    }
255
256 6
    public function setTimeoutWait(int $timeoutWait): void
257
    {
258 6
        $this->timeoutWait = $timeoutWait;
259 6
    }
260
261
    /**
262
     * @return \DateTime|null
263
     */
264 4
    public function getGracefulMaxExecutionDateTime()
265
    {
266 4
        return $this->gracefulMaxExecutionDateTime;
267
    }
268
269
    /**
270
     * @return int
271
     */
272
    public function getGracefulMaxExecutionTimeoutExitCode()
273
    {
274
        return $this->gracefulMaxExecutionTimeoutExitCode;
275
    }
276
277 16
    public function getTimeoutWait(): ?int
278
    {
279 16
        return $this->timeoutWait;
280
    }
281
282
    /**
283
     * Choose the timeout wait (in seconds) to use for the $this->getChannel()->wait() method.
284
     */
285 16
    private function chooseWaitTimeout(): int
286
    {
287 16
        if ($this->gracefulMaxExecutionDateTime) {
288 8
            $allowedExecutionDateInterval = $this->gracefulMaxExecutionDateTime->diff(new \DateTime());
289 8
            $allowedExecutionSeconds = $allowedExecutionDateInterval->days * 86400
290 8
                + $allowedExecutionDateInterval->h * 3600
291 8
                + $allowedExecutionDateInterval->i * 60
292 8
                + $allowedExecutionDateInterval->s;
293
294 8
            if (!$allowedExecutionDateInterval->invert) {
295 2
                $allowedExecutionSeconds *= -1;
296
            }
297
298
            /*
299
             * Respect the idle timeout if it's set and if it's less than
300
             * the remaining allowed execution.
301
             */
302 8
            if ($this->getIdleTimeout()
303 8
                && $this->getIdleTimeout() < $allowedExecutionSeconds
304
            ) {
305 2
                $waitTimeout = $this->getIdleTimeout();
306
            } else {
307 8
                $waitTimeout = $allowedExecutionSeconds;
308
            }
309
        } else {
310 8
            $waitTimeout = $this->getIdleTimeout();
311
        }
312
313 16
        if (!is_null($this->getTimeoutWait()) && $this->getTimeoutWait() > 0) {
314 6
            $waitTimeout = min($waitTimeout, $this->getTimeoutWait());
315
        }
316 16
        return $waitTimeout;
317
    }
318
319 18
    public function setLastActivityDateTime(\DateTime $dateTime)
320
    {
321 18
        $this->lastActivityDateTime = $dateTime;
322 18
    }
323
324 8
    protected function getLastActivityDateTime(): ?\DateTime
325
    {
326 8
        return $this->lastActivityDateTime;
327
    }
328
}
329