BatchConsumer::processMessage()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
eloc 2
c 1
b 0
f 0
nc 1
nop 1
dl 0
loc 5
ccs 0
cts 4
cp 0
crap 2
rs 10
1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use PhpAmqpLib\Channel\AMQPChannel;
6
use PhpAmqpLib\Exception\AMQPRuntimeException;
7
use PhpAmqpLib\Exception\AMQPTimeoutException;
8
use PhpAmqpLib\Message\AMQPMessage;
9
10
/**
 * Consumer that collects incoming messages into a batch and hands the whole
 * batch to a single callback.
 *
 * A batch is flushed to the callback when the number of buffered messages
 * equals the configured prefetch count, or when waiting for further messages
 * times out while the batch is not empty. The callback's return value decides
 * how each message is acknowledged (see handleProcessFlag()).
 */
class BatchConsumer extends BaseAmqp implements DequeuerInterface
{
    /**
     * Callback invoked with the buffered messages (AMQPMessage instances keyed by delivery tag).
     *
     * @var \Closure|callable
     */
    protected $callback;

    /**
     * When true, the next call to maybeStopConsumer() cancels the consumer.
     *
     * @var bool
     */
    protected $forceStop = false;

    /**
     * Timeout passed to AMQPChannel::wait() while the current batch is empty.
     *
     * @var int
     */
    protected $idleTimeout = 0;

    /**
     * When true, an idle timeout on an empty batch simply restarts the wait.
     *
     * @var bool
     */
    private $keepAlive = false;

    /**
     * Value returned by consume() on idle timeout; null means "not configured".
     *
     * @var int|null
     */
    protected $idleTimeoutExitCode;

    /**
     * Memory limit, multiplied by 1048576 before being compared with the
     * process memory usage; null disables the check.
     *
     * @var int|null
     */
    protected $memoryLimit = null;

    /**
     * Number of messages that make a batch complete.
     *
     * @var int
     */
    protected $prefetchCount;

    /**
     * Timeout passed to AMQPChannel::wait() while the current batch already holds messages.
     *
     * @var int
     */
    protected $timeoutWait = 3;

    /**
     * Messages of the current batch, keyed by their integer delivery tag.
     *
     * @var array
     */
    protected $messages = [];

    /**
     * Number of messages added to the current batch.
     *
     * @var int
     */
    protected $batchCounter = 0;

    /**
     * Number of batches handed to the callback so far.
     *
     * @var int
     */
    protected $batchAmount = 0;

    /**
     * Not read or incremented by this class; only reset through resetConsumed().
     *
     * @var int
     */
    protected $consumed = 0;

    /**
     * @var \DateTime|null DateTime after which the consumer will gracefully exit. "Gracefully" means, that
     *      any currently running consumption will not be interrupted.
     */
    protected $gracefulMaxExecutionDateTime;

    /**
     * Number of batches after which consuming stops; values <= 0 disable the limit.
     *
     * @var int
     */
    private $batchAmountTarget;

    /**
     * @param \DateTime|null $dateTime
     */
    public function setGracefulMaxExecutionDateTime(\DateTime $dateTime = null)
    {
        $this->gracefulMaxExecutionDateTime = $dateTime;
    }

    /**
     * Sets the graceful exit time relative to "now".
     *
     * @param int $secondsInTheFuture
     */
    public function setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($secondsInTheFuture)
    {
        $this->setGracefulMaxExecutionDateTime(new \DateTime("+{$secondsInTheFuture} seconds"));
    }

    /**
     * @param   \Closure|callable    $callback  Receives the batch as an array of messages keyed by delivery tag
     *
     * @return  $this
     */
    public function setCallback($callback)
    {
        $this->callback = $callback;

        return $this;
    }

