Test Failed
Pull Request — master (#39)
by Aleksandr
06:37
created

Consumer::consumeQueue()   B

Complexity

Conditions 9
Paths 12

Size

Total Lines 64
Code Lines 42

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 21
CRAP Score 18.4017

Importance

Changes 5
Bugs 0 Features 1
Metric Value
cc 9
eloc 42
c 5
b 0
f 1
nc 12
nop 2
dl 0
loc 64
ccs 21
cts 41
cp 0.5122
crap 18.4017
rs 7.6924

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use http\Exception\InvalidArgumentException;
6
use OldSound\RabbitMqBundle\Declarations\QueueConsuming;
7
use OldSound\RabbitMqBundle\Event\AfterProcessingMessageEvent;
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\...rProcessingMessageEvent was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
8
use OldSound\RabbitMqBundle\Event\AfterProcessingMessagesEvent;
9
use OldSound\RabbitMqBundle\Event\AMQPEvent;
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\Event\AMQPEvent was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
10
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessageEvent;
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\...eProcessingMessageEvent was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
11
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessagesEvent;
12
use OldSound\RabbitMqBundle\Event\OnConsumeEvent;
13
use OldSound\RabbitMqBundle\Event\OnIdleEvent;
14
use OldSound\RabbitMqBundle\EventDispatcherAwareTrait;
15
use OldSound\RabbitMqBundle\ExecuteCallbackStrategy\BatchExecuteCallbackStrategy;
16
use OldSound\RabbitMqBundle\ExecuteCallbackStrategy\FnMessagesProcessor;
17
use OldSound\RabbitMqBundle\ExecuteCallbackStrategy\MessagesProcessorInterface;
18
use OldSound\RabbitMqBundle\ExecuteCallbackStrategy\SimpleExecuteCallbackStrategy;
19
use OldSound\RabbitMqBundle\MemoryChecker\MemoryConsumptionChecker;
20
use OldSound\RabbitMqBundle\MemoryChecker\NativeMemoryUsageProvider;
21
use OldSound\RabbitMqBundle\RabbitMq\Exception\RpcResponseException;
22
use OldSound\RabbitMqBundle\Serializer\JsonMessageBodySerializer;
23
use OldSound\RabbitMqBundle\Serializer\MessageBodySerializerInterface;
24
use PhpAmqpLib\Channel\AMQPChannel;
25
use PhpAmqpLib\Exception\AMQPRuntimeException;
26
use PhpAmqpLib\Exception\AMQPTimeoutException;
27
use PhpAmqpLib\Message\AMQPMessage;
28
use Psr\Log\LoggerAwareTrait;
29
use Psr\Log\NullLogger;
30
use OldSound\RabbitMqBundle\ExecuteCallbackStrategy\ExecuteCallbackStrategyInterface;
31
use Symfony\Component\Serializer\SerializerInterface;
32
33
class Consumer
34
{
35
    use LoggerAwareTrait;
36
    use EventDispatcherAwareTrait;
37
38
    /** @var AMQPChannel */
39
    protected $channel;
40
    /** @var QueueConsuming[] */
41
    protected $queueConsumings = [];
42
    /** @var ExecuteCallbackStrategyInterface[] */
43
    protected $executeCallbackStrategies = [];
44
    /** @var MessageBodySerializerInterface */
45
    protected $serializer;
46
47
    /** @var string[] */
48
    protected $consumerTags = [];
49
    /** @var array */
50 32
    protected $basicProperties = [
51
        'content_type' => 'text/plain',
52 32
        'delivery_mode' => 2
53
    ];
54
    /** @var int|null */
55
    protected $target;
56
    /** @var int */
57
    protected $consumed = 0;
58
    /** @var bool */
59
    protected $forceStop = false;
60
    /**
61
     * Important! If true, the channel must not be used from anywhere else
62
     * @var bool
63
     */
64 12
    public $multiAck = false;
65
    /**
66 12
     * @var \DateTime|null DateTime after which the consumer will gracefully exit. "Gracefully" means, that
67
     *      any currently running consumption will not be interrupted.
68 12
     */
69
    public $gracefulMaxExecutionDateTime;
70 12
    /** @var int Exit code used, when consumer is closed by the Graceful Max Execution Timeout feature. */
71 10
    public $gracefulMaxExecutionTimeoutExitCode = 0;
72 10
    /** @var int|null */
73
    public $timeoutWait;
74
    /** @var int */
75
    public $idleTimeout = 0;
76
    /** @var int */
77
    public $idleTimeoutExitCode;
78 10
    /** @var \DateTime|null */
79
    public $lastActivityDateTime;
80 10
81 10
    /**
     * Creates a consumer working on the given channel.
     *
     * A NullLogger and a JsonMessageBodySerializer are installed as defaults.
     *
     * @param AMQPChannel $channel Channel the consumer operates on.
     */
    public function __construct(AMQPChannel $channel)
    {
        $this->serializer = new JsonMessageBodySerializer();
        $this->logger = new NullLogger();
        $this->channel = $channel;
    }
87
88 8
    /**
     * Returns the AMQP channel this consumer was constructed with.
     */
    public function getChannel(): AMQPChannel
    {
        return $this->channel;
    }
92
93
    /**
     * Replaces the serializer stored on this consumer.
     *
     * @param MessageBodySerializerInterface $serializer
     */
    public function setSerializer(MessageBodySerializerInterface $serializer)
    {
        $this->serializer = $serializer;
    }
97 4
98 4
    /**
     * Applies QoS and registers a basic_consume subscription for every configured queue.
     *
     * Each consumer tag returned by the channel is remembered in $consumerTags.
     *
     * @return Consumer This instance.
     */
    protected function setup(): Consumer
    {
        foreach ($this->queueConsumings as $position => $consuming) {
            $this->channel->basic_qos($consuming->qosPrefetchSize, $consuming->qosPrefetchCount, false);

            // Fall back to a generated tag when the queue does not configure one
            $requestedTag = $consuming->consumerTag
                ?: sprintf("PHPPROCESS_%s_%s_%s", gethostname(), getmypid(), $position);

            // Every delivery is handed to the strategy registered for this queue
            $onDelivery = function (AMQPMessage $message) use ($consuming) {
                $this->getExecuteCallbackStrategy($consuming)->consumeCallback($message);
            };

            $this->consumerTags[] = $this->channel->basic_consume(
                $consuming->queueName,
                $requestedTag,
                $consuming->noLocal,
                $consuming->noAck,
                $consuming->exclusive,
                $consuming->nowait,
                $onDelivery
            );
        }

        return $this;
    }
122
123
    /**
     * Registers every given queue via consumeQueue(), letting it pick the default strategy.
     *
     * @param iterable|QueueConsuming[] $queueConsumings
     */
    public function consumeQueues(iterable $queueConsumings)
    {
        foreach ($queueConsumings as $consuming) {
            $this->consumeQueue($consuming);
        }
    }
132
133 32
    /**
     * Registers a queue to consume together with the strategy that drives its callback.
     *
     * When no strategy is given, a SimpleExecuteCallbackStrategy is used if the
     * queue has no batchCount, otherwise a BatchExecuteCallbackStrategy sized by it.
     *
     * The callback is validated against the strategy before any state is
     * changed, so a rejected registration leaves the consumer untouched and the
     * queue and strategy lists stay index-aligned.
     *
     * @param QueueConsuming $queueConsuming
     * @param ExecuteCallbackStrategyInterface|null $executeCallbackStrategy
     *
     * @return Consumer This instance.
     *
     * @throws \InvalidArgumentException When the callback type does not match the strategy.
     */
    public function consumeQueue(QueueConsuming $queueConsuming, ?ExecuteCallbackStrategyInterface $executeCallbackStrategy = null): Consumer
    {
        if (null === $executeCallbackStrategy) {
            $executeCallbackStrategy = null === $queueConsuming->batchCount ?
                new SimpleExecuteCallbackStrategy() :
                new BatchExecuteCallbackStrategy($queueConsuming->batchCount);
        }

        // Multi-message strategies need a batch callback, single-message strategies a plain one
        $expectedCallbackType = $executeCallbackStrategy->canPrecessMultiMessages()
            ? BatchConsumerInterface::class
            : ConsumerInterface::class;
        if (!$queueConsuming->callback instanceof $expectedCallbackType) {
            throw new \InvalidArgumentException(sprintf(
                'Callback of queue "%s" must implement %s',
                $queueConsuming->queueName,
                $expectedCallbackType
            ));
        }

        $executeCallbackStrategy->setMessagesProccessor(new FnMessagesProcessor(
            function (array $messages) use ($queueConsuming) {
                $logAmqpContext = ['queue' => $queueConsuming->queueName];
                if ($this->getExecuteCallbackStrategy($queueConsuming)->canPrecessMultiMessages()) {
                    $logAmqpContext['messages'] = $messages;
                } else {
                    $logAmqpContext['message'] = $messages[0];
                }

                $this->dispatchEvent(
                    BeforeProcessingMessagesEvent::NAME,
                    new BeforeProcessingMessagesEvent($this, $messages, $queueConsuming)
                );

                try {
                    $this->processMessages($messages, $queueConsuming);
                } catch (Exception\StopConsumerException $e) {
                    $this->logger->info('Consumer requested stop', [
                        'amqp' => $logAmqpContext,
                        'exception' => $e
                    ]);

                    // Stop right away; the "after processing" event is not dispatched in this case
                    $this->stopConsuming(true);
                    return;
                } catch (\Throwable $e) {
                    $this->logger->error('Throw exception while process messages', [
                        'amqp' => $logAmqpContext,
                        'exception' => $e
                    ]);
                    throw $e;
                }

                $this->logger->info('Queue messages processed', ['amqp' => $logAmqpContext]); // TODO add flag code
                $this->dispatchEvent(
                    AfterProcessingMessagesEvent::NAME,
                    new AfterProcessingMessagesEvent($this, $messages) // TODO add flag code
                );

                $this->maybeStopConsumer();
            }
        ));

        // Both lists are appended together: getExecuteCallbackStrategy() relies on matching indexes
        $this->queueConsumings[] = $queueConsuming;
        $this->executeCallbackStrategies[] = $executeCallbackStrategy;

        return $this;
    }
198 32
199 32
    /**
     * Returns the strategy registered at the same index as the given queue consuming.
     *
     * @param QueueConsuming $queueConsuming Must be the same instance that was registered.
     *
     * @return ExecuteCallbackStrategyInterface
     *
     * @throws \InvalidArgumentException When the queue consuming has no registered strategy.
     */
    private function getExecuteCallbackStrategy(QueueConsuming $queueConsuming): ExecuteCallbackStrategyInterface
    {
        $index = array_search($queueConsuming, $this->queueConsumings, true);

        // array_search() yields false when nothing matches; used as an array key it would silently become 0
        if (false === $index || !isset($this->executeCallbackStrategies[$index])) {
            throw new \InvalidArgumentException(sprintf(
                'Queue "%s" is not registered in this consumer',
                $queueConsuming->queueName
            ));
        }

        return $this->executeCallbackStrategies[$index];
    }
203
204 32
    /**
     * Returns the queue consumings registered so far, in registration order.
     *
     * @return QueueConsuming[]
     */
    public function getQueueConsumings(): array
    {
        return $this->queueConsumings;
    }
211
212
    /**
     * Subscribes to all registered queues and processes messages until stopped.
     *
     * The loop ends when the channel is no longer consuming, a force stop was
     * requested, the graceful max execution time is reached, or an idle
     * timeout event asks to stop.
     *
     * @param int|null $msgAmount Number of messages after which consuming is stopped; null for no limit.
     *
     * @return int 0 on a regular stop, $gracefulMaxExecutionTimeoutExitCode when the graceful
     *             deadline is reached, or $idleTimeoutExitCode on an idle stop when that code is set.
     *
     * @throws AMQPTimeoutException When an idle stop is requested and no idle exit code is configured.
     */
    public function consume(?int $msgAmount = null)
    {
        $this->target = $msgAmount;
        $this->consumed = 0;

        $this->setup();

        $this->lastActivityDateTime = new \DateTime();
        while ($this->channel->is_consuming()) {
            $this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent($this));
            $this->maybeStopConsumer();

            if ($this->forceStop) {
                break;
            }

            /*
             * Be careful not to trigger ::wait() with 0 or less seconds, when
             * graceful max execution timeout is being used.
             */
            $waitTimeout = $this->chooseWaitTimeout();
            if ($this->gracefulMaxExecutionDateTime && $waitTimeout < 1) {
                return $this->gracefulMaxExecutionTimeoutExitCode;
            }

            try {
                $this->channel->wait(null, false, $waitTimeout);
                $this->lastActivityDateTime = new \DateTime();
                if ($this->forceStop) {
                    break;
                }
            } catch (AMQPTimeoutException $e) {
                // Let every strategy react to the timeout before deciding whether to exit
                foreach ($this->executeCallbackStrategies as $executeCallbackStrategy) {
                    $executeCallbackStrategy->onCatchTimeout($e);
                }

                $now = new \DateTime();
                if ($this->gracefulMaxExecutionDateTime && $this->gracefulMaxExecutionDateTime <= $now) {
                    return $this->gracefulMaxExecutionTimeoutExitCode;
                }

                $idleDeadlineReached = $this->idleTimeout
                    && ($this->lastActivityDateTime->getTimestamp() + $this->idleTimeout <= $now->getTimestamp());
                if ($idleDeadlineReached) {
                    $idleEvent = new OnIdleEvent($this);
                    $this->dispatchEvent(OnIdleEvent::NAME, $idleEvent);

                    if ($idleEvent->isForceStop()) {
                        if (null !== $this->idleTimeoutExitCode) {
                            return $this->idleTimeoutExitCode;
                        }

                        // No exit code configured: surface the timeout to the caller
                        throw $e;
                    }
                }
            }
        }

        return 0;
    }
275
276 4
    /**
     * Runs the queue callback for the given messages and, unless the queue is
     * consumed with noAck, hands the replies over for acknowledgement.
     *
     * @param AMQPMessage[] $messages
     * @param QueueConsuming $queueConsuming
     *
     * @return int[]|RpcReponse[]|RpcResponseException[]|bool[] Replies indexed by delivery tag.
     *
     * @throws \InvalidArgumentException When no message is given, or several are given to a single-message strategy.
     * @throws AMQPRuntimeException When a batch callback returns an array whose size differs from the batch size.
     */
    protected function processMessages(array $messages, QueueConsuming $queueConsuming)
    {
        $messageCount = count($messages);
        if ($messageCount === 0) {
            throw new \InvalidArgumentException('Messages can not be empty');
        }

        $supportsBatch = $this->getExecuteCallbackStrategy($queueConsuming)->canPrecessMultiMessages();
        if ($messageCount !== 1 && !$supportsBatch) {
            throw new \InvalidArgumentException('Strategy is not supported process of multi messages');
        }

        /** @var int[]|RpcReponse[]|RpcResponseException[]|bool[] $replies */
        $replies = [];
        if ($queueConsuming->callback instanceof BatchConsumerInterface) {
            $batchResult = $queueConsuming->callback->batchExecute($messages);
            if (is_array($batchResult)) {
                if (count($batchResult) !== count($messages)) {
                    throw new AMQPRuntimeException(
                        'Method batchExecute() should return an array with elements equal with the number of messages processed'
                    );
                }
                $replies = $batchResult;
            } else {
                // A non-array result is applied to every message of the batch
                foreach ($messages as $batchMessage) {
                    $replies[$batchMessage->getDeliveryTag()] = $batchResult;
                }
            }
        } else {
            $single = $messages[0];
            try {
                $replies = [$single->getDeliveryTag() => $queueConsuming->callback->execute($single)];
            } catch (Exception\RpcResponseException $rpcException) {
                // The exception itself becomes the reply for this message
                $replies = [$single->getDeliveryTag() => $rpcException];
            }
        }

        if (!$queueConsuming->noAck) {
            // Re-index the messages by delivery tag so they line up with the replies
            $deliveryTags = array_map(fn ($message) => $message->getDeliveryTag(), $messages);
            $messages = array_combine($deliveryTags, $messages);

            $this->handleProcessMessages($messages, $replies, $queueConsuming);
        }

        return $replies;
    }
325
326
327
    /**
     * Acknowledges, rejects or nacks processed messages according to their replies.
     *
     * When multiAck is enabled, more than one message was processed and every
     * reply is null or MSG_ACK, all messages are acknowledged with one
     * "multiple" ack on their shared channel. Otherwise each reply is handled
     * on its own, and an RPC reply is sent for acknowledged messages that carry
     * both reply_to and correlation_id.
     *
     * @param AMQPMessage[] $messages Messages indexed by delivery tag.
     * @param int[]|RpcReponse[]|RpcResponseException[]|bool[] $replies Replies indexed by delivery tag.
     * @param QueueConsuming $queueConsuming
     *
     * @return int[]|null Delivery tag => MSG_ACK map when a multiple ack was sent, null otherwise.
     *
     * @throws \InvalidArgumentException When a multiple ack is attempted for messages of different channels.
     * @throws AMQPRuntimeException When a reply refers to an unknown delivery tag.
     */
    private function handleProcessMessages($messages, array $replies, QueueConsuming $queueConsuming)
    {
        $executeCallbackStrategy = $this->getExecuteCallbackStrategy($queueConsuming);

        $useMultiAck = $this->multiAck && count($messages) > 1;
        if ($useMultiAck) {
            // A single reply other than null / MSG_ACK forces per-message handling
            foreach ($replies as $reply) {
                if (null !== $reply && ConsumerInterface::MSG_ACK !== $reply) {
                    $useMultiAck = false;
                    break;
                }
            }
        }

        if ($useMultiAck) {
            $sharedChannel = reset($messages)->getChannel();
            foreach ($messages as $message) {
                if ($message->getChannel() !== $sharedChannel) {
                    throw new \InvalidArgumentException('Messages can not be processed as multi ack with different channels');
                }
            }

            // Ack the highest delivery tag with the "multiple" flag set, which
            // acknowledges every outstanding delivery on the channel up to that tag
            $highestDeliveryTag = max(array_map(fn ($message) => $message->getDeliveryTag(), $messages));
            $sharedChannel->basic_ack($highestDeliveryTag, true);
            $this->consumed += count($messages);

            $acknowledged = [];
            foreach ($messages as $message) {
                $executeCallbackStrategy->onMessageProcessed($message);
                $acknowledged[$message->getDeliveryTag()] = ConsumerInterface::MSG_ACK;
            }

            return $acknowledged;
        }

        foreach ($replies as $deliveryTag => $reply) {
            $message = $messages[$deliveryTag] ?? null;
            if (null === $message) {
                throw new AMQPRuntimeException(sprintf('Unknown delivery_tag %d!', $deliveryTag));
            }

            $channel = $message->getChannel();
            if (ConsumerInterface::MSG_REJECT_REQUEUE === $reply || false === $reply) {
                $channel->basic_reject($deliveryTag, true); // Reject and requeue message to RabbitMQ
            } elseif (ConsumerInterface::MSG_SINGLE_NACK_REQUEUE === $reply) {
                $channel->basic_nack($deliveryTag, false, true); // NACK and requeue message to RabbitMQ
            } elseif (ConsumerInterface::MSG_REJECT === $reply) {
                $channel->basic_reject($deliveryTag, false); // Reject and drop
            } elseif (ConsumerInterface::MSG_ACK_SENT !== $reply) {
                // Any remaining reply counts as success; MSG_ACK_SENT is skipped because no ack is sent here for it
                if ($message->has('reply_to') && $message->has('correlation_id')) {
                    $this->sendRpcReply($message, $reply);
                }

                $channel->basic_ack($deliveryTag);
            }

            $this->consumed++;

            $executeCallbackStrategy->onMessageProcessed($message);
        }

        return null;
    }
380
381
    /**
     * Publishes the serialized callback result to the queue named in the message's reply_to property.
     *
     * Only RpcReponse and RpcResponseException results are sent; anything else is logged as an error.
     *
     * @param AMQPMessage $message Request message providing reply_to and correlation_id.
     * @param mixed $result Result produced by the queue callback.
     */
    protected function sendRpcReply(AMQPMessage $message, $result)
    {
        $isRpcResult = $result instanceof RpcReponse || $result instanceof RpcResponseException;
        if (!$isRpcResult) {
            $this->logger->error('Rpc call send msg to queue which have not rpc reponse', [
                'amqp' => ['message' => $message]
            ]);

            return;
        }

        $replyMessage = new AMQPMessage($this->serializer->serialize($result), [
            'content_type' => 'text/plain',
            'correlation_id' => $message->get('correlation_id'),
        ]);

        // Published with an empty exchange name and reply_to as the routing key
        $message->getChannel()->basic_publish($replyMessage, '', $message->get('reply_to'));
    }
396
397
    /**
     * Stops consuming when a force stop was requested or the message target has been reached.
     *
     * A target of null or 0 means "no limit". The counter is compared with >=
     * because batch processing can move it past the target in a single step.
     */
    protected function maybeStopConsumer()
    {
        $targetReached = null !== $this->target
            && $this->target > 0
            && $this->consumed >= $this->target;

        if ($this->forceStop || $targetReached) {
            $this->stopConsuming();
        }
    }
403
404
    /**
     * Raises the force-stop flag that consume() and maybeStopConsumer() check.
     */
    public function forceStopConsumer()
    {
        $this->forceStop = true;
    }
408
409
    /**
     * Cancels every registered consumer tag on the channel and forgets the tags.
     *
     * @param bool $immedietly When exactly false, each strategy's onStopConsuming() is called first.
     */
    public function stopConsuming($immedietly = false)
    {
        if ($immedietly === false) {
            foreach ($this->executeCallbackStrategies as $strategy) {
                $strategy->onStopConsuming();
            }
        }

        foreach ($this->consumerTags as $tag) {
            $this->channel->basic_cancel($tag, false, true);
        }

        $this->consumerTags = [];
    }
423
424
    /**
     * Sets the graceful max execution date-time to "now plus the given number of seconds".
     *
     * @param int $secondsInTheFuture
     */
    public function setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($secondsInTheFuture)
    {
        $relativeOffset = sprintf('+%s seconds', $secondsInTheFuture);

        $this->gracefulMaxExecutionDateTime = new \DateTime($relativeOffset);
    }
431
432
    /**
     * Picks the timeout (in seconds) passed to the channel's wait() call.
     *
     * Starts from the idle timeout. When a graceful max execution date-time is
     * set, the remaining allowed execution time is used instead, unless a
     * non-zero idle timeout is shorter. A positive timeoutWait then caps the result.
     *
     * NOTE(review): because of min(), a positive timeoutWait has no effect when
     * the base timeout is 0 - confirm this is intended.
     */
    private function chooseWaitTimeout(): int
    {
        $timeout = $this->idleTimeout;

        if ($this->gracefulMaxExecutionDateTime) {
            $remainingSeconds = $this->gracefulMaxExecutionDateTime->getTimestamp() - time();

            // Keep the idle timeout only when it is set and shorter than the remaining time
            if (!$this->idleTimeout || $this->idleTimeout >= $remainingSeconds) {
                $timeout = $remainingSeconds;
            }
        }

        if (null !== $this->timeoutWait && $this->timeoutWait > 0) {
            $timeout = min($timeout, $this->timeoutWait);
        }

        return $timeout;
    }
456
}
457