Completed
Pull Request — master (#27)
by
unknown
03:15
created

RabbitMq/BatchConsumer.php (1 issue)

Labels
Severity
1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use PhpAmqpLib\Channel\AMQPChannel;
6
use PhpAmqpLib\Exception\AMQPRuntimeException;
7
use PhpAmqpLib\Exception\AMQPTimeoutException;
8
use PhpAmqpLib\Message\AMQPMessage;
9
10
/**
 * Consumer that buffers incoming AMQP messages and hands them to the
 * configured callback as a batch.
 *
 * A batch is flushed when the number of buffered messages equals the
 * prefetch count, when waiting for the next message times out while the
 * buffer is not empty, or when the consumer is being stopped.
 */
class BatchConsumer extends BaseAmqp implements DequeuerInterface
{
    /**
     * Callback invoked with the buffered messages (keyed by delivery tag).
     *
     * @var \Closure|callable
     */
    protected $callback;

    /**
     * Set by forceStopConsumer(); checked in maybeStopConsumer().
     *
     * @var bool
     */
    protected $forceStop = false;

    /**
     * Timeout passed to the channel wait while the batch is empty.
     *
     * @var int
     */
    protected $idleTimeout = 0;

    /**
     * When true, an idle timeout with an empty batch keeps the loop running.
     *
     * @var bool
     */
    private $keepAlive = false;

    /**
     * Value returned by consume() on idle timeout; null means "rethrow the timeout".
     *
     * @var int|null
     */
    protected $idleTimeoutExitCode;

    /**
     * Memory limit in MiB (see isRamAlmostOverloaded()); null disables the check.
     *
     * @var int|null
     */
    protected $memoryLimit = null;

    /**
     * Number of buffered messages at which a batch is considered complete.
     *
     * @var int
     */
    protected $prefetchCount;

    /**
     * Timeout passed to the channel wait while the batch is partially filled.
     *
     * @var int
     */
    protected $timeoutWait = 3;

    /**
     * Buffered messages of the current batch, keyed by integer delivery tag.
     *
     * @var array
     */
    protected $messages = [];

    /**
     * Number of messages added to the current batch.
     *
     * @var int
     */
    protected $batchCounter = 0;

    /**
     * @var int
     */
    protected $consumed = 0;

    /**
     * @var \DateTime|null DateTime after which the consumer will gracefully exit. "Gracefully" means, that
     *      any currently running consumption will not be interrupted.
     */
    protected $gracefulMaxExecutionDateTime;

    /**
     * @param \DateTime|null $dateTime
     */
    public function setGracefulMaxExecutionDateTime(\DateTime $dateTime = null)
    {
        $this->gracefulMaxExecutionDateTime = $dateTime;
    }

    /**
     * Sets the graceful deadline relative to the current time.
     *
     * @param int $secondsInTheFuture
     */
    public function setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($secondsInTheFuture)
    {
        $this->setGracefulMaxExecutionDateTime(new \DateTime("+{$secondsInTheFuture} seconds"));
    }

    /**
     * @param   \Closure|callable    $callback
     *
     * @return  $this
     */
    public function setCallback($callback)
    {
        $this->callback = $callback;

        return $this;
    }

    /**
     * Runs the consume loop for as long as the channel has registered callbacks.
     *
     * @return  int 0 when the loop ends normally, otherwise the configured idle-timeout exit code
     *
     * @throws  AMQPTimeoutException when the wait times out on an empty batch and neither
     *                               keep-alive nor an idle-timeout exit code is configured
     */
    public function consume()
    {
        $this->setupConsumer();

        while (count($this->getChannel()->callbacks)) {
            if ($this->isCompleteBatch()) {
                $this->batchConsume();
            }

            $this->checkGracefulMaxExecutionDateTime();
            $this->maybeStopConsumer();

            // An empty batch waits with the idle timeout, a partial one with the (shorter) batch timeout.
            $timeout = $this->isEmptyBatch() ? $this->getIdleTimeout() : $this->getTimeoutWait();

            try {
                $this->getChannel()->wait(null, false, $timeout);
            } catch (AMQPTimeoutException $e) {
                if (!$this->isEmptyBatch()) {
                    // No further message arrived in time: flush what has been buffered so far.
                    $this->batchConsume();
                } elseif ($this->keepAlive === true) {
                    continue;
                } elseif (null !== $this->getIdleTimeoutExitCode()) {
                    return $this->getIdleTimeoutExitCode();
                } else {
                    throw $e;
                }
            }
        }

        return 0;
    }

    /**
     * Passes the buffered messages to the callback, settles each message according to the
     * returned flag(s) and clears the buffer.
     *
     * The buffer is cleared on every path, including failures, before control leaves this method.
     *
     * @return  void
     *
     * @throws  \Throwable anything other than a StopConsumerException is logged and rethrown
     */
    private function batchConsume()
    {
        try {
            $processFlags = call_user_func($this->callback, $this->messages);
            $this->handleProcessMessages($processFlags);
            $this->logger->debug('Queue message processed', [
                'amqp' => [
                    'queue' => $this->queueOptions['name'],
                    'messages' => $this->messages,
                    'return_codes' => $processFlags,
                ],
            ]);
        } catch (Exception\StopConsumerException $e) {
            $this->logger->info('Consumer requested restart', $this->buildFailureLogContext($e));
            // Clear first so that stopConsuming() does not try to flush this batch again.
            $this->resetBatch();
            $this->stopConsuming();
        } catch (\Throwable $e) {
            $this->logger->error($e->getMessage(), $this->buildFailureLogContext($e));
            $this->resetBatch();

            throw $e;
        }

        $this->resetBatch();
    }

    /**
     * Builds the log context used when processing the current batch failed or was interrupted.
     *
     * @param   \Throwable  $e
     *
     * @return  array
     */
    private function buildFailureLogContext(\Throwable $e)
    {
        return [
            'amqp' => [
                'queue' => $this->queueOptions['name'],
                'message' => $this->messages,
                'stacktrace' => $e->getTraceAsString(),
            ],
        ];
    }

    /**
     * Settles every message of the batch according to the callback result.
     *
     * @param   mixed   $processFlags  a single flag for the whole batch, or an array of
     *                                 flags keyed by delivery tag
     *
     * @return  void
     */
    protected function handleProcessMessages($processFlags = null)
    {
        $processFlags = $this->analyzeProcessFlags($processFlags);
        foreach ($processFlags as $deliveryTag => $processFlag) {
            $this->handleProcessFlag($deliveryTag, $processFlag);
        }
    }

    /**
     * Acknowledges, rejects or nacks a single message on the channel it was delivered on.
     *
     * @param   string|int     $deliveryTag
     * @param   mixed          $processFlag
     *
     * @return  void
     *
     * @throws  AMQPRuntimeException when the delivery tag is not part of the current batch
     */
    private function handleProcessFlag($deliveryTag, $processFlag)
    {
        $channel = $this->getMessageChannel($deliveryTag);

        if ($processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $processFlag) {
            // Reject and requeue message to RabbitMQ
            $channel->basic_reject($deliveryTag, true);
        } elseif ($processFlag === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE) {
            // NACK and requeue message to RabbitMQ
            $channel->basic_nack($deliveryTag, false, true);
        } elseif ($processFlag === ConsumerInterface::MSG_REJECT) {
            // Reject and drop
            $channel->basic_reject($deliveryTag, false);
        } else {
            // Any other value (including null) acknowledges the message
            $channel->basic_ack($deliveryTag);
        }
    }

    /**
     * NOTE(review): the comparison is strict, so a prefetch count of 0 makes an empty
     * batch count as complete, and a non-integer prefetch count never matches - confirm
     * that callers always configure a positive integer.
     *
     * @return  bool
     */
    protected function isCompleteBatch()
    {
        return $this->batchCounter === $this->prefetchCount;
    }

    /**
     * @return  bool
     */
    protected function isEmptyBatch()
    {
        return $this->batchCounter === 0;
    }

    /**
     * Channel callback: buffers the message, then checks whether the consumer should stop.
     *
     * @param   AMQPMessage     $msg
     *
     * @return  void
     *
     * @throws  \Error
     * @throws  \Exception
     */
    public function processMessage(AMQPMessage $msg)
    {
        $this->addMessage($msg);

        $this->maybeStopConsumer();
    }

    /**
     * Normalises the callback result into an array of flags keyed by delivery tag.
     *
     * @param   mixed   $processFlags
     *
     * @return  array
     *
     * @throws  AMQPRuntimeException when an array is returned whose size differs from the batch size
     */
    private function analyzeProcessFlags($processFlags = null)
    {
        if (is_array($processFlags)) {
            if (count($processFlags) !== $this->batchCounter) {
                throw new AMQPRuntimeException(
                    'Method batchExecute() should return an array with elements equal with the number of messages processed'
                );
            }

            return $processFlags;
        }

        // A scalar (or null) result applies to every message of the batch.
        $response = [];
        foreach ($this->messages as $deliveryTag => $message) {
            $response[$deliveryTag] = $processFlags;
        }

        return $response;
    }

    /**
     * Empties the message buffer and resets the batch counter.
     *
     * @return  void
     */
    private function resetBatch()
    {
        $this->messages = [];
        $this->batchCounter = 0;
    }

    /**
     * Adds a message to the current batch, keyed by its integer delivery tag.
     *
     * @param   AMQPMessage $message
     *
     * @return  void
     */
    private function addMessage(AMQPMessage $message)
    {
        $this->batchCounter++;
        $this->messages[(int)$message->delivery_info['delivery_tag']] = $message;
    }

    /**
     * @param   string|int  $deliveryTag
     *
     * @return  AMQPMessage|null
     */
    private function getMessage($deliveryTag)
    {
        return $this->messages[$deliveryTag] ?? null;
    }

    /**
     * Returns the channel the given buffered message was delivered on.
     *
     * @param   string|int  $deliveryTag
     *
     * @return  AMQPChannel
     *
     * @throws  AMQPRuntimeException when no buffered message has this delivery tag
     */
    private function getMessageChannel($deliveryTag)
    {
        $message = $this->getMessage($deliveryTag);
        if (empty($message)) {
            // %s rather than %d: a non-numeric tag would otherwise be reported as 0.
            throw new AMQPRuntimeException(sprintf('Unknown delivery_tag %s!', $deliveryTag));
        }

        return $message->delivery_info['channel'];
    }

    /**
     * Flushes a pending batch, then cancels the consumer on the channel.
     *
     * @return  void
     */
    public function stopConsuming()
    {
        if (!$this->isEmptyBatch()) {
            $this->batchConsume();
        }

        $this->getChannel()->basic_cancel($this->getConsumerTag(), false, true);
    }

    /**
     * Optionally sets up the fabric and registers processMessage() as the queue consumer.
     *
     * @return  void
     */
    protected function setupConsumer()
    {
        if ($this->autoSetupFabric) {
            $this->setupFabric();
        }

        $this->getChannel()->basic_consume(
            $this->queueOptions['name'],
            $this->getConsumerTag(),
            false,
            false,
            false,
            false,
            [$this, 'processMessage']
        );
    }

    /**
     * Dispatches pending signals (when pcntl is usable) and stops the consumer if a stop
     * was requested or the memory limit has been reached.
     *
     * @return  void
     *
     * @throws \BadFunctionCallException
     */
    protected function maybeStopConsumer()
    {
        if (extension_loaded('pcntl') && (defined('AMQP_WITHOUT_SIGNALS') ? !AMQP_WITHOUT_SIGNALS : true)) {
            if (!function_exists('pcntl_signal_dispatch')) {
                throw new \BadFunctionCallException("Function 'pcntl_signal_dispatch' is referenced in the php.ini 'disable_functions' and can't be called.");
            }

            pcntl_signal_dispatch();
        }

        // Single combined check so that the consumer is cancelled at most once per call,
        // even when both conditions hold.
        $memoryExhausted = null !== $this->getMemoryLimit() && $this->isRamAlmostOverloaded();
        if ($this->forceStop || $memoryExhausted) {
            $this->stopConsuming();
        }
    }

    /**
     * @param   string  $tag
     *
     * @return  $this
     */
    public function setConsumerTag($tag)
    {
        $this->consumerTag = $tag;

        return $this;
    }

    /**
     * @return  string
     */
    public function getConsumerTag()
    {
        return $this->consumerTag;
    }

    /**
     * Requests a stop; it takes effect at the next maybeStopConsumer() call.
     *
     * @return  void
     */
    public function forceStopConsumer()
    {
        $this->forceStop = true;
    }

    /**
     * Sets the qos settings for the current channel
     * Consider that prefetchSize and global do not work with rabbitMQ version <= 8.0
     *
     * The prefetch count is also stored locally and used as the batch size.
     *
     * @param int $prefetchSize
     * @param int $prefetchCount
     * @param bool $global
     */
    public function setQosOptions($prefetchSize = 0, $prefetchCount = 0, $global = false)
    {
        $this->prefetchCount = $prefetchCount;
        $this->getChannel()->basic_qos($prefetchSize, $prefetchCount, $global);
    }

    /**
     * @param   int     $idleTimeout
     *
     * @return  $this
     */
    public function setIdleTimeout($idleTimeout)
    {
        $this->idleTimeout = $idleTimeout;

        return $this;
    }

    /**
     * Set exit code to be returned when there is a timeout exception
     *
     * @param   int     $idleTimeoutExitCode
     *
     * @return  $this
     */
    public function setIdleTimeoutExitCode($idleTimeoutExitCode)
    {
        $this->idleTimeoutExitCode = $idleTimeoutExitCode;

        return $this;
    }

    /**
     * Enables keep-alive: an idle timeout on an empty batch no longer ends consume().
     *
     * @return $this
     */
    public function keepAlive()
    {
        $this->keepAlive = true;

        return $this;
    }

    /**
     * Purge the queue
     */
    public function purge()
    {
        $this->getChannel()->queue_purge($this->queueOptions['name'], true);
    }

    /**
     * Delete the queue
     */
    public function delete()
    {
        $this->getChannel()->queue_delete($this->queueOptions['name'], true);
    }

    /**
     * Checks if memory in use is greater or equal than memory allowed for this process
     *
     * The configured limit is interpreted as MiB (multiplied by 1048576 bytes).
     *
     * @return boolean
     */
    protected function isRamAlmostOverloaded()
    {
        return (memory_get_usage(true) >= ($this->getMemoryLimit() * 1048576));
    }

    /**
     * @return  int
     */
    public function getIdleTimeout()
    {
        return $this->idleTimeout;
    }

    /**
     * Get exit code to be returned when there is a timeout exception
     *
     * @return  int|null
     */
    public function getIdleTimeoutExitCode()
    {
        return $this->idleTimeoutExitCode;
    }

    /**
     * Resets the consumed property.
     * Use when you want to call start() or consume() multiple times.
     */
    public function resetConsumed()
    {
        $this->consumed = 0;
    }

    /**
     * @param   int     $timeout
     *
     * @return  $this
     */
    public function setTimeoutWait($timeout)
    {
        $this->timeoutWait = $timeout;

        return $this;
    }

    /**
     * Sets the batch size locally; unlike setQosOptions() this does not touch the channel.
     *
     * @param   int $amount
     *
     * @return  $this
     */
    public function setPrefetchCount($amount)
    {
        $this->prefetchCount = $amount;

        return $this;
    }

    /**
     * @return int
     */
    public function getTimeoutWait()
    {
        return $this->timeoutWait;
    }

    /**
     * @return int
     */
    public function getPrefetchCount()
    {
        return $this->prefetchCount;
    }

    /**
     * Set the memory limit
     *
     * @param int $memoryLimit
     */
    public function setMemoryLimit($memoryLimit)
    {
        $this->memoryLimit = $memoryLimit;
    }

    /**
     * Get the memory limit
     *
     * @return int|null
     */
    public function getMemoryLimit()
    {
        return $this->memoryLimit;
    }

    /**
     * Check graceful max execution date time and stop if limit is reached
     *
     * @return void
     */
    private function checkGracefulMaxExecutionDateTime()
    {
        if (!$this->gracefulMaxExecutionDateTime) {
            return;
        }

        // Request a stop as soon as the deadline is no longer in the future.
        if ($this->gracefulMaxExecutionDateTime <= new \DateTime()) {
            $this->forceStopConsumer();
        }
    }
}
584