Passed
Push — master ( 5a8d35...554209 )
by Mihai
07:44
created

Consumer::consume()   C

Complexity

Conditions 12
Paths 14

Size

Total Lines 52
Code Lines 29

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 18
CRAP Score 12.8652

Importance

Changes 5
Bugs 0 Features 1
Metric Value
cc 12
eloc 29
c 5
b 0
f 1
nc 14
nop 1
dl 0
loc 52
ccs 18
cts 22
cp 0.8182
crap 12.8652
rs 6.9666

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: moving a coherent part of a long method into its own, well-named method.

1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use OldSound\RabbitMqBundle\Event\AfterProcessingMessageEvent;
6
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessageEvent;
7
use OldSound\RabbitMqBundle\Event\OnConsumeEvent;
8
use OldSound\RabbitMqBundle\Event\OnIdleEvent;
9
use OldSound\RabbitMqBundle\MemoryChecker\MemoryConsumptionChecker;
10
use OldSound\RabbitMqBundle\MemoryChecker\NativeMemoryUsageProvider;
11
use PhpAmqpLib\Exception\AMQPTimeoutException;
12
use PhpAmqpLib\Message\AMQPMessage;
13
14
class Consumer extends BaseConsumer
15
{
16
    /**
     * @var int|null Memory limit; an "M" suffix is appended to it when it is
     *      handed to the memory consumption checker. Null disables the check.
     */
    protected $memoryLimit = null;

    /**
     * @var \DateTime|null Point in time after which the consumer exits gracefully,
     *      i.e. without interrupting a consumption that is currently running.
     */
    protected $gracefulMaxExecutionDateTime;

    /**
     * @var int Exit code returned when the consumer stops because of the
     *      graceful max execution timeout.
     */
    protected $gracefulMaxExecutionTimeoutExitCode = 0;

    /**
     * @var int|null Upper bound (in seconds) applied to the chosen wait timeout
     *      when it is set and greater than 0.
     */
    protected $timeoutWait;

    /**
     * @var \DateTime|null Set when consume() starts and again after every wait()
     *      call that returns without timing out.
     */
    protected $lastActivityDateTime;
41
42
    /**
     * Store the memory limit that handleProcessMessage() checks after each message.
     *
     * @param int $memoryLimit Limit value; an "M" suffix is appended when it is checked.
     */
    public function setMemoryLimit($memoryLimit)
    {
        $this->memoryLimit = $memoryLimit;
    }
51
52 32
    /**
     * Return the configured memory limit.
     *
     * @return int|null Null when no limit has been set.
     */
    public function getMemoryLimit()
    {
        return $this->memoryLimit;
    }
61
62
    /**
     * Run the consume loop for as long as the channel has callbacks registered,
     * or until one of the exit conditions below returns an exit code.
     *
     * @param   int     $msgAmount  Stored in $this->target before the consumer is set up.
     *
     * @return  int     0 when the loop ends because no callbacks remain; otherwise the
     *                  graceful max execution exit code or the idle timeout exit code.
     *
     * @throws  AMQPTimeoutException  Rethrown when an idle event requests a forced stop
     *                                and no idle timeout exit code is configured.
     */
    public function consume($msgAmount)
    {
        $this->target = $msgAmount;
        $this->setupConsumer();
        $this->setLastActivityDateTime(new \DateTime());

        while (count($this->getChannel()->callbacks)) {
            $this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent($this));
            $this->maybeStopConsumer();

            /*
             * While a graceful max execution date is in use, ::wait() must never
             * be called with a timeout of 0 or less; exit instead.
             */
            $waitTimeout = $this->chooseWaitTimeout();
            if ($this->gracefulMaxExecutionDateTime && $waitTimeout < 1) {
                return $this->gracefulMaxExecutionTimeoutExitCode;
            }

            if ($this->forceStop) {
                continue;
            }

            try {
                $this->getChannel()->wait(null, false, $waitTimeout);
                $this->setLastActivityDateTime(new \DateTime());
            } catch (AMQPTimeoutException $timeout) {
                $now = time();

                // The graceful deadline takes precedence over the idle timeout.
                if ($this->gracefulMaxExecutionDateTime
                    && $this->gracefulMaxExecutionDateTime <= new \DateTime("@$now")
                ) {
                    return $this->gracefulMaxExecutionTimeoutExitCode;
                }

                $idleExpired = $this->getIdleTimeout()
                    && ($this->getLastActivityDateTime()->getTimestamp() + $this->getIdleTimeout() <= $now);
                if (!$idleExpired) {
                    continue;
                }

                // Listeners of the idle event decide whether the consumer has to stop.
                $idleEvent = new OnIdleEvent($this);
                $this->dispatchEvent(OnIdleEvent::NAME, $idleEvent);
                if (!$idleEvent->isForceStop()) {
                    continue;
                }

                if (null !== $this->getIdleTimeoutExitCode()) {
                    return $this->getIdleTimeoutExitCode();
                }

                throw $timeout;
            }
        }

        return 0;
    }
