Passed
Push — master ( 66d6c5...fa391e )
by Mihai
18:37 queued 16:26
created

Consumer::setLastActivityDateTime()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 1
c 0
b 0
f 0
nc 1
nop 1
dl 0
loc 3
ccs 2
cts 2
cp 1
crap 1
rs 10
1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use OldSound\RabbitMqBundle\Event\AfterProcessingMessageEvent;
6
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessageEvent;
7
use OldSound\RabbitMqBundle\Event\OnConsumeEvent;
8
use OldSound\RabbitMqBundle\Event\OnIdleEvent;
9
use OldSound\RabbitMqBundle\MemoryChecker\MemoryConsumptionChecker;
10
use OldSound\RabbitMqBundle\MemoryChecker\NativeMemoryUsageProvider;
11
use PhpAmqpLib\Exception\AMQPTimeoutException;
12
use PhpAmqpLib\Message\AMQPMessage;
13
14
class Consumer extends BaseConsumer
15
{
16
    /**
17
     * @var int|null $memoryLimit
18
     */
19
    protected $memoryLimit = null;
20
21
    /**
22
     * @var \DateTime|null DateTime after which the consumer will gracefully exit. "Gracefully" means, that
23
     *      any currently running consumption will not be interrupted.
24
     */
25
    protected $gracefulMaxExecutionDateTime;
26
27
    /**
28
     * @var int Exit code used, when consumer is closed by the Graceful Max Execution Timeout feature.
29
     */
30
    protected $gracefulMaxExecutionTimeoutExitCode = 0;
31
32
    /**
33
     * @var int|null
34
     */
35
    protected $timeoutWait;
36
37
    /**
38
     * @var \DateTime|null
39
     */
40
    protected $lastActivityDateTime;
41
42
    /**
43
     * Set the memory limit
44
     *
45
     * @param int $memoryLimit
46
     */
47
    public function setMemoryLimit($memoryLimit)
48
    {
49
        $this->memoryLimit = $memoryLimit;
50
    }
51
52
    /**
53
     * Get the memory limit
54
     *
55
     * @return int|null
56
     */
57 32
    public function getMemoryLimit()
58
    {
59 32
        return $this->memoryLimit;
60
    }
61
62
    /**
63
     * Consume the message
64
     *
65
     * @param   int     $msgAmount
66
     *
67
     * @return  int
68
     *
69
     * @throws  AMQPTimeoutException
70
     */
71 18
    public function consume($msgAmount)
72
    {
73 18
        $this->target = $msgAmount;
74
75 18
        $this->setupConsumer();
76
77 18
        $this->setLastActivityDateTime(new \DateTime());
78 18
        while ($this->getChannel()->is_consuming()) {
79 18
            $this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent($this));
80 18
            $this->maybeStopConsumer();
81
82
            /*
83
             * Be careful not to trigger ::wait() with 0 or less seconds, when
84
             * graceful max execution timeout is being used.
85
             */
86 18
            $waitTimeout = $this->chooseWaitTimeout();
87 18
            if ($this->gracefulMaxExecutionDateTime
88 18
                && $waitTimeout < 1
89
            ) {
90 4
                return $this->gracefulMaxExecutionTimeoutExitCode;
91
            }
92
93 16
            if (!$this->forceStop) {
94
                try {
95 16
                    $this->getChannel()->wait(null, false, $waitTimeout);
96 4
                    $this->setLastActivityDateTime(new \DateTime());
97 12
                } catch (AMQPTimeoutException $e) {
98 12
                    $now = time();
99
100 12
                    if ($this->gracefulMaxExecutionDateTime
101 12
                        && $this->gracefulMaxExecutionDateTime <= new \DateTime("@$now")
102
                    ) {
103 4
                        return $this->gracefulMaxExecutionTimeoutExitCode;
104 8
                    } elseif ($this->getIdleTimeout()
105 8
                        && ($this->getLastActivityDateTime()->getTimestamp() + $this->getIdleTimeout() <= $now)
106
                    ) {
107 6
                        $idleEvent = new OnIdleEvent($this);
108 6
                        $this->dispatchEvent(OnIdleEvent::NAME, $idleEvent);
109
110 6
                        if ($idleEvent->isForceStop()) {
111 6
                            if (null !== $this->getIdleTimeoutExitCode()) {
112 4
                                return $this->getIdleTimeoutExitCode();
113
                            } else {
114 2
                                throw $e;
115
                            }
116
                        }
117
                    }
118
                }
119
            }
120
        }
121
122 4
        return 0;
123
    }