    /**
     * Runs the consume loop until the channel has no registered callbacks left.
     *
     * @param   int $batchAmountTarget  Stop after this many batches; 0 (default) means no limit
     *
     * @return  int 0 when the loop ends normally, otherwise the configured idle timeout exit code
     *
     * @throws  AMQPTimeoutException    When waiting times out on an empty batch, keepAlive is off
     *                                  and no idle timeout exit code is configured
     * @throws  \Error
     * @throws  \Exception
     */
    public function consume(int $batchAmountTarget = 0)
    {
        $this->batchAmountTarget = $batchAmountTarget;

        $this->setupConsumer();

        while (count($this->getChannel()->callbacks)) {
            if ($this->isCompleteBatch()) {
                $this->batchConsume();
            }

            $this->checkGracefulMaxExecutionDateTime();
            $this->maybeStopConsumer();

            // An empty batch waits with the idle timeout, a partially filled one with the shorter batch timeout.
            $timeout = $this->isEmptyBatch() ? $this->getIdleTimeout() : $this->getTimeoutWait();

            try {
                $this->getChannel()->wait(null, false, $timeout);
            } catch (AMQPTimeoutException $e) {
                if (!$this->isEmptyBatch()) {
                    // No further message arrived in time: flush the partial batch.
                    $this->batchConsume();
                    $this->maybeStopConsumer();

                    continue;
                }

                if ($this->keepAlive === true) {
                    continue;
                }

                if (null !== $this->getIdleTimeoutExitCode()) {
                    return $this->getIdleTimeoutExitCode();
                }

                throw $e;
            }
        }

        return 0;
    }

    /**
     * Hands the current batch to the callback, acknowledges the messages
     * according to the returned flags and starts a new batch.
     *
     * A StopConsumerException from the callback discards the batch and cancels
     * the consumer; any other throwable is logged, the batch is discarded and
     * the throwable is rethrown.
     *
     * @return  void
     *
     * @throws  \Error
     * @throws  \Exception
     */
    private function batchConsume()
    {
        try {
            $processFlags = call_user_func($this->callback, $this->messages);
            $this->handleProcessMessages($processFlags);
            $this->logger->debug('Queue message processed', [
                'amqp' => [
                    'queue' => $this->queueOptions['name'],
                    'messages' => $this->messages,
                    'return_codes' => $processFlags,
                ],
            ]);
        } catch (Exception\StopConsumerException $e) {
            $this->logger->info('Consumer requested restart', $this->failureLogContext($e));
            $this->resetBatch();
            $this->stopConsuming();
        } catch (\Throwable $e) {
            // Covers both \Exception and \Error, which were handled identically.
            $this->logger->error($e->getMessage(), $this->failureLogContext($e));
            $this->resetBatch();

            throw $e;
        }

        $this->batchAmount++;
        $this->resetBatch();
    }

    /**
     * Builds the log context shared by all failure branches of batchConsume().
     *
     * @param   \Throwable  $e
     *
     * @return  array
     */
    private function failureLogContext(\Throwable $e)
    {
        return [
            'amqp' => [
                'queue' => $this->queueOptions['name'],
                'message' => $this->messages,
                'stacktrace' => $e->getTraceAsString(),
            ],
        ];
    }

    /**
     * Applies the callback result to every message of the current batch.
     *
     * @param   mixed   $processFlags   Either an array of flags keyed by delivery tag, or a single
     *                                  flag that is applied to all messages of the batch
     *
     * @return  void
     */
    protected function handleProcessMessages($processFlags = null)
    {
        foreach ($this->analyzeProcessFlags($processFlags) as $deliveryTag => $processFlag) {
            $this->handleProcessFlag($deliveryTag, $processFlag);
        }
    }

    /**
     * Acknowledges, rejects or nacks a single message depending on its flag.
     *
     * @param   string|int     $deliveryTag
     * @param   mixed          $processFlag
     *
     * @return  void
     *
     * @throws  AMQPRuntimeException    When the delivery tag is not part of the current batch
     */
    private function handleProcessFlag($deliveryTag, $processFlag)
    {
        $channel = $this->getMessageChannel($deliveryTag);

        if ($processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $processFlag) {
            // Reject and requeue message to RabbitMQ
            $channel->basic_reject($deliveryTag, true);
        } elseif ($processFlag === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE) {
            // NACK and requeue message to RabbitMQ
            $channel->basic_nack($deliveryTag, false, true);
        } elseif ($processFlag === ConsumerInterface::MSG_REJECT) {
            // Reject and drop
            $channel->basic_reject($deliveryTag, false);
        } else {
            // Every other flag value acknowledges the message
            $channel->basic_ack($deliveryTag);
        }
    }

    /**
     * Tells whether the batch holds exactly prefetchCount messages.
     *
     * The comparison is strict, so prefetchCount has to be an integer for a
     * batch to ever be reported as complete.
     *
     * @return  bool
     */
    protected function isCompleteBatch()
    {
        return $this->batchCounter === $this->prefetchCount;
    }

    /**
     * @return  bool
     */
    protected function isEmptyBatch()
    {
        return $this->batchCounter === 0;
    }

