Passed
Pull Request — master (#36)
by
unknown
15:58 queued 07:42
created

Consumer::getLastActivityDateTime()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 1
nc 1
nop 0
dl 0
loc 3
ccs 2
cts 2
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use OldSound\RabbitMqBundle\Event\AfterProcessingMessageEvent;
6
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessageEvent;
7
use OldSound\RabbitMqBundle\Event\OnConsumeEvent;
8
use OldSound\RabbitMqBundle\Event\OnIdleEvent;
9
use OldSound\RabbitMqBundle\MemoryChecker\MemoryConsumptionChecker;
10
use OldSound\RabbitMqBundle\MemoryChecker\NativeMemoryUsageProvider;
11
use PhpAmqpLib\Exception\AMQPTimeoutException;
12
use PhpAmqpLib\Message\AMQPMessage;
13
14
class Consumer extends BaseConsumer
15
{
16
    /**
     * Memory limit for this consumer process, or null when no limit is set.
     * The value is suffixed with "M" before being handed to the memory checker.
     *
     * @var int|null
     */
    protected $memoryLimit = null;

    /**
     * Point in time after which consume() returns instead of waiting for
     * further messages. A consumption that is already running is not
     * interrupted; the deadline is only evaluated between waits.
     *
     * @var \DateTime|null
     */
    protected $gracefulMaxExecutionDateTime;

    /**
     * Value returned by consume() when it stops because the graceful max
     * execution deadline has been reached.
     *
     * @var int
     */
    protected $gracefulMaxExecutionTimeoutExitCode = 0;

    /**
     * Cap, in seconds, applied to the timeout chosen for each channel wait().
     *
     * @var int|null
     */
    protected $timeoutWait;

    /**
     * Moment consumption started, or the moment the most recent channel
     * wait() returned without timing out.
     *
     * @var \DateTime|null
     */
    protected $lastActivityDateTime;
41
42
    /**
     * Configure the memory limit checked after every handled message.
     *
     * @param int $memoryLimit Limit value; it is suffixed with "M" when checked
     *
     * @return void
     */
    public function setMemoryLimit($memoryLimit)
    {
        $this->memoryLimit = $memoryLimit;
    }
51
52
    /**
     * Return the configured memory limit.
     *
     * @return int|null The limit, or null when none has been configured
     */
    public function getMemoryLimit()
    {
        return $this->memoryLimit;
    }
61
62
    /**
     * Run the consume loop until the channel has no callbacks left or one of
     * the stop conditions below is met.
     *
     * @param int $msgAmount Stored as the consumption target
     *
     * @return int 0 when the loop ends because no callbacks remain, the graceful
     *             max execution exit code when that deadline is reached, or the
     *             idle timeout exit code when an idle listener forces a stop
     *
     * @throws AMQPTimeoutException When an idle listener forces a stop and no
     *                              idle timeout exit code is configured
     */
    public function consume($msgAmount)
    {
        $this->target = $msgAmount;
        $this->setupConsumer();
        $this->setLastActivityDateTime(new \DateTime());

        while (count($this->getChannel()->callbacks)) {
            $this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent($this));
            $this->maybeStopConsumer();

            // Never call ::wait() with a timeout of 0 seconds or less while the
            // graceful max execution deadline is in use: leave the loop instead.
            $waitTimeout = $this->chooseWaitTimeout();
            if ($this->gracefulMaxExecutionDateTime && $waitTimeout < 1) {
                return $this->gracefulMaxExecutionTimeoutExitCode;
            }

            if ($this->forceStop) {
                continue;
            }

            try {
                $this->getChannel()->wait(null, false, $waitTimeout);
                $this->setLastActivityDateTime(new \DateTime());
            } catch (AMQPTimeoutException $e) {
                $now = time();

                // The graceful deadline takes precedence over the idle timeout.
                if ($this->gracefulMaxExecutionDateTime
                    && $this->gracefulMaxExecutionDateTime <= new \DateTime("@$now")
                ) {
                    return $this->gracefulMaxExecutionTimeoutExitCode;
                }

                $idleTimeoutReached = $this->getIdleTimeout()
                    && ($this->getLastActivityDateTime()->getTimestamp() + $this->getIdleTimeout() <= $now);
                if (!$idleTimeoutReached) {
                    continue;
                }

                // Listeners decide whether an idle consumer has to stop.
                $idleEvent = new OnIdleEvent($this);
                $this->dispatchEvent(OnIdleEvent::NAME, $idleEvent);

                if (!$idleEvent->isForceStop()) {
                    continue;
                }

                if (null !== $this->getIdleTimeoutExitCode()) {
                    return $this->getIdleTimeoutExitCode();
                }

                throw $e;
            }
        }

        return 0;
    }
124
125
    /**
     * Purge the queue named in the queue options.
     *
     * @return void
     */
    public function purge()
    {
        $channel = $this->getChannel();
        $channel->queue_purge($this->queueOptions['name'], true);
    }
132
133
    /**
     * Delete the queue named in the queue options.
     *
     * NOTE(review): the second argument (true) is presumably php-amqplib's
     * "if unused" flag - confirm against the installed library version.
     *
     * @return void
     */
    public function delete()
    {
        $channel = $this->getChannel();
        $channel->queue_delete($this->queueOptions['name'], true);
    }
140
141 32
    /**
     * Run a callback for one message, surrounded by the before/after
     * processing events, and act on the flag the callback returns.
     *
     * A StopConsumerException thrown by the callback is logged, its handle
     * code is applied to the message and consuming is stopped. Any other
     * \Exception or \Error is logged and rethrown.
     *
     * @param AMQPMessage $msg       Message being processed
     * @param string      $queueName Queue name, used in the log context only
     * @param callable    $callback  Invoked with the message as sole argument
     *
     * @return void
     *
     * @throws \Exception
     * @throws \Error
     */
    protected function processMessageQueueCallback(AMQPMessage $msg, $queueName, $callback)
    {
        // Part of the log context that every log entry below has in common.
        $amqpContext = array(
            'queue' => $queueName,
            'message' => $msg
        );

        $this->dispatchEvent(
            BeforeProcessingMessageEvent::NAME,
            new BeforeProcessingMessageEvent($this, $msg)
        );

        try {
            $processFlag = call_user_func($callback, $msg);
            $this->handleProcessMessage($msg, $processFlag);

            $this->dispatchEvent(
                AfterProcessingMessageEvent::NAME,
                new AfterProcessingMessageEvent($this, $msg)
            );

            $this->logger->debug('Queue message processed', array(
                'amqp' => $amqpContext + array('return_code' => $processFlag)
            ));
        } catch (Exception\StopConsumerException $e) {
            $this->logger->info('Consumer requested restart', array(
                'amqp' => $amqpContext + array('stacktrace' => $e->getTraceAsString())
            ));

            $this->handleProcessMessage($msg, $e->getHandleCode());
            $this->stopConsuming();
        } catch (\Exception $e) {
            $this->logger->error($e->getMessage(), array(
                'amqp' => $amqpContext + array('stacktrace' => $e->getTraceAsString())
            ));

            throw $e;
        } catch (\Error $e) {
            $this->logger->error($e->getMessage(), array(
                'amqp' => $amqpContext + array('stacktrace' => $e->getTraceAsString())
            ));

            throw $e;
        }
    }
190
191 14
    /**
     * Process a message with this consumer's own queue name and callback.
     *
     * @param AMQPMessage $msg Message to process
     *
     * @return void
     */
    public function processMessage(AMQPMessage $msg)
    {
        $queueName = $this->queueOptions['name'];

        $this->processMessageQueueCallback($msg, $queueName, $this->callback);
    }
195
196 32
    /**
     * Acknowledge, reject or nack a message according to the processing flag,
     * then update the consumed counter and evaluate the stop conditions.
     *
     * @param AMQPMessage $msg         Message that was processed
     * @param mixed       $processFlag One of the ConsumerInterface::MSG_* flags,
     *                                 or false (handled like MSG_REJECT_REQUEUE).
     *                                 Any value not matched below, except
     *                                 MSG_ACK_SENT, results in an ack.
     *
     * @return void
     */
    protected function handleProcessMessage(AMQPMessage $msg, $processFlag)
    {
        if (ConsumerInterface::MSG_REJECT_REQUEUE === $processFlag || false === $processFlag) {
            // Reject the message and put it back on the queue.
            $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], true);
        } elseif (ConsumerInterface::MSG_SINGLE_NACK_REQUEUE === $processFlag) {
            // Nack this single message and put it back on the queue.
            $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'], false, true);
        } elseif (ConsumerInterface::MSG_REJECT === $processFlag) {
            // Reject the message without requeueing it.
            $msg->delivery_info['channel']->basic_reject($msg->delivery_info['delivery_tag'], false);
        } elseif (ConsumerInterface::MSG_ACK_SENT !== $processFlag) {
            // Everything else is acknowledged, unless the flag reports that
            // an ack has been sent already.
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        }

        ++$this->consumed;
        $this->maybeStopConsumer();

        // The memory check only runs when a limit has been configured.
        if (null !== $this->getMemoryLimit() && $this->isRamAlmostOverloaded()) {
            $this->stopConsuming();
        }
    }
