BatchConsumer::analyzeProcessFlags()   A
last analyzed

Complexity

Conditions 4
Paths 4

Size

Total Lines 18
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 4
eloc 9
c 1
b 0
f 0
nc 4
nop 1
dl 0
loc 18
ccs 0
cts 9
cp 0
crap 20
rs 9.9666
1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use PhpAmqpLib\Channel\AMQPChannel;
6
use PhpAmqpLib\Exception\AMQPRuntimeException;
7
use PhpAmqpLib\Exception\AMQPTimeoutException;
8
use PhpAmqpLib\Message\AMQPMessage;
9
10
class BatchConsumer extends BaseAmqp implements DequeuerInterface
11
{
12
    /**
13
     * @var \Closure|callable
14
     */
15
    protected $callback;
16
17
    /**
18
     * @var bool
19
     */
20
    protected $forceStop = false;
21
22
    /**
23
     * @var int
24
     */
25
    protected $idleTimeout = 0;
26
27
    /**
28
     * @var bool
29
     */
30
    private $keepAlive = false;
31
32
    /**
33
     * @var int
34
     */
35
    protected $idleTimeoutExitCode;
36
37
    /**
38
     * @var int
39
     */
40
    protected $memoryLimit = null;
41
42
    /**
43
     * @var int
44
     */
45
    protected $prefetchCount;
46
47
    /**
48
     * @var int
49
     */
50
    protected $timeoutWait = 3;
51
52
    /**
53
     * @var array
54
     */
55
    protected $messages = [];
56
57
    /**
58
     * @var int
59
     */
60
    protected $batchCounter = 0;
61
62
    /**
63
     * @var int
64
     */
65
    protected $batchAmount = 0;
66
67
    /**
68
     * @var int
69
     */
70
    protected $consumed = 0;
71
72
    /**
73
     * @var \DateTime|null DateTime after which the consumer will gracefully exit. "Gracefully" means, that
74
     *      any currently running consumption will not be interrupted.
75
     */
76
    protected $gracefulMaxExecutionDateTime;
77
78
    /** @var int */
79
    private $batchAmountTarget;
80
81
    /**
82
     * @param \DateTime|null $dateTime
83
     */
84
    public function setGracefulMaxExecutionDateTime(?\DateTime $dateTime = null)
85
    {
86
        $this->gracefulMaxExecutionDateTime = $dateTime;
87
    }
88
89
    /**
90
     * @param int $secondsInTheFuture
91
     */
92
    public function setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($secondsInTheFuture)
93
    {
94
        $this->setGracefulMaxExecutionDateTime(new \DateTime("+{$secondsInTheFuture} seconds"));
95
    }
96
97
    /**
98
     * @param   \Closure|callable    $callback
99
     *
100
     * @return  $this
101
     */
102
    public function setCallback($callback)
103
    {
104
        $this->callback = $callback;
105
106
        return $this;
107
    }
108
109
    public function consume(int $batchAmountTarget = 0)
110
    {
111
        $this->batchAmountTarget = $batchAmountTarget;
112
113
        $this->setupConsumer();
114
115
        while ($this->getChannel()->is_consuming()) {
116
            if ($this->isCompleteBatch()) {
117
                $this->batchConsume();
118
            }
119
120
            $this->checkGracefulMaxExecutionDateTime();
121
            $this->maybeStopConsumer();
122
123
            $timeout = $this->isEmptyBatch() ? $this->getIdleTimeout() : $this->getTimeoutWait();
124
125
            try {
126
                $this->getChannel()->wait(null, false, $timeout);
127
            } catch (AMQPTimeoutException $e) {
128
                if (!$this->isEmptyBatch()) {
129
                    $this->batchConsume();
130
                    $this->maybeStopConsumer();
131
                } elseif ($this->keepAlive === true) {
132
                    continue;
133
                } elseif (null !== $this->getIdleTimeoutExitCode()) {
134
                    return $this->getIdleTimeoutExitCode();
135
                } else {
136
                    throw $e;
137
                }
138
            }
139
        }
140
141
        return 0;
142
    }
143
144
    private function batchConsume()
145
    {
146
        try {
147
            $processFlags = call_user_func($this->callback, $this->messages);
148
            $this->handleProcessMessages($processFlags);
149
            $this->logger->debug('Queue message processed', [
150
                'amqp' => [
151
                    'queue' => $this->queueOptions['name'],
152
                    'messages' => $this->messages,
153
                    'return_codes' => $processFlags,
154
                ],
155
            ]);
156
        } catch (Exception\StopConsumerException $e) {
157
            $this->logger->info('Consumer requested stop', [
158
                'amqp' => [
159
                    'queue' => $this->queueOptions['name'],
160
                    'message' => $this->messages,
161
                    'stacktrace' => $e->getTraceAsString(),
162
                ],
163
            ]);
164
            $this->handleProcessMessages($e->getHandleCode());
165
            $this->resetBatch();
166
            $this->stopConsuming();
167
        } catch (\Exception $e) {
168
            $this->logger->error($e->getMessage(), [
169
                'amqp' => [
170
                    'queue' => $this->queueOptions['name'],
171
                    'message' => $this->messages,
172
                    'stacktrace' => $e->getTraceAsString(),
173
                ],
174
            ]);
175
            $this->resetBatch();
176
            throw $e;
177
        } catch (\Error $e) {
178
            $this->logger->error($e->getMessage(), [
179
                'amqp' => [
180
                    'queue' => $this->queueOptions['name'],
181
                    'message' => $this->messages,
182
                    'stacktrace' => $e->getTraceAsString(),
183
                ],
184
            ]);
185
            $this->resetBatch();
186
            throw $e;
187
        }
188
189
        $this->batchAmount++;
190
        $this->resetBatch();
191
    }
192
193
    /**
194
     * @param   mixed   $processFlags
195
     *
196
     * @return  void
197
     */
198
    protected function handleProcessMessages($processFlags = null)
199
    {
200
        $processFlags = $this->analyzeProcessFlags($processFlags);
201
        foreach ($processFlags as $deliveryTag => $processFlag) {
202
            $this->handleProcessFlag($deliveryTag, $processFlag);
203
        }
204
    }
205
206
    /**
207
     * @param   string|int     $deliveryTag
208
     * @param   mixed          $processFlag
209
     *
210
     * @return  void
211
     */
212
    private function handleProcessFlag($deliveryTag, $processFlag)
213
    {
214
        if ($processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $processFlag) {
215
            // Reject and requeue message to RabbitMQ
216
            $this->getMessageChannel($deliveryTag)->basic_reject($deliveryTag, true);
0 ignored issues
show
Bug introduced by
It seems like $deliveryTag can also be of type string; however, parameter $deliveryTag of OldSound\RabbitMqBundle\...er::getMessageChannel() does only seem to accept integer, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

216
            $this->getMessageChannel(/** @scrutinizer ignore-type */ $deliveryTag)->basic_reject($deliveryTag, true);
Loading history...
Bug introduced by
It seems like $deliveryTag can also be of type string; however, parameter $delivery_tag of PhpAmqpLib\Channel\AMQPChannel::basic_reject() does only seem to accept integer, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

216
            $this->getMessageChannel($deliveryTag)->basic_reject(/** @scrutinizer ignore-type */ $deliveryTag, true);
Loading history...
217
        } elseif ($processFlag === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE) {
218
            // NACK and requeue message to RabbitMQ
219
            $this->getMessageChannel($deliveryTag)->basic_nack($deliveryTag, false, true);
0 ignored issues
show
Bug introduced by
It seems like $deliveryTag can also be of type string; however, parameter $delivery_tag of PhpAmqpLib\Channel\AMQPChannel::basic_nack() does only seem to accept integer, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

219
            $this->getMessageChannel($deliveryTag)->basic_nack(/** @scrutinizer ignore-type */ $deliveryTag, false, true);
Loading history...
220
        } elseif ($processFlag === ConsumerInterface::MSG_REJECT) {
221
            // Reject and drop
222
            $this->getMessageChannel($deliveryTag)->basic_reject($deliveryTag, false);
223
        } elseif ($processFlag === ConsumerInterface::MSG_ACK_SENT) {
224
            // do nothing, ACK should be already sent
225
        } else {
226
            // Remove message from queue only if callback return not false
227
            $this->getMessageChannel($deliveryTag)->basic_ack($deliveryTag);
0 ignored issues
show
Bug introduced by
It seems like $deliveryTag can also be of type string; however, parameter $delivery_tag of PhpAmqpLib\Channel\AMQPChannel::basic_ack() does only seem to accept integer, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

227
            $this->getMessageChannel($deliveryTag)->basic_ack(/** @scrutinizer ignore-type */ $deliveryTag);
Loading history...
228
        }
229
    }
230
231
    /**
232
     * @return  bool
233
     */
234
    protected function isCompleteBatch()
235
    {
236
        return $this->batchCounter === $this->prefetchCount;
237
    }
238
239
    /**
240
     * @return  bool
241
     */
242
    protected function isEmptyBatch()
243
    {
244
        return $this->batchCounter === 0;
245
    }
246
247
    /**
248
     * @param   AMQPMessage     $msg
249
     *
250
     * @return  void
251
     *
252
     * @throws  \Error
253
     * @throws  \Exception
254
     */
255
    public function processMessage(AMQPMessage $msg)
256
    {
257
        $this->addMessage($msg);
258
259
        $this->maybeStopConsumer();
260
    }
261
262
    /**
263
     * @param   mixed   $processFlags
264
     *
265
     * @return  array
266
     */
267
    private function analyzeProcessFlags($processFlags = null)
268
    {
269
        if (is_array($processFlags)) {
270
            if (count($processFlags) !== $this->batchCounter) {
271
                throw new AMQPRuntimeException(
272
                    'Method batchExecute() should return an array with elements equal with the number of messages processed'
273
                );
274
            }
275
276
            return $processFlags;
277
        }
278
279
        $response = [];
280
        foreach ($this->messages as $deliveryTag => $message) {
281
            $response[$deliveryTag] = $processFlags;
282
        }
283
284
        return $response;
285
    }
286
287
288
    /**
289
     * @return  void
290
     */
291
    private function resetBatch()
292
    {
293
        $this->messages = [];
294
        $this->batchCounter = 0;
295
    }
296
297
    /**
298
     * @param   AMQPMessage $message
299
     *
300
     * @return  void
301
     */
302
    private function addMessage(AMQPMessage $message)
303
    {
304
        $this->batchCounter++;
305
        $this->messages[$message->getDeliveryTag()] = $message;
306
    }
307
308
    /**
309
     * @param   int     $deliveryTag
310
     *
311
     * @return  AMQPMessage|null
312
     */
313
    private function getMessage($deliveryTag)
314
    {
315
        return $this->messages[$deliveryTag]
316
            ?? null
317
        ;
318
    }
319
320
    /**
321
     * @param   int     $deliveryTag
322
     *
323
     * @return  AMQPChannel
324
     *
325
     * @throws  AMQPRuntimeException
326
     */
327
    private function getMessageChannel($deliveryTag)
328
    {
329
        $message = $this->getMessage($deliveryTag);
330
        if ($message === null) {
331
            throw new AMQPRuntimeException(sprintf('Unknown delivery_tag %d!', $deliveryTag));
332
        }
333
334
        return $message->getChannel();
335
    }
336
337
    /**
338
     * @return  void
339
     */
340
    public function stopConsuming()
341
    {
342
        if (!$this->isEmptyBatch()) {
343
            $this->batchConsume();
344
        }
345
346
        $this->getChannel()->basic_cancel($this->getConsumerTag(), false, true);
347
    }
348
349
    /**
350
     * @return  void
351
     */
352
    protected function setupConsumer()
353
    {
354
        if ($this->autoSetupFabric) {
355
            $this->setupFabric();
356
        }
357
358
        $this->getChannel()->basic_consume($this->queueOptions['name'], $this->getConsumerTag(), false, $this->consumerOptions['no_ack'], false, false, [$this, 'processMessage']);
359
    }
360
361
    /**
362
     * @return  void
363
     *
364
     * @throws \BadFunctionCallException
365
     */
366
    protected function maybeStopConsumer()
367
    {
368
        if (extension_loaded('pcntl') && (defined('AMQP_WITHOUT_SIGNALS') ? !AMQP_WITHOUT_SIGNALS : true)) {
369
            if (!function_exists('pcntl_signal_dispatch')) {
370
                throw new \BadFunctionCallException("Function 'pcntl_signal_dispatch' is referenced in the php.ini 'disable_functions' and can't be called.");
371
            }
372
373
            pcntl_signal_dispatch();
374
        }
375
376
        if ($this->forceStop || ($this->batchAmount == $this->batchAmountTarget && $this->batchAmountTarget > 0)) {
377
            $this->stopConsuming();
378
        }
379
380
        if (null !== $this->getMemoryLimit() && $this->isRamAlmostOverloaded()) {
381
            $this->stopConsuming();
382
        }
383
    }
384
385
    /**
386
     * @param   string  $tag
387
     *
388
     * @return  $this
389
     */
390
    public function setConsumerTag($tag)
391
    {
392
        $this->consumerTag = $tag;
393
394
        return $this;
395
    }
396
397
    /**
398
     * @return  string
399
     */
400
    public function getConsumerTag()
401
    {
402
        return $this->consumerTag;
403
    }
404
405
    /**
406
     * @return  void
407
     */
408
    public function forceStopConsumer()
409
    {
410
        $this->forceStop = true;
411
    }
412
413
    /**
414
     * Sets the qos settings for the current channel
415
     * Consider that prefetchSize and global do not work with rabbitMQ version <= 8.0
416
     *
417
     * @param int $prefetchSize
418
     * @param int $prefetchCount
419
     * @param bool $global
420
     */
421
    public function setQosOptions($prefetchSize = 0, $prefetchCount = 0, $global = false)
422
    {
423
        $this->prefetchCount = $prefetchCount;
424
        $this->getChannel()->basic_qos($prefetchSize, $prefetchCount, $global);
425
    }
426
427
    /**
428
     * @param   int     $idleTimeout
429
     *
430
     * @return  $this
431
     */
432
    public function setIdleTimeout($idleTimeout)
433
    {
434
        $this->idleTimeout = $idleTimeout;
435
436
        return $this;
0 ignored issues
show
Bug Best Practice introduced by
The expression return $this returns the type OldSound\RabbitMqBundle\RabbitMq\BatchConsumer which is incompatible with the return type mandated by OldSound\RabbitMqBundle\...rface::setIdleTimeout() of void.

In the issue above, the returned value is violating the contract defined by the mentioned interface.

Let's take a look at an example:

interface HasName {
    /** @return string */
    public function getName();
}

class Name {
    public $name;
}

class User implements HasName {
    /** @return string|Name */
    public function getName() {
        return new Name('foo'); // This is a violation of the ``HasName`` interface
                                // which only allows a string value to be returned.
    }
}
Loading history...
437
    }
438
439
    /**
440
     * Set exit code to be returned when there is a timeout exception
441
     *
442
     * @param   int     $idleTimeoutExitCode
443
     *
444
     * @return  $this
445
     */
446
    public function setIdleTimeoutExitCode($idleTimeoutExitCode)
447
    {
448
        $this->idleTimeoutExitCode = $idleTimeoutExitCode;
449
450
        return $this;
451
    }
452
453
    /**
454
     * keepAlive
455
     *
456
     * @return $this
457
     */
458
    public function keepAlive()
459
    {
460
        $this->keepAlive = true;
461
462
        return $this;
463
    }
464
465
    /**
466
     * Purge the queue
467
     */
468
    public function purge()
469
    {
470
        $this->getChannel()->queue_purge($this->queueOptions['name'], true);
471
    }
472
473
    /**
474
     * Delete the queue
475
     */
476
    public function delete()
477
    {
478
        $this->getChannel()->queue_delete($this->queueOptions['name'], true);
479
    }
480
481
    /**
482
     * Checks if memory in use is greater or equal than memory allowed for this process
483
     *
484
     * @return boolean
485
     */
486
    protected function isRamAlmostOverloaded()
487
    {
488
        return (memory_get_usage(true) >= ($this->getMemoryLimit() * 1048576));
489
    }
490
491
    /**
492
     * @return  int
493
     */
494
    public function getIdleTimeout()
495
    {
496
        return $this->idleTimeout;
497
    }
498
499
    /**
500
     * Get exit code to be returned when there is a timeout exception
501
     *
502
     * @return  int|null
503
     */
504
    public function getIdleTimeoutExitCode()
505
    {
506
        return $this->idleTimeoutExitCode;
507
    }
508
509
    /**
510
     * Resets the consumed property.
511
     * Use when you want to call start() or consume() multiple times.
512
     */
513
    public function resetConsumed()
514
    {
515
        $this->consumed = 0;
516
    }
517
518
    /**
519
     * @param   int     $timeout
520
     *
521
     * @return  $this
522
     */
523
    public function setTimeoutWait($timeout)
524
    {
525
        $this->timeoutWait = $timeout;
526
527
        return $this;
528
    }
529
530
    /**
531
     * @param   int $amount
532
     *
533
     * @return  $this
534
     */
535
    public function setPrefetchCount($amount)
536
    {
537
        $this->prefetchCount = $amount;
538
539
        return $this;
540
    }
541
542
    /**
543
     * @return int
544
     */
545
    public function getTimeoutWait()
546
    {
547
        return $this->timeoutWait;
548
    }
549
550
    /**
551
     * @return int
552
     */
553
    public function getPrefetchCount()
554
    {
555
        return $this->prefetchCount;
556
    }
557
558
    /**
559
     * Set the memory limit
560
     *
561
     * @param int $memoryLimit
562
     */
563
    public function setMemoryLimit($memoryLimit)
564
    {
565
        $this->memoryLimit = $memoryLimit;
566
    }
567
568
    /**
569
     * Get the memory limit
570
     *
571
     * @return int
572
     */
573
    public function getMemoryLimit()
574
    {
575
        return $this->memoryLimit;
576
    }
577
578
    /**
579
     * Check graceful max execution date time and stop if limit is reached
580
     *
581
     * @return void
582
     */
583
    private function checkGracefulMaxExecutionDateTime()
584
    {
585
        if (!$this->gracefulMaxExecutionDateTime) {
586
            return;
587
        }
588
589
        $now = new \DateTime();
590
591
        if ($this->gracefulMaxExecutionDateTime > $now) {
592
            return;
593
        }
594
595
        $this->forceStopConsumer();
596
    }
597
}
598