124
125
    /**
126
     * Purge the queue
127
     */
128
    public function purge()
129
    {
130
        $this->getChannel()->queue_purge($this->queueOptions['name'], true);
131
    }
132
133
    /**
134
     * Delete the queue
135
     */
136
    public function delete()
137
    {
138
        $this->getChannel()->queue_delete($this->queueOptions['name'], true);
139
    }
140
141 32
    protected function processMessageQueueCallback(AMQPMessage $msg, $queueName, $callback)
142
    {
143 32
        $this->dispatchEvent(
144 32
            BeforeProcessingMessageEvent::NAME,
145 32
            new BeforeProcessingMessageEvent($this, $msg)
146
        );
147
        try {
148 32
            $processFlag = call_user_func($callback, $msg);
149 32
            $this->handleProcessMessage($msg, $processFlag);
150 32
            $this->dispatchEvent(
151 32
                AfterProcessingMessageEvent::NAME,
152 32
                new AfterProcessingMessageEvent($this, $msg)
153
            );
154 32
            $this->logger->debug('Queue message processed', array(
155
                'amqp' => array(
156 32
                    'queue' => $queueName,
157 32
                    'message' => $msg,
158 32
                    'return_code' => $processFlag
159
                )
160
            ));
161
        } catch (Exception\StopConsumerException $e) {
162
            $this->logger->info('Consumer requested stop', array(
163
                'amqp' => array(
164
                    'queue' => $queueName,
165
                    'message' => $msg,
166
                    'stacktrace' => $e->getTraceAsString()
167
                )
168
            ));
169
            $this->handleProcessMessage($msg, $e->getHandleCode());
170
            $this->stopConsuming();
171
        } catch (\Exception $e) {
172
            $this->logger->error($e->getMessage(), array(
173
                'amqp' => array(
174
                    'queue' => $queueName,
175
                    'message' => $msg,
176
                    'stacktrace' => $e->getTraceAsString()
177
                )
178
            ));
179
            throw $e;
180
        } catch (\Error $e) {
181
            $this->logger->error($e->getMessage(), array(
182
                'amqp' => array(
183
                    'queue' => $queueName,
184
                    'message' => $msg,
185
                    'stacktrace' => $e->getTraceAsString()
186
                )
187
            ));
188
            throw $e;
189
        }
190 32
    }
191
192 14
    public function processMessage(AMQPMessage $msg)
193
    {
194 14
        $this->processMessageQueueCallback($msg, $this->queueOptions['name'], $this->callback);
195 14
    }
196
197 32
    protected function handleProcessMessage(AMQPMessage $msg, $processFlag)
198
    {
199 32
        if ($processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $processFlag) {
200
            // Reject and requeue message to RabbitMQ
201 10
            $msg->reject();
202 22
        } elseif ($processFlag === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE) {
203
            // NACK and requeue message to RabbitMQ
204
            $msg->nack(true);
205 22
        } elseif ($processFlag === ConsumerInterface::MSG_REJECT) {
206
            // Reject and drop
207 5
            $msg->reject(false);
208 17
        } elseif ($processFlag !== ConsumerInterface::MSG_ACK_SENT) {
209
            // Remove message from queue only if callback return not false
210 15
            $msg->ack();
211
        }
212
213 32
        $this->consumed++;
214 32
        $this->maybeStopConsumer();
215
216 32
        if (!is_null($this->getMemoryLimit()) && $this->isRamAlmostOverloaded()) {
217
            $this->stopConsuming();
218
        }
219 32
    }