219
220
    /**
     * Ask a MemoryConsumptionChecker, backed by the native memory usage
     * provider, whether this process is close to its memory limit.
     *
     * NOTE(review): the '5M' argument is presumably a safety margin below the
     * limit - confirm against MemoryConsumptionChecker.
     *
     * @return boolean Result reported by the checker
     */
    protected function isRamAlmostOverloaded()
    {
        $checker = new MemoryConsumptionChecker(new NativeMemoryUsageProvider());
        $limit = $this->getMemoryLimit() . 'M';

        return $checker->isRamAlmostOverloaded($limit, '5M');
    }
231
232
    /**
     * Set the deadline after which consume() stops waiting for messages.
     *
     * @param \DateTime|null $dateTime Deadline, or null to use no deadline
     *
     * @return void
     */
    public function setGracefulMaxExecutionDateTime(\DateTime $dateTime = null)
    {
        $this->gracefulMaxExecutionDateTime = $dateTime;
    }
239
240
    /**
     * Set the graceful max execution deadline relative to the current time.
     *
     * @param int $secondsInTheFuture Number of seconds from now until the deadline
     *
     * @return void
     */
    public function setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($secondsInTheFuture)
    {
        $deadline = new \DateTime('+' . $secondsInTheFuture . ' seconds');

        $this->setGracefulMaxExecutionDateTime($deadline);
    }