124
125
    /**
     * Purge the queue named in $this->queueOptions['name'] through the channel.
     */
    public function purge()
    {
        $channel = $this->getChannel();
        $channel->queue_purge($this->queueOptions['name'], true);
    }
132
133 32
    /**
     * Delete the queue named in $this->queueOptions['name'] through the channel.
     */
    public function delete()
    {
        $channel = $this->getChannel();
        $channel->queue_delete($this->queueOptions['name'], true);
    }
140
141 32
    /**
     * Run a callback for one message, dispatching the before/after processing
     * events around it and logging the outcome.
     *
     * A StopConsumerException is logged, its handle code is applied to the
     * message and consuming is stopped. Every other exception or error is
     * logged and rethrown.
     *
     * @param AMQPMessage $msg       Message handed to the callback.
     * @param string      $queueName Queue name; used here only as log context.
     * @param callable    $callback  Called with $msg; its result is passed to handleProcessMessage().
     *
     * @throws \Throwable Anything thrown other than a StopConsumerException.
     */
    protected function processMessageQueueCallback(AMQPMessage $msg, $queueName, $callback)
    {
        $this->dispatchEvent(
            BeforeProcessingMessageEvent::NAME,
            new BeforeProcessingMessageEvent($this, $msg)
        );

        // Log context shared by every outcome; each branch appends its own entry.
        $amqpContext = ['queue' => $queueName, 'message' => $msg];

        try {
            $processFlag = call_user_func($callback, $msg);
            $this->handleProcessMessage($msg, $processFlag);
            $this->dispatchEvent(
                AfterProcessingMessageEvent::NAME,
                new AfterProcessingMessageEvent($this, $msg)
            );
            $this->logger->debug('Queue message processed', [
                'amqp' => $amqpContext + ['return_code' => $processFlag],
            ]);
        } catch (Exception\StopConsumerException $stop) {
            $this->logger->info('Consumer requested restart', [
                'amqp' => $amqpContext + ['stacktrace' => $stop->getTraceAsString()],
            ]);
            $this->handleProcessMessage($msg, $stop->getHandleCode());
            $this->stopConsuming();
        } catch (\Throwable $failure) {
            // Covers both \Exception and \Error, which are handled identically.
            $this->logger->error($failure->getMessage(), [
                'amqp' => $amqpContext + ['stacktrace' => $failure->getTraceAsString()],
            ]);

            throw $failure;
        }
    }
190 22
191
    /**
     * Process a message with this consumer's own queue name and callback.
     *
     * @param AMQPMessage $msg
     */
    public function processMessage(AMQPMessage $msg)
    {
        $queueName = $this->queueOptions['name'];

        $this->processMessageQueueCallback($msg, $queueName, $this->callback);
    }
195 15
196
    /**
     * Answer the broker for a processed message according to the callback's
     * result, then count the message and evaluate the stop conditions.
     *
     * The channel and delivery tag are read through AMQPMessage::getChannel()
     * and AMQPMessage::getDeliveryTag() instead of the deprecated
     * AMQPMessage::$delivery_info property.
     *
     * @param AMQPMessage $msg         Message that was handed to the callback.
     * @param mixed       $processFlag One of the ConsumerInterface::MSG_* values, or false
     *                                 (treated like MSG_REJECT_REQUEUE). Any other value
     *                                 acknowledges the message.
     */
    protected function handleProcessMessage(AMQPMessage $msg, $processFlag)
    {
        if ($processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $processFlag) {
            // Reject and requeue message to RabbitMQ
            $msg->getChannel()->basic_reject($msg->getDeliveryTag(), true);
        } elseif ($processFlag === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE) {
            // NACK and requeue message to RabbitMQ
            $msg->getChannel()->basic_nack($msg->getDeliveryTag(), false, true);
        } elseif ($processFlag === ConsumerInterface::MSG_REJECT) {
            // Reject and drop
            $msg->getChannel()->basic_reject($msg->getDeliveryTag(), false);
        } elseif ($processFlag !== ConsumerInterface::MSG_ACK_SENT) {
            // Acknowledge for every remaining result; MSG_ACK_SENT skips the
            // acknowledgement here.
            $msg->getChannel()->basic_ack($msg->getDeliveryTag());
        }

        $this->consumed++;
        $this->maybeStopConsumer();

        // The memory check only runs when a limit has been configured.
        if (null !== $this->getMemoryLimit() && $this->isRamAlmostOverloaded()) {
            $this->stopConsuming();
        }
    }
