Passed
Push — master ( 5a8d35...554209 )
by Mihai
07:44
created

Consumer::getGracefulMaxExecutionDateTime()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
eloc 1
nc 1
nop 0
dl 0
loc 3
ccs 0
cts 0
cp 0
crap 2
rs 10
c 0
b 0
f 0
1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use OldSound\RabbitMqBundle\Event\AfterProcessingMessageEvent;
6
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessageEvent;
7
use OldSound\RabbitMqBundle\Event\OnConsumeEvent;
8
use OldSound\RabbitMqBundle\Event\OnIdleEvent;
9
use OldSound\RabbitMqBundle\MemoryChecker\MemoryConsumptionChecker;
10
use OldSound\RabbitMqBundle\MemoryChecker\NativeMemoryUsageProvider;
11
use PhpAmqpLib\Exception\AMQPTimeoutException;
12
use PhpAmqpLib\Message\AMQPMessage;
13
14
class Consumer extends BaseConsumer
15
{
16
    /**
     * Memory limit for this consumer; null means the memory check is skipped.
     * The value is suffixed with "M" when it is handed to the memory checker.
     *
     * @var int|null
     */
    protected $memoryLimit;

    /**
     * Point in time after which the consumer exits gracefully, i.e. without
     * interrupting a consumption that is currently running.
     *
     * @var \DateTime|null
     */
    protected $gracefulMaxExecutionDateTime;

    /**
     * Exit code returned when the consumer stops because the graceful max
     * execution deadline has been reached.
     *
     * @var int
     */
    protected $gracefulMaxExecutionTimeoutExitCode = 0;

    /**
     * Optional cap (in seconds) applied to the channel wait timeout; only
     * positive values are taken into account.
     *
     * @var int|null
     */
    protected $timeoutWait;

    /**
     * Set when consumption starts and refreshed after every wait() call that
     * returns without timing out.
     *
     * @var \DateTime|null
     */
    protected $lastActivityDateTime;
41
42
    /**
     * Store the memory limit used by the post-message memory check.
     *
     * @param int $memoryLimit Limit value; it is suffixed with "M" when checked.
     */
    public function setMemoryLimit($memoryLimit)
    {
        $this->memoryLimit = $memoryLimit;
    }
51
52 32
    /**
     * Return the configured memory limit.
     *
     * @return int|null The limit, or null when none has been set.
     */
    public function getMemoryLimit()
    {
        return $this->memoryLimit;
    }
61
62
    /**
     * Run the consume loop until the channel has no callbacks left or one of
     * the exit conditions (graceful deadline, idle timeout) is met.
     *
     * @param int $msgAmount Stored as the consumer's target before the loop starts.
     *
     * @return int 0 when the loop ends normally, the graceful max execution
     *             exit code when the deadline is reached, or the idle timeout
     *             exit code when an idle event forces a stop.
     *
     * @throws AMQPTimeoutException When an idle event forces a stop and no idle
     *                              timeout exit code is configured.
     */
    public function consume($msgAmount)
    {
        $this->target = $msgAmount;

        $this->setupConsumer();

        $this->setLastActivityDateTime(new \DateTime());
        while (count($this->getChannel()->callbacks) > 0) {
            $this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent($this));
            $this->maybeStopConsumer();

            // With a graceful deadline in place, wait() must never be called
            // with a timeout of 0 or less seconds: exit instead.
            $waitTimeout = $this->chooseWaitTimeout();
            if ($this->gracefulMaxExecutionDateTime && $waitTimeout < 1) {
                return $this->gracefulMaxExecutionTimeoutExitCode;
            }

            if ($this->forceStop) {
                continue;
            }

            try {
                $this->getChannel()->wait(null, false, $waitTimeout);
                $this->setLastActivityDateTime(new \DateTime());
            } catch (AMQPTimeoutException $timeoutException) {
                $now = time();

                // Graceful deadline reached while waiting.
                if ($this->gracefulMaxExecutionDateTime
                    && $this->gracefulMaxExecutionDateTime <= new \DateTime("@{$now}")
                ) {
                    return $this->gracefulMaxExecutionTimeoutExitCode;
                }

                // No idle timeout configured, or it has not elapsed yet.
                if (!$this->getIdleTimeout()
                    || $this->getLastActivityDateTime()->getTimestamp() + $this->getIdleTimeout() > $now
                ) {
                    continue;
                }

                $idleEvent = new OnIdleEvent($this);
                $this->dispatchEvent(OnIdleEvent::NAME, $idleEvent);

                // Listeners may veto the stop; in that case keep consuming.
                if (!$idleEvent->isForceStop()) {
                    continue;
                }

                if (null === $this->getIdleTimeoutExitCode()) {
                    throw $timeoutException;
                }

                return $this->getIdleTimeoutExitCode();
            }
        }

        return 0;
    }