220
221
    /**
222
     * Checks if memory in use is greater or equal than memory allowed for this process
223
     *
224
     * @return boolean
225
     */
226
    protected function isRamAlmostOverloaded()
227
    {
228
        $memoryManager = new MemoryConsumptionChecker(new NativeMemoryUsageProvider());
229
230
        return $memoryManager->isRamAlmostOverloaded($this->getMemoryLimit().'M', '5M');
231
    }
232
233
    /**
234
     * @param \DateTime|null $dateTime
235
     */
236 8
    public function setGracefulMaxExecutionDateTime(\DateTime $dateTime = null)
237
    {
238 8
        $this->gracefulMaxExecutionDateTime = $dateTime;
239 8
    }
240
241
    /**
242
     * @param int $secondsInTheFuture
243
     */
244 8
    public function setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($secondsInTheFuture)
245
    {
246 8
        $this->setGracefulMaxExecutionDateTime(new \DateTime("+{$secondsInTheFuture} seconds"));
247 8
    }
248
249
    /**
250
     * @param int $exitCode
251
     */
252 2
    public function setGracefulMaxExecutionTimeoutExitCode($exitCode)
253
    {
254 2
        $this->gracefulMaxExecutionTimeoutExitCode = $exitCode;
255 2
    }
256
257 6
    public function setTimeoutWait(int $timeoutWait): void
258
    {
259 6
        $this->timeoutWait = $timeoutWait;
260 6
    }
261
262
    /**
263
     * @return \DateTime|null
264
     */
265 4
    public function getGracefulMaxExecutionDateTime()
266
    {
267 4
        return $this->gracefulMaxExecutionDateTime;
268
    }
269
270
    /**
271
     * @return int
272
     */
273
    public function getGracefulMaxExecutionTimeoutExitCode()
274
    {
275
        return $this->gracefulMaxExecutionTimeoutExitCode;
276
    }
277
278 18
    public function getTimeoutWait(): ?int
279
    {
280 18
        return $this->timeoutWait;
281
    }
282
283
    /**
284
     * Choose the timeout wait (in seconds) to use for the $this->getChannel()->wait() method.
285
     */
286 18
    private function chooseWaitTimeout(): int
287
    {
288 18
        if ($this->gracefulMaxExecutionDateTime) {
289 8
            $allowedExecutionDateInterval = $this->gracefulMaxExecutionDateTime->diff(new \DateTime());
290 8
            $allowedExecutionSeconds = $allowedExecutionDateInterval->days * 86400
291 8
                + $allowedExecutionDateInterval->h * 3600
292 8
                + $allowedExecutionDateInterval->i * 60
293 8
                + $allowedExecutionDateInterval->s;
294
295 8
            if (!$allowedExecutionDateInterval->invert) {
296 2
                $allowedExecutionSeconds *= -1;
297
            }
298
299
            /*
300
             * Respect the idle timeout if it's set and if it's less than
301
             * the remaining allowed execution.
302
             */
303 8
            if ($this->getIdleTimeout()
304 8
                && $this->getIdleTimeout() < $allowedExecutionSeconds
305
            ) {
306 2
                $waitTimeout = $this->getIdleTimeout();
307
            } else {
308 8
                $waitTimeout = $allowedExecutionSeconds;
309
            }
310
        } else {
311 10
            $waitTimeout = $this->getIdleTimeout();
312
        }
313
314 18
        if (!is_null($this->getTimeoutWait()) && $this->getTimeoutWait() > 0) {
315 6
            $waitTimeout = min($waitTimeout, $this->getTimeoutWait());
316
        }
317 18
        return $waitTimeout;
318
    }
319
320 18
    public function setLastActivityDateTime(\DateTime $dateTime)
321
    {
322 18
        $this->lastActivityDateTime = $dateTime;
323 18
    }
324
325 8
    protected function getLastActivityDateTime(): ?\DateTime
326
    {
327 8
        return $this->lastActivityDateTime;
328
    }
329
}
330