247
248
    /**
     * Set the value consume() returns when it stops because the graceful max
     * execution deadline has been reached.
     *
     * @param int $exitCode Exit code to report
     *
     * @return void
     */
    public function setGracefulMaxExecutionTimeoutExitCode($exitCode)
    {
        $this->gracefulMaxExecutionTimeoutExitCode = $exitCode;
    }
255
256
    /**
     * Set the cap applied to the timeout of each channel wait() call.
     *
     * @param int $timeoutWait Cap in seconds
     *
     * @return void
     */
    public function setTimeoutWait($timeoutWait)
    {
        $this->timeoutWait = $timeoutWait;
    }
263
264
    /**
     * Return the graceful max execution deadline.
     *
     * @return \DateTime|null The deadline, or null when none is set
     */
    public function getGracefulMaxExecutionDateTime()
    {
        return $this->gracefulMaxExecutionDateTime;
    }
271
272
    /**
     * Return the exit code reported when the graceful max execution deadline
     * stops the consumer.
     *
     * @return int Defaults to 0 unless changed through the setter
     */
    public function getGracefulMaxExecutionTimeoutExitCode()
    {
        return $this->gracefulMaxExecutionTimeoutExitCode;
    }
279
280
    /**
     * Return the cap applied to the timeout of each channel wait() call.
     *
     * @return int|null Cap in seconds, or null when it has never been set
     */
    public function getTimeoutWait()
    {
        return $this->timeoutWait;
    }
287
288
    /**
     * Choose the timeout (in seconds) to pass to $this->getChannel()->wait().
     *
     * With a graceful max execution deadline, the result is the smaller of the
     * remaining allowed execution time and the idle timeout (if set), further
     * capped by the timeout wait (if set). The result is zero or negative once
     * the deadline has passed; consume() relies on that to stop.
     *
     * Without a deadline, the result is the idle timeout capped by the timeout
     * wait. If only one of the two is set, that one is returned as is.
     *
     * @return int|null Seconds to wait. A falsy value is returned only when
     *                  there is no deadline, no idle timeout and no timeout wait.
     */
    private function chooseWaitTimeout()
    {
        $timeoutWait = $this->getTimeoutWait();

        if ($this->gracefulMaxExecutionDateTime) {
            $allowedExecutionDateInterval = $this->gracefulMaxExecutionDateTime->diff(new \DateTime());
            $allowedExecutionSeconds = $allowedExecutionDateInterval->days * 86400
                + $allowedExecutionDateInterval->h * 3600
                + $allowedExecutionDateInterval->i * 60
                + $allowedExecutionDateInterval->s;

            // diff() runs from the deadline to "now": the interval is inverted
            // while the deadline is still ahead. A non-inverted interval means
            // the deadline has passed, so the remaining time is negative.
            if (!$allowedExecutionDateInterval->invert) {
                $allowedExecutionSeconds *= -1;
            }

            /*
             * Respect the idle timeout if it's set and if it's less than
             * the remaining allowed execution.
             */
            if ($this->getIdleTimeout()
                && $this->getIdleTimeout() < $allowedExecutionSeconds
            ) {
                $waitTimeout = $this->getIdleTimeout();
            } else {
                $waitTimeout = $allowedExecutionSeconds;
            }

            if ($timeoutWait) {
                $waitTimeout = min($waitTimeout, $timeoutWait);
            }

            return $waitTimeout;
        }

        $waitTimeout = $this->getIdleTimeout();

        if (!$timeoutWait) {
            return $waitTimeout;
        }

        // No idle timeout: min() would pick the falsy idle value, discarding
        // the timeout wait and leaving wait() without any timeout at all.
        if (!$waitTimeout) {
            return $timeoutWait;
        }

        return min($waitTimeout, $timeoutWait);
    }
326
327
    /**
     * Record the moment of the latest activity; consume() compares it with the
     * idle timeout.
     *
     * @param \DateTime $dateTime Moment of the latest activity
     *
     * @return void
     */
    public function setLastActivityDateTime(\DateTime $dateTime)
    {
        $this->lastActivityDateTime = $dateTime;
    }
334
335
    /**
     * Return the moment of the latest recorded activity.
     *
     * @return \DateTime|null Null until setLastActivityDateTime() has been called
     */
    protected function getLastActivityDateTime()
    {
        return $this->lastActivityDateTime;
    }
342
}
343