124
125
    /**
     * Purge the queue named in the queue options.
     *
     * The second argument to queue_purge() is passed as true (presumably the
     * no-wait flag — confirm against the php-amqplib API).
     */
    public function purge()
    {
        $channel = $this->getChannel();
        $channel->queue_purge($this->queueOptions['name'], true);
    }
132
133 32
    /**
     * Delete the queue named in the queue options.
     *
     * The second argument to queue_delete() is passed as true (presumably the
     * "only if unused" flag — confirm against the php-amqplib API).
     */
    public function delete()
    {
        $channel = $this->getChannel();
        $channel->queue_delete($this->queueOptions['name'], true);
    }
140
141 32
    /**
     * Run a callback for a message, wrapped in before/after events and logging.
     *
     * A StopConsumerException is logged at info level, the message is handled
     * with the exception's handle code and consumption is stopped. Any other
     * exception or error is logged at error level and rethrown.
     *
     * @param AMQPMessage $msg       Message being processed.
     * @param string      $queueName Queue name, used only in the log context.
     * @param callable    $callback  Invoked with the message; its return value
     *                               is the process flag given to handleProcessMessage().
     *
     * @throws \Exception|\Error Whatever the callback or message handling throws,
     *                           except StopConsumerException.
     */
    protected function processMessageQueueCallback(AMQPMessage $msg, $queueName, $callback)
    {
        // Part of the log context that every branch below has in common.
        $amqpContext = [
            'queue' => $queueName,
            'message' => $msg,
        ];

        $this->dispatchEvent(
            BeforeProcessingMessageEvent::NAME,
            new BeforeProcessingMessageEvent($this, $msg)
        );

        try {
            $processFlag = call_user_func($callback, $msg);
            $this->handleProcessMessage($msg, $processFlag);
            $this->dispatchEvent(
                AfterProcessingMessageEvent::NAME,
                new AfterProcessingMessageEvent($this, $msg)
            );
            $this->logger->debug('Queue message processed', [
                'amqp' => $amqpContext + ['return_code' => $processFlag],
            ]);
        } catch (Exception\StopConsumerException $stopRequest) {
            $this->logger->info('Consumer requested restart', [
                'amqp' => $amqpContext + ['stacktrace' => $stopRequest->getTraceAsString()],
            ]);
            $this->handleProcessMessage($msg, $stopRequest->getHandleCode());
            $this->stopConsuming();
        } catch (\Exception | \Error $failure) {
            $this->logger->error($failure->getMessage(), [
                'amqp' => $amqpContext + ['stacktrace' => $failure->getTraceAsString()],
            ]);

            throw $failure;
        }
    }
190 22
191
    /**
     * Process a message with this consumer's own queue name and callback.
     *
     * @param AMQPMessage $msg Message to process.
     */
    public function processMessage(AMQPMessage $msg)
    {
        $queueName = $this->queueOptions['name'];

        $this->processMessageQueueCallback($msg, $queueName, $this->callback);
    }