    /**
     * Consumer callback registered on the channel: buffers the message and
     * then checks whether the consumer has to stop.
     *
     * @param   AMQPMessage     $msg
     *
     * @return  void
     *
     * @throws  \Error
     * @throws  \Exception
     */
    public function processMessage(AMQPMessage $msg)
    {
        $this->addMessage($msg);

        $this->maybeStopConsumer();
    }

    /**
     * Normalises the callback result into an array of flags keyed by delivery tag.
     *
     * @param   mixed   $processFlags
     *
     * @return  array
     *
     * @throws  AMQPRuntimeException    When an array is returned whose size differs from the batch size
     */
    private function analyzeProcessFlags($processFlags = null)
    {
        if (!is_array($processFlags)) {
            // A single flag applies to every message of the batch.
            return array_fill_keys(array_keys($this->messages), $processFlags);
        }

        if (count($processFlags) !== $this->batchCounter) {
            throw new AMQPRuntimeException(
                'Method batchExecute() should return an array with elements equal with the number of messages processed'
            );
        }

        return $processFlags;
    }

    /**
     * Drops all buffered messages and resets the message counter.
     *
     * @return  void
     */
    private function resetBatch()
    {
        $this->messages = [];
        $this->batchCounter = 0;
    }

    /**
     * Adds a message to the current batch, keyed by its integer delivery tag.
     *
     * NOTE(review): the code analyser flags AMQPMessage::$delivery_info as
     * deprecated; the access is kept as-is because the installed php-amqplib
     * version is not visible from this file.
     *
     * @param   AMQPMessage $message
     *
     * @return  void
     */
    private function addMessage(AMQPMessage $message)
    {
        $this->batchCounter++;
        $this->messages[(int)$message->delivery_info['delivery_tag']] = $message;
    }

    /**
     * @param   string|int  $deliveryTag
     *
     * @return  AMQPMessage|null    The buffered message, or null when the tag is unknown
     */
    private function getMessage($deliveryTag)
    {
        return $this->messages[$deliveryTag] ?? null;
    }

    /**
     * Returns the channel the given message was delivered on.
     *
     * @param   string|int  $deliveryTag
     *
     * @return  AMQPChannel
     *
     * @throws  AMQPRuntimeException    When the delivery tag is not part of the current batch
     */
    private function getMessageChannel($deliveryTag)
    {
        $message = $this->getMessage($deliveryTag);
        if ($message === null) {
            // %s instead of %d: a non-numeric key returned by the callback must show up verbatim, not as 0.
            throw new AMQPRuntimeException(sprintf('Unknown delivery_tag %s!', $deliveryTag));
        }

        return $message->delivery_info['channel'];
    }

    /**
     * Flushes a non-empty batch to the callback and cancels the consumer.
     *
     * @return  void
     */
    public function stopConsuming()
    {
        if (!$this->isEmptyBatch()) {
            $this->batchConsume();
        }

        $this->getChannel()->basic_cancel($this->getConsumerTag(), false, true);
    }

    /**
     * Optionally sets up the fabric and registers processMessage() as the
     * consumer callback for the configured queue.
     *
     * @return  void
     */
    protected function setupConsumer()
    {
        if ($this->autoSetupFabric) {
            $this->setupFabric();
        }

        $this->getChannel()->basic_consume(
            $this->queueOptions['name'],
            $this->getConsumerTag(),
            false,
            false,
            false,
            false,
            array($this, 'processMessage')
        );
    }

    /**
     * Dispatches pending signals and stops consuming when a stop was forced,
     * the batch target was reached or the memory limit is exceeded.
     *
     * @return  void
     *
     * @throws \BadFunctionCallException
     */
    protected function maybeStopConsumer()
    {
        if (extension_loaded('pcntl') && (defined('AMQP_WITHOUT_SIGNALS') ? !AMQP_WITHOUT_SIGNALS : true)) {
            if (!function_exists('pcntl_signal_dispatch')) {
                throw new \BadFunctionCallException("Function 'pcntl_signal_dispatch' is referenced in the php.ini 'disable_functions' and can't be called.");
            }

            pcntl_signal_dispatch();
        }

        // All stop reasons share one branch so the consumer is cancelled only
        // once even when several of them apply at the same time.
        if ($this->forceStop
            || ($this->batchAmountTarget > 0 && $this->batchAmount === $this->batchAmountTarget)
            || (null !== $this->getMemoryLimit() && $this->isRamAlmostOverloaded())
        ) {
            $this->stopConsuming();
        }
    }