219
220
    /**
     * Ask a MemoryConsumptionChecker whether memory usage is close to the limit.
     *
     * The configured limit is passed with an "M" suffix, together with '5M' as
     * the second argument.
     *
     * @return boolean
     */
    protected function isRamAlmostOverloaded()
    {
        $usageProvider = new NativeMemoryUsageProvider();
        $checker = new MemoryConsumptionChecker($usageProvider);

        return $checker->isRamAlmostOverloaded($this->getMemoryLimit().'M', '5M');
    }
231 4
232 4
    /**
     * Set the date after which the consumer exits gracefully.
     *
     * The parameter is declared explicitly nullable; relying on the "= null"
     * default to make a typed parameter nullable is deprecated as of PHP 8.4.
     *
     * @param \DateTime|null $dateTime Null removes the graceful max execution date.
     */
    public function setGracefulMaxExecutionDateTime(?\DateTime $dateTime = null)
    {
        $this->gracefulMaxExecutionDateTime = $dateTime;
    }
239 2
240 2
    /**
     * Set the graceful max execution date to the given number of seconds from now.
     *
     * @param int $secondsInTheFuture Interpolated into a "+N seconds" relative date string.
     */
    public function setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($secondsInTheFuture)
    {
        $deadline = new \DateTime("+{$secondsInTheFuture} seconds");

        $this->setGracefulMaxExecutionDateTime($deadline);
    }
247
248
    /**
     * Set the exit code that consume() returns when it stops because of the
     * graceful max execution timeout.
     *
     * @param int $exitCode
     */
    public function setGracefulMaxExecutionTimeoutExitCode($exitCode)
    {
        $this->gracefulMaxExecutionTimeoutExitCode = $exitCode;
    }
255
256
    /**
     * Set the timeout wait, in seconds. Values greater than 0 cap the timeout
     * chosen for the channel's wait() call.
     */
    public function setTimeoutWait(int $timeoutWait): void
    {
        $this->timeoutWait = $timeoutWait;
    }
260
261
    /**
     * Return the date after which the consumer exits gracefully.
     *
     * @return \DateTime|null Null when no graceful max execution date is set.
     */
    public function getGracefulMaxExecutionDateTime()
    {
        return $this->gracefulMaxExecutionDateTime;
    }
268
269 10
    /**
     * Return the exit code used for the graceful max execution timeout.
     *
     * @return int
     */
    public function getGracefulMaxExecutionTimeoutExitCode()
    {
        return $this->gracefulMaxExecutionTimeoutExitCode;
    }
276 4
277 2
    /**
     * Return the timeout wait in seconds, or null when none has been set.
     */
    public function getTimeoutWait(): ?int
    {
        return $this->timeoutWait;
    }
281
282
    /**
     * Choose the timeout (in seconds) to pass to $this->getChannel()->wait().
     *
     * Without a graceful max execution date the idle timeout is used. With one,
     * the result is the number of seconds left until that date (negative once
     * it has passed), or the idle timeout when that is set and smaller. A
     * timeout wait greater than 0 then caps the result.
     */
    private function chooseWaitTimeout(): int
    {
        if (!$this->gracefulMaxExecutionDateTime) {
            $waitTimeout = $this->getIdleTimeout();
        } else {
            $remaining = $this->gracefulMaxExecutionDateTime->diff(new \DateTime());
            $secondsLeft = $remaining->s
                + $remaining->i * 60
                + $remaining->h * 3600
                + $remaining->days * 86400;

            // The interval is only inverted while the deadline is still ahead of
            // "now"; otherwise the deadline has passed and the value is negative.
            if (!$remaining->invert) {
                $secondsLeft = -$secondsLeft;
            }

            /*
             * Prefer the idle timeout when it is set and shorter than the
             * remaining allowed execution time.
             */
            $waitTimeout = ($this->getIdleTimeout() && $this->getIdleTimeout() < $secondsLeft)
                ? $this->getIdleTimeout()
                : $secondsLeft;
        }

        // NOTE(review): when $waitTimeout is 0 at this point, min() keeps it at 0
        // and the timeout wait has no effect — confirm this is intended.
        $timeoutWait = $this->getTimeoutWait();
        if (null !== $timeoutWait && $timeoutWait > 0) {
            $waitTimeout = min($waitTimeout, $timeoutWait);
        }

        return $waitTimeout;
    }
318
319
    /**
     * Record the time of the last activity; consume() compares it against the
     * idle timeout.
     *
     * @param \DateTime $dateTime
     */
    public function setLastActivityDateTime(\DateTime $dateTime)
    {
        $this->lastActivityDateTime = $dateTime;
    }
323
324
    /**
     * Return the time of the last recorded activity, or null when none has
     * been recorded yet.
     */
    protected function getLastActivityDateTime(): ?\DateTime
    {
        return $this->lastActivityDateTime;
    }
328
}
329