195 15
196
    /**
     * Acknowledge, reject or requeue a message according to the process flag,
     * then update the consumed counter and evaluate the stop conditions.
     *
     * The channel and delivery tag are read through AMQPMessage::getChannel()
     * and AMQPMessage::getDeliveryTag() instead of the deprecated
     * $delivery_info property (the getters require php-amqplib >= 2.12).
     *
     * @param AMQPMessage $msg         Message that has just been processed.
     * @param mixed       $processFlag One of the ConsumerInterface::MSG_* flags, or
     *                                 false (treated like MSG_REJECT_REQUEUE). Any
     *                                 other value except MSG_ACK_SENT acknowledges
     *                                 the message.
     */
    protected function handleProcessMessage(AMQPMessage $msg, $processFlag)
    {
        if ($processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $processFlag) {
            // Reject and requeue message to RabbitMQ
            $msg->getChannel()->basic_reject($msg->getDeliveryTag(), true);
        } elseif ($processFlag === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE) {
            // NACK and requeue message to RabbitMQ
            $msg->getChannel()->basic_nack($msg->getDeliveryTag(), false, true);
        } elseif ($processFlag === ConsumerInterface::MSG_REJECT) {
            // Reject and drop
            $msg->getChannel()->basic_reject($msg->getDeliveryTag(), false);
        } elseif ($processFlag !== ConsumerInterface::MSG_ACK_SENT) {
            // Acknowledge the message, unless the callback reported that the
            // ack has already been sent (MSG_ACK_SENT).
            $msg->getChannel()->basic_ack($msg->getDeliveryTag());
        }

        $this->consumed++;
        $this->maybeStopConsumer();

        // Stop consuming once a configured memory limit is close to being hit.
        if (!is_null($this->getMemoryLimit()) && $this->isRamAlmostOverloaded()) {
            $this->stopConsuming();
        }
    }
219
220
    /**
     * Ask a MemoryConsumptionChecker whether memory usage is close to the
     * configured limit, using the limit suffixed with "M" and a "5M" second
     * argument (presumably a safety margin — confirm against the checker).
     *
     * @return boolean
     */
    protected function isRamAlmostOverloaded()
    {
        $checker = new MemoryConsumptionChecker(new NativeMemoryUsageProvider());
        $limit = $this->getMemoryLimit() . 'M';

        return $checker->isRamAlmostOverloaded($limit, '5M');
    }
231 4
232 4
    /**
     * Set (or clear) the point in time after which the consumer exits gracefully.
     *
     * The parameter is declared explicitly nullable: relying on a null default
     * to make a typed parameter nullable is deprecated as of PHP 8.4.
     *
     * @param \DateTime|null $dateTime Deadline, or null to disable the feature.
     */
    public function setGracefulMaxExecutionDateTime(?\DateTime $dateTime = null)
    {
        $this->gracefulMaxExecutionDateTime = $dateTime;
    }
239 2
240 2
    /**
     * Set the graceful max execution deadline relative to the current time.
     *
     * @param int $secondsInTheFuture Number of seconds from now until the deadline.
     */
    public function setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($secondsInTheFuture)
    {
        $deadline = new \DateTime('+' . $secondsInTheFuture . ' seconds');

        $this->setGracefulMaxExecutionDateTime($deadline);
    }
247
248
    /**
     * Set the exit code returned when the graceful max execution deadline
     * ends consumption.
     *
     * @param int $exitCode
     */
    public function setGracefulMaxExecutionTimeoutExitCode($exitCode)
    {
        $this->gracefulMaxExecutionTimeoutExitCode = $exitCode;
    }
255
256
    /**
     * Set the cap, in seconds, applied to the channel wait timeout.
     *
     * @param int $timeoutWait Only positive values are taken into account.
     */
    public function setTimeoutWait(int $timeoutWait): void
    {
        $this->timeoutWait = $timeoutWait;
    }
