Completed
Pull Request — master (#36)
by
unknown
13:35
created

Consumer::setGracefulMaxExecutionDateTime()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 1
c 0
b 0
f 0
nc 1
nop 1
dl 0
loc 3
ccs 1
cts 1
cp 1
crap 1
rs 10
1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use OldSound\RabbitMqBundle\Event\AfterProcessingMessageEvent;
6
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessageEvent;
7
use OldSound\RabbitMqBundle\Event\OnConsumeEvent;
8
use OldSound\RabbitMqBundle\Event\OnIdleEvent;
9
use OldSound\RabbitMqBundle\MemoryChecker\MemoryConsumptionChecker;
10
use OldSound\RabbitMqBundle\MemoryChecker\NativeMemoryUsageProvider;
11
use PhpAmqpLib\Exception\AMQPTimeoutException;
12
use PhpAmqpLib\Message\AMQPMessage;
13
14
class Consumer extends BaseConsumer
15
{
16
    /**
     * Memory limit for this consumer; null disables the memory check performed
     * after each handled message. The value is suffixed with "M" when checked.
     *
     * @var int|null
     */
    protected $memoryLimit = null;

    /**
     * Point in time after which the consumer exits gracefully, i.e. without
     * interrupting a consumption that is already running. Null means no deadline.
     *
     * @var \DateTime|null
     */
    protected $gracefulMaxExecutionDateTime = null;

    /**
     * Exit code returned by consume() when the graceful max execution deadline ends the loop.
     *
     * @var int
     */
    protected $gracefulMaxExecutionTimeoutExitCode = 0;

    /**
     * Optional cap, in seconds, applied to the timeout chosen for each channel wait.
     *
     * @var int|null
     */
    protected $timeoutWait = null;

    /**
     * Last moment the consumer started consuming or returned from a channel wait
     * without a timeout; compared against the idle timeout.
     *
     * @var \DateTime|null
     */
    protected $lastActivityDateTime = null;
41
42
    /**
     * Store the memory limit used by the post-message memory check.
     *
     * @param int $memoryLimit Limit value; it is suffixed with "M" when the check runs.
     */
    public function setMemoryLimit($memoryLimit)
    {
        $this->memoryLimit = $memoryLimit;
    }
51
52 32
    /**
     * Return the configured memory limit.
     *
     * @return int|null The limit, or null when none has been set.
     */
    public function getMemoryLimit()
    {
        return $this->memoryLimit;
    }
61
62
    /**
     * Run the consume loop for as long as the channel still has callbacks registered.
     *
     * Each iteration dispatches an OnConsumeEvent, re-checks the stop conditions and
     * waits on the channel with the timeout picked by chooseWaitTimeout(). A wait that
     * times out is used to evaluate the graceful deadline and the idle timeout.
     *
     * @param   int     $msgAmount  Stored as the consumer's target before the loop starts.
     *
     * @return  int     0 when the loop ends because no callbacks remain; otherwise the
     *                  graceful max execution exit code or the idle timeout exit code.
     *
     * @throws  AMQPTimeoutException  When an idle-event listener forces a stop and no
     *                                idle timeout exit code is configured.
     */
    public function consume($msgAmount)
    {
        $this->target = $msgAmount;
        $this->setupConsumer();
        $this->setLastActivityDateTime(new \DateTime());

        while (count($this->getChannel()->callbacks)) {
            $this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent($this));
            $this->maybeStopConsumer();

            // Never call ::wait() with 0 or fewer seconds while a graceful deadline is set:
            // a non-positive remaining time means the deadline has been reached.
            $waitTimeout = $this->chooseWaitTimeout();
            if ($this->gracefulMaxExecutionDateTime && $waitTimeout < 1) {
                return $this->gracefulMaxExecutionTimeoutExitCode;
            }

            if ($this->forceStop) {
                // Skip waiting; the loop condition is evaluated again.
                continue;
            }

            try {
                $this->getChannel()->wait(null, false, $waitTimeout);
                $this->setLastActivityDateTime(new \DateTime());
            } catch (AMQPTimeoutException $e) {
                $now = time();

                // The graceful deadline takes precedence over the idle timeout.
                if ($this->gracefulMaxExecutionDateTime
                    && $this->gracefulMaxExecutionDateTime <= new \DateTime("@$now")
                ) {
                    return $this->gracefulMaxExecutionTimeoutExitCode;
                }

                // No idle timeout configured, or not idle for long enough yet: keep looping.
                if (!$this->getIdleTimeout()
                    || ($this->getLastActivityDateTime()->getTimestamp() + $this->getIdleTimeout() > $now)
                ) {
                    continue;
                }

                $idleEvent = new OnIdleEvent($this);
                $this->dispatchEvent(OnIdleEvent::NAME, $idleEvent);

                if (!$idleEvent->isForceStop()) {
                    continue;
                }

                // Without a dedicated exit code the original timeout is propagated.
                if (null === $this->getIdleTimeoutExitCode()) {
                    throw $e;
                }

                return $this->getIdleTimeoutExitCode();
            }
        }

        return 0;
    }
124
125
    /**
     * Purge the queue named in the queue options.
     *
     * The second argument `true` is passed positionally to queue_purge().
     */
    public function purge()
    {
        $channel = $this->getChannel();
        $channel->queue_purge($this->queueOptions['name'], true);
    }
132
133 32
    /**
     * Delete the queue named in the queue options.
     *
     * NOTE(review): `true` is passed as the second positional argument of queue_delete();
     * in php-amqplib that parameter appears to be $if_unused — confirm this is intended.
     */
    public function delete()
    {
        $channel = $this->getChannel();
        $channel->queue_delete($this->queueOptions['name'], true);
    }
140
141 32
    /**
     * Run a callback for a message, surrounded by the before/after processing events.
     *
     * The callback's return value is handed to handleProcessMessage(). A
     * StopConsumerException is logged, its handle code is applied to the message and
     * consuming is stopped. Any other exception or error is logged and rethrown.
     *
     * @param AMQPMessage $msg       Message handed to the callback.
     * @param string      $queueName Queue name recorded in the log context.
     * @param callable    $callback  Invoked with the message as its only argument.
     *
     * @throws \Exception|\Error Whatever the callback or the message handling throws,
     *                           except StopConsumerException.
     */
    protected function processMessageQueueCallback(AMQPMessage $msg, $queueName, $callback)
    {
        // Entries shared by every log record written below.
        $amqpContext = [
            'queue' => $queueName,
            'message' => $msg,
        ];

        $this->dispatchEvent(
            BeforeProcessingMessageEvent::NAME,
            new BeforeProcessingMessageEvent($this, $msg)
        );

        try {
            $processFlag = call_user_func($callback, $msg);
            $this->handleProcessMessage($msg, $processFlag);
            $this->dispatchEvent(
                AfterProcessingMessageEvent::NAME,
                new AfterProcessingMessageEvent($this, $msg)
            );
            $this->logger->debug('Queue message processed', [
                'amqp' => $amqpContext + ['return_code' => $processFlag],
            ]);
        } catch (Exception\StopConsumerException $e) {
            $this->logger->info('Consumer requested restart', [
                'amqp' => $amqpContext + ['stacktrace' => $e->getTraceAsString()],
            ]);
            $this->handleProcessMessage($msg, $e->getHandleCode());
            $this->stopConsuming();
        } catch (\Exception | \Error $e) {
            // Exceptions and errors are reported the same way before being rethrown.
            $this->logger->error($e->getMessage(), [
                'amqp' => $amqpContext + ['stacktrace' => $e->getTraceAsString()],
            ]);

            throw $e;
        }
    }
190 22
191
    /**
     * Process a message with this consumer's own queue name and callback.
     *
     * @param AMQPMessage $msg Message to process.
     */
    public function processMessage(AMQPMessage $msg)
    {
        $queueName = $this->queueOptions['name'];

        $this->processMessageQueueCallback($msg, $queueName, $this->callback);
    }
195 15
196
    /**
     * Acknowledge, reject or requeue a message according to the flag its callback returned,
     * then count the message and re-evaluate the stop conditions.
     *
     * The channel and delivery tag are read through AMQPMessage::getChannel() and
     * AMQPMessage::getDeliveryTag() instead of the deprecated $delivery_info property.
     *
     * @param AMQPMessage $msg         Message that has just been processed.
     * @param mixed       $processFlag One of the ConsumerInterface::MSG_* flags, or false
     *                                 (handled like MSG_REJECT_REQUEUE). Any other value
     *                                 except MSG_ACK_SENT acknowledges the message.
     */
    protected function handleProcessMessage(AMQPMessage $msg, $processFlag)
    {
        if ($processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $processFlag) {
            // Reject and requeue message to RabbitMQ
            $msg->getChannel()->basic_reject($msg->getDeliveryTag(), true);
        } elseif ($processFlag === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE) {
            // NACK and requeue message to RabbitMQ
            $msg->getChannel()->basic_nack($msg->getDeliveryTag(), false, true);
        } elseif ($processFlag === ConsumerInterface::MSG_REJECT) {
            // Reject and drop
            $msg->getChannel()->basic_reject($msg->getDeliveryTag(), false);
        } elseif ($processFlag !== ConsumerInterface::MSG_ACK_SENT) {
            // Acknowledge the message so it is removed from the queue; MSG_ACK_SENT
            // is the one flag for which nothing is sent here.
            $msg->getChannel()->basic_ack($msg->getDeliveryTag());
        }

        $this->consumed++;
        $this->maybeStopConsumer();

        // The memory check only runs when a memory limit has been configured.
        if (!is_null($this->getMemoryLimit()) && $this->isRamAlmostOverloaded()) {
            $this->stopConsuming();
        }
    }
219
220
    /**
     * Ask a MemoryConsumptionChecker whether this process is close to its memory limit.
     *
     * The configured limit is passed as "<limit>M" together with the fixed value '5M'
     * (presumably a safety margin — confirm against MemoryConsumptionChecker).
     *
     * @return boolean
     */
    protected function isRamAlmostOverloaded()
    {
        $checker = new MemoryConsumptionChecker(new NativeMemoryUsageProvider());
        $allowedMemory = $this->getMemoryLimit().'M';

        return $checker->isRamAlmostOverloaded($allowedMemory, '5M');
    }
231 4
232 4
    /**
     * Set the moment after which the consumer exits gracefully.
     *
     * The parameter is declared explicitly nullable; relying on the `= null` default
     * to make a typed parameter nullable is deprecated as of PHP 8.4.
     *
     * @param \DateTime|null $dateTime Deadline, or null to remove it.
     */
    public function setGracefulMaxExecutionDateTime(?\DateTime $dateTime = null)
    {
        $this->gracefulMaxExecutionDateTime = $dateTime;
    }
239 2
240 2
    /**
     * Set the graceful deadline to a number of seconds from now.
     *
     * @param int $secondsInTheFuture Seconds to add to the current time.
     */
    public function setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($secondsInTheFuture)
    {
        $deadline = new \DateTime("+{$secondsInTheFuture} seconds");

        $this->setGracefulMaxExecutionDateTime($deadline);
    }
247
248
    /**
     * Set the exit code consume() returns when the graceful deadline ends the loop.
     *
     * @param int $exitCode
     */
    public function setGracefulMaxExecutionTimeoutExitCode($exitCode)
    {
        $this->gracefulMaxExecutionTimeoutExitCode = $exitCode;
    }
255
256
    /**
     * Set the cap applied to the timeout of each channel wait.
     *
     * @param int $timeoutWait Cap in seconds.
     */
    public function setTimeoutWait(int $timeoutWait): void
    {
        $this->timeoutWait = $timeoutWait;
    }
263
264
    /**
     * Return the graceful deadline.
     *
     * @return \DateTime|null The deadline, or null when none is set.
     */
    public function getGracefulMaxExecutionDateTime()
    {
        return $this->gracefulMaxExecutionDateTime;
    }
271 4
272 4
    /**
     * Return the exit code used when the graceful deadline ends the consume loop.
     *
     * @return int
     */
    public function getGracefulMaxExecutionTimeoutExitCode()
    {
        return $this->gracefulMaxExecutionTimeoutExitCode;
    }
279
280
    /**
     * Return the cap applied to the timeout of each channel wait.
     *
     * @return int The configured cap in seconds, or 0 when none has been set.
     */
    public function getTimeoutWait(): int
    {
        // $timeoutWait stays null until setTimeoutWait() is called. Returning null from
        // an int-typed method raises a TypeError, so an unset cap is reported as 0,
        // which the truthiness check in chooseWaitTimeout() treats as "no cap".
        return $this->timeoutWait ?? 0;
    }
287
288
    /**
     * Pick the timeout, in seconds, to pass to $this->getChannel()->wait().
     *
     * Without a graceful deadline the idle timeout is used. With one, the result is the
     * time left until the deadline (negative once it has passed), or the idle timeout
     * when that is set and shorter. A truthy timeout-wait value caps the result.
     *
     * @return int
     */
    private function chooseWaitTimeout(): int
    {
        if (!$this->gracefulMaxExecutionDateTime) {
            $waitTimeout = $this->getIdleTimeout();
        } else {
            $untilDeadline = $this->gracefulMaxExecutionDateTime->diff(new \DateTime());
            $secondsLeft = $untilDeadline->s
                + 60 * $untilDeadline->i
                + 3600 * $untilDeadline->h
                + 86400 * $untilDeadline->days;

            // The interval runs from the deadline to now, so it is only inverted while
            // the deadline lies in the future; otherwise the remaining time is negative.
            if (!$untilDeadline->invert) {
                $secondsLeft = -$secondsLeft;
            }

            // Respect the idle timeout if it's set and shorter than the time left.
            $idleTimeoutApplies = $this->getIdleTimeout()
                && $this->getIdleTimeout() < $secondsLeft;

            $waitTimeout = $idleTimeoutApplies ? $this->getIdleTimeout() : $secondsLeft;
        }

        $cap = $this->getTimeoutWait();
        if ($cap) {
            $waitTimeout = min($waitTimeout, $cap);
        }

        return $waitTimeout;
    }
326
327
    /**
     * Record the moment of the consumer's last activity.
     *
     * @param \DateTime $dateTime Moment to store; the idle timeout check in consume() reads it.
     */
    public function setLastActivityDateTime(\DateTime $dateTime)
    {
        $this->lastActivityDateTime = $dateTime;
    }
334
335
    /**
     * Return the moment of the consumer's last recorded activity.
     *
     * @return \DateTime|null Null until setLastActivityDateTime() has been called.
     */
    protected function getLastActivityDateTime(): ?\DateTime
    {
        return $this->lastActivityDateTime;
    }
342
}
343