    /**
     * @param   string  $tag
     *
     * @return  $this
     */
    public function setConsumerTag($tag)
    {
        $this->consumerTag = $tag;

        return $this;
    }

    /**
     * @return  string
     */
    public function getConsumerTag()
    {
        return $this->consumerTag;
    }

    /**
     * Requests a stop; it takes effect on the next maybeStopConsumer() call.
     *
     * @return  void
     */
    public function forceStopConsumer()
    {
        $this->forceStop = true;
    }

    /**
     * Sets the qos settings for the current channel
     * Consider that prefetchSize and global do not work with rabbitMQ version <= 8.0
     *
     * The prefetch count is also stored locally, where it defines the batch size.
     *
     * @param int $prefetchSize
     * @param int $prefetchCount
     * @param bool $global
     */
    public function setQosOptions($prefetchSize = 0, $prefetchCount = 0, $global = false)
    {
        $this->prefetchCount = $prefetchCount;
        $this->getChannel()->basic_qos($prefetchSize, $prefetchCount, $global);
    }

    /**
     * @param   int     $idleTimeout
     *
     * @return  $this
     */
    public function setIdleTimeout($idleTimeout)
    {
        $this->idleTimeout = $idleTimeout;

        return $this;
    }

    /**
     * Set exit code to be returned when there is a timeout exception
     *
     * @param   int     $idleTimeoutExitCode
     *
     * @return  $this
     */
    public function setIdleTimeoutExitCode($idleTimeoutExitCode)
    {
        $this->idleTimeoutExitCode = $idleTimeoutExitCode;

        return $this;
    }

    /**
     * Makes the consumer keep waiting after an idle timeout on an empty batch
     * instead of returning or throwing.
     *
     * @return $this
     */
    public function keepAlive()
    {
        $this->keepAlive = true;

        return $this;
    }

    /**
     * Purge the queue
     */
    public function purge()
    {
        $this->getChannel()->queue_purge($this->queueOptions['name'], true);
    }

    /**
     * Delete the queue
     *
     * NOTE(review): the second argument of queue_delete() is presumably
     * php-amqplib's "if_unused" flag - confirm against the library version in use.
     */
    public function delete()
    {
        $this->getChannel()->queue_delete($this->queueOptions['name'], true);
    }

    /**
     * Checks if memory in use is greater or equal than memory allowed for this process
     *
     * The configured limit is multiplied by 1048576 before the comparison.
     *
     * @return boolean
     */
    protected function isRamAlmostOverloaded()
    {
        return (memory_get_usage(true) >= ($this->getMemoryLimit() * 1048576));
    }

    /**
     * @return  int
     */
    public function getIdleTimeout()
    {
        return $this->idleTimeout;
    }

    /**
     * Get exit code to be returned when there is a timeout exception
     *
     * @return  int|null
     */
    public function getIdleTimeoutExitCode()
    {
        return $this->idleTimeoutExitCode;
    }

    /**
     * Resets the consumed property.
     * Use when you want to call start() or consume() multiple times.
     */
    public function resetConsumed()
    {
        $this->consumed = 0;
    }

    /**
     * @param   int     $timeout
     *
     * @return  $this
     */
    public function setTimeoutWait($timeout)
    {
        $this->timeoutWait = $timeout;

        return $this;
    }

    /**
     * Sets the batch size locally; unlike setQosOptions() this does not touch the channel.
     *
     * @param   int $amount
     *
     * @return  $this
     */
    public function setPrefetchCount($amount)
    {
        $this->prefetchCount = $amount;

        return $this;
    }

    /**
     * @return int
     */
    public function getTimeoutWait()
    {
        return $this->timeoutWait;
    }

    /**
     * @return int
     */
    public function getPrefetchCount()
    {
        return $this->prefetchCount;
    }

    /**
     * Set the memory limit
     *
     * @param int $memoryLimit
     */
    public function setMemoryLimit($memoryLimit)
    {
        $this->memoryLimit = $memoryLimit;
    }

    /**
     * Get the memory limit
     *
     * @return int|null null when no limit has been set
     */
    public function getMemoryLimit()
    {
        return $this->memoryLimit;
    }

    /**
     * Check graceful max execution date time and stop if limit is reached
     *
     * @return void
     */
    private function checkGracefulMaxExecutionDateTime()
    {
        if (!$this->gracefulMaxExecutionDateTime) {
            return;
        }

        if ($this->gracefulMaxExecutionDateTime > new \DateTime()) {
            return;
        }

        $this->forceStopConsumer();
    }
}
597