260
261
    /**
     * Return the graceful max execution deadline.
     *
     * @return \DateTime|null The deadline, or null when none has been set.
     */
    public function getGracefulMaxExecutionDateTime()
    {
        return $this->gracefulMaxExecutionDateTime;
    }
268
269 10
    /**
     * Return the exit code used when the graceful max execution deadline
     * ends consumption.
     *
     * @return int
     */
    public function getGracefulMaxExecutionTimeoutExitCode()
    {
        return $this->gracefulMaxExecutionTimeoutExitCode;
    }
276 4
277 2
    /**
     * Return the configured wait-timeout cap.
     *
     * @return int|null Cap in seconds, or null when none has been set.
     */
    public function getTimeoutWait(): ?int
    {
        return $this->timeoutWait;
    }
281
282
    /**
     * Choose the timeout (in seconds) to use for the $this->getChannel()->wait() method.
     *
     * With a graceful max execution deadline, the result is the number of
     * seconds left until the deadline (zero or negative once it has passed),
     * or the idle timeout when that is set and shorter. Without a deadline,
     * the result is the idle timeout. A positive timeout-wait value then caps
     * the result.
     *
     * Without a deadline, a result of 0 or less carries no real timeout, so
     * taking min() with the timeout-wait value would always discard the cap.
     * In that case the timeout-wait value is used on its own.
     *
     * @return int Timeout in seconds; with a deadline set, a value below 1
     *             means the deadline has been reached.
     */
    private function chooseWaitTimeout(): int
    {
        $hasGracefulDeadline = (bool) $this->gracefulMaxExecutionDateTime;

        if ($hasGracefulDeadline) {
            $allowedExecutionDateInterval = $this->gracefulMaxExecutionDateTime->diff(new \DateTime());
            $allowedExecutionSeconds = $allowedExecutionDateInterval->days * 86400
                + $allowedExecutionDateInterval->h * 3600
                + $allowedExecutionDateInterval->i * 60
                + $allowedExecutionDateInterval->s;

            // diff() runs from the deadline to "now": the interval is inverted
            // while the deadline is still in the future. A non-inverted
            // interval means the deadline has passed, hence the negative sign.
            if (!$allowedExecutionDateInterval->invert) {
                $allowedExecutionSeconds *= -1;
            }

            /*
             * Respect the idle timeout if it's set and if it's less than
             * the remaining allowed execution.
             */
            $idleTimeout = $this->getIdleTimeout();
            if ($idleTimeout && $idleTimeout < $allowedExecutionSeconds) {
                $waitTimeout = $idleTimeout;
            } else {
                $waitTimeout = $allowedExecutionSeconds;
            }
        } else {
            $waitTimeout = $this->getIdleTimeout();
        }

        $timeoutWait = $this->getTimeoutWait();
        if (null !== $timeoutWait && $timeoutWait > 0) {
            if (!$hasGracefulDeadline && $waitTimeout <= 0) {
                // No deadline and no idle timeout: the cap is the only timeout.
                $waitTimeout = $timeoutWait;
            } else {
                // A non-positive value here means the deadline has passed and
                // must survive so that the caller can exit gracefully.
                $waitTimeout = min($waitTimeout, $timeoutWait);
            }
        }

        // The cast keeps the declared return type valid even if the idle
        // timeout getter yields null.
        return (int) $waitTimeout;
    }
318
319
    /**
     * Record the moment of the consumer's last activity.
     *
     * @param \DateTime $dateTime Timestamp later compared against the idle timeout.
     */
    public function setLastActivityDateTime(\DateTime $dateTime)
    {
        $this->lastActivityDateTime = $dateTime;
    }
323
324
    /**
     * Return the moment of the consumer's last recorded activity.
     *
     * @return \DateTime|null Null when no activity has been recorded yet.
     */
    protected function getLastActivityDateTime(): ?\DateTime
    {
        return $this->lastActivityDateTime;
    }
328
}
329