Completed
Pull Request — master (#39)
by Aleksandr
06:46
created

Consumer::consume()   C

Complexity

Conditions 14
Paths 17

Size

Total Lines 55
Code Lines 32

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 28.6356

Importance

Changes 6
Bugs 0 Features 1
Metric Value
cc 14
eloc 32
c 6
b 0
f 1
nc 17
nop 1
dl 0
loc 55
ccs 11
cts 19
cp 0.5789
crap 28.6356
rs 6.2666

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use http\Exception\InvalidArgumentException;
6
use OldSound\RabbitMqBundle\Declarations\QueueConsuming;
7
use OldSound\RabbitMqBundle\Event\AfterProcessingMessageEvent;
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\...rProcessingMessageEvent was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
8
use OldSound\RabbitMqBundle\Event\AfterProcessingMessagesEvent;
9
use OldSound\RabbitMqBundle\Event\AMQPEvent;
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\Event\AMQPEvent was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
10
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessageEvent;
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\...eProcessingMessageEvent was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
11
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessagesEvent;
12
use OldSound\RabbitMqBundle\Event\OnConsumeEvent;
13
use OldSound\RabbitMqBundle\Event\OnIdleEvent;
14
use OldSound\RabbitMqBundle\EventDispatcherAwareTrait;
15
use OldSound\RabbitMqBundle\ExecuteCallbackStrategy\FnMessagesProcessor;
16
use OldSound\RabbitMqBundle\ExecuteCallbackStrategy\MessagesProcessorInterface;
17
use OldSound\RabbitMqBundle\MemoryChecker\MemoryConsumptionChecker;
18
use OldSound\RabbitMqBundle\MemoryChecker\NativeMemoryUsageProvider;
19
use OldSound\RabbitMqBundle\RabbitMq\Exception\RpcResponseException;
20
use OldSound\RabbitMqBundle\Serializer\JsonMessageBodySerializer;
21
use OldSound\RabbitMqBundle\Serializer\MessageBodySerializerInterface;
22
use PhpAmqpLib\Channel\AMQPChannel;
23
use PhpAmqpLib\Exception\AMQPRuntimeException;
24
use PhpAmqpLib\Exception\AMQPTimeoutException;
25
use PhpAmqpLib\Message\AMQPMessage;
26
use Psr\Log\LoggerAwareTrait;
27
use Psr\Log\NullLogger;
28
use OldSound\RabbitMqBundle\ExecuteCallbackStrategy\ExecuteCallbackStrategyInterface;
29
use Symfony\Component\Serializer\SerializerInterface;
30
31
class Consumer
32
{
33
    use LoggerAwareTrait;
34
    use EventDispatcherAwareTrait;
35
36
    /** @var AMQPChannel */
37
    protected $channel;
38
    /** @var QueueConsuming[] */
39
    protected $queueConsumings = [];
40
    /** @var ExecuteCallbackStrategyInterface[] */
41
    protected $executeCallbackStrategies = [];
42
    /** @var MessageBodySerializerInterface */
43
    protected $serializer;
44
45
    /** @var string[] */
46
    protected $consumerTags = [];
47
    /** @var array */
48
    protected $basicProperties = [
49
        'content_type' => 'text/plain',
50 32
        'delivery_mode' => 2
51
    ];
52 32
    /** @var int|null */
53
    protected $target;
54
    /** @var int */
55
    protected $consumed = 0;
56
    /** @var bool */
57
    protected $forceStop = false;
58
    /**
59
     * Important! If true, the channel must not be used from anywhere else
60
     * @var bool
61
     */
62
    public $multiAck = false;
63
    /**
64 12
     * @var \DateTime|null DateTime after which the consumer will gracefully exit. "Gracefully" means, that
65
     *      any currently running consumption will not be interrupted.
66 12
     */
67
    public $gracefulMaxExecutionDateTime;
68 12
    /** @var int Exit code used, when consumer is closed by the Graceful Max Execution Timeout feature. */
69
    public $gracefulMaxExecutionTimeoutExitCode = 0;
70 12
    /** @var int|null */
71 10
    public $timeoutWait;
72 10
    /** @var int */
73
    public $idleTimeout = 0;
74
    /** @var int */
75
    public $idleTimeoutExitCode;
76
    /** @var \DateTime|null */
77
    public $lastActivityDateTime;
78 10
79
    /**
     * Binds the consumer to an AMQP channel and installs the default
     * collaborators: a NullLogger and a JsonMessageBodySerializer.
     *
     * @param AMQPChannel $channel channel used for subscribing and waiting
     */
    public function __construct(AMQPChannel $channel)
    {
        $this->serializer = new JsonMessageBodySerializer();
        $this->logger = new NullLogger();
        $this->channel = $channel;
    }
85
86 8
    /**
     * @return AMQPChannel the channel currently held by this consumer
     */
    public function getChannel(): AMQPChannel
    {
        return $this->channel;
    }
90 6
91 2
    /**
     * Replaces the serializer used to build RPC reply bodies.
     *
     * @param MessageBodySerializerInterface $serializer
     */
    public function setSerializer(MessageBodySerializerInterface $serializer)
    {
        $this->serializer = $serializer;
    }
95 4
96
    /**
     * Subscribes the channel to every queue registered through consumeQueue()
     * and records the consumer tag returned for each subscription.
     *
     * @return Consumer $this, for chaining
     */
    protected function setup(): Consumer
    {
        foreach ($this->queueConsumings as $position => $consuming) {
            $this->channel->basic_qos($consuming->qosPrefetchSize, $consuming->qosPrefetchCount, false);

            // Fall back to a generated tag when the queue declares none.
            $requestedTag = $consuming->consumerTag
                ?: sprintf("PHPPROCESS_%s_%s_%s", gethostname(), getmypid(), $position);

            // Each delivery is handed to the strategy registered for this queue.
            $onDelivery = function (AMQPMessage $message) use ($consuming) {
                $this->getExecuteCallbackStrategy($consuming)->consumeCallback($message);
            };

            $this->consumerTags[] = $this->channel->basic_consume(
                $consuming->queueName,
                $requestedTag,
                $consuming->noLocal,
                $consuming->noAck,
                $consuming->exclusive,
                $consuming->nowait,
                $onDelivery
            );
        }

        return $this;
    }
120
121
    /**
     * Registers a queue to consume together with the strategy that drives its callback.
     *
     * The strategy receives a messages processor that, for every delivery (or batch):
     * dispatches BeforeProcessingMessagesEvent, runs processMessages(), logs the
     * outcome, dispatches AfterProcessingMessagesEvent and then checks whether the
     * consumer should stop.
     *
     * @param QueueConsuming $queueConsuming
     * @param ExecuteCallbackStrategyInterface $executeCallbackStrategy
     * @return Consumer $this, for chaining
     *
     * @throws \InvalidArgumentException when the queue callback does not implement the
     *                                   interface required by the strategy
     */
    public function consumeQueue(QueueConsuming $queueConsuming, ExecuteCallbackStrategyInterface $executeCallbackStrategy): Consumer
    {
        // Validate before touching any state: a rejected queue must not be left in
        // $queueConsumings without a matching entry in $executeCallbackStrategies.
        $requiredInterface = $executeCallbackStrategy->canPrecessMultiMessages()
            ? BatchConsumerInterface::class
            : ConsumerInterface::class;
        if (!$queueConsuming->callback instanceof $requiredInterface) {
            throw new \InvalidArgumentException(sprintf(
                'Callback of queue "%s" must implement %s to be used with the given execute callback strategy',
                $queueConsuming->queueName,
                $requiredInterface
            ));
        }

        $executeCallbackStrategy->setMessagesProccessor(new FnMessagesProcessor(
            (function (array $messages) use ($queueConsuming) {
                $logAmqpContext = ['queue' => $queueConsuming->queueName];
                if ($this->getExecuteCallbackStrategy($queueConsuming)->canPrecessMultiMessages()) {
                    $logAmqpContext['messages'] = $messages;
                } else {
                    $logAmqpContext['message'] = $messages[0];
                }

                $this->dispatchEvent(
                    BeforeProcessingMessagesEvent::NAME,
                    new BeforeProcessingMessagesEvent($this, $messages, $queueConsuming)
                );

                try {
                    $this->processMessages($messages, $queueConsuming);
                } catch (Exception\StopConsumerException $e) {
                    $this->logger->info('Consumer requested stop', [
                        'amqp' => $logAmqpContext,
                        'exception' => $e
                    ]);

                    $this->stopConsuming(true);
                    return;
                } catch (\Throwable $e) {
                    // Log for context, then let the failure propagate to the caller.
                    $this->logger->error('Throw exception while process messages', [
                        'amqp' => $logAmqpContext,
                        'exception' => $e
                    ]);
                    throw $e;
                }

                $this->logger->info('Queue messages processed', ['amqp' => $logAmqpContext]); // TODO add flag code
                $this->dispatchEvent(
                    AfterProcessingMessagesEvent::NAME,
                    new AfterProcessingMessagesEvent($this, $messages) // TODO add flag code
                );

                $this->maybeStopConsumer();
            })->bindTo($this)
        ));

        // Appended together so a queue and its strategy always share the same index.
        $this->queueConsumings[] = $queueConsuming;
        $this->executeCallbackStrategies[] = $executeCallbackStrategy;

        return $this;
    }
180 14
181
    /**
     * Looks up the strategy registered at the same index as the given queue.
     *
     * @param QueueConsuming $queueConsuming
     * @return ExecuteCallbackStrategyInterface
     *
     * @throws \InvalidArgumentException when the queue has no registered strategy
     */
    private function getExecuteCallbackStrategy(QueueConsuming $queueConsuming): ExecuteCallbackStrategyInterface
    {
        $index = array_search($queueConsuming, $this->queueConsumings, true);

        // array_search() returns false on a miss; used as an array key that would
        // silently become index 0 and hand back the strategy of another queue.
        if (false === $index || !isset($this->executeCallbackStrategies[$index])) {
            throw new \InvalidArgumentException(sprintf(
                'No execute callback strategy is registered for queue "%s"',
                $queueConsuming->queueName
            ));
        }

        return $this->executeCallbackStrategies[$index];
    }
185
186 10
    /**
     * Lists the queues registered so far, in registration order.
     *
     * @return QueueConsuming[]
     */
    public function getQueueConsumings(): array
    {
        return $this->queueConsumings;
    }
193 17
194
    /**
     * Subscribes to every registered queue and runs the consume loop until the
     * channel stops consuming, a stop is forced, or a timeout ends the run.
     *
     * @param int|null $msgAmount number of messages after which the consumer stops;
     *                            null or 0 means no limit
     * @return int exit code: 0 when the loop ended normally, otherwise
     *             $gracefulMaxExecutionTimeoutExitCode or $idleTimeoutExitCode
     *
     * @throws AMQPTimeoutException when an OnIdleEvent listener forces a stop and no
     *                              $idleTimeoutExitCode is configured
     */
    public function consume(?int $msgAmount = null)
    {
        $this->target = $msgAmount;
        $this->consumed = 0;

        $this->setup();

        $this->lastActivityDateTime = new \DateTime();
        while ($this->channel->is_consuming()) {
            $this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent($this));
            $this->maybeStopConsumer();

            if ($this->forceStop) {
                break;
            }

            /*
             * Be careful not to trigger ::wait() with 0 or less seconds, when
             * graceful max execution timeout is being used.
             */
            $waitTimeout = $this->chooseWaitTimeout();
            if ($this->gracefulMaxExecutionDateTime && $waitTimeout < 1) {
                return $this->gracefulMaxExecutionTimeoutExitCode;
            }

            try {
                $this->channel->wait(null, false, $waitTimeout);
                $this->lastActivityDateTime = new \DateTime();
                if ($this->forceStop) {
                    break;
                }
            } catch (AMQPTimeoutException $e) {
                // Give every strategy a chance to react to the idle period.
                foreach ($this->executeCallbackStrategies as $executeCallbackStrategy) {
                    $executeCallbackStrategy->onCatchTimeout($e);
                }

                $now = new \DateTime();
                if ($this->gracefulMaxExecutionDateTime && $this->gracefulMaxExecutionDateTime <= $now) {
                    return $this->gracefulMaxExecutionTimeoutExitCode;
                }

                $idleDeadlineReached = $this->idleTimeout
                    && ($this->lastActivityDateTime->getTimestamp() + $this->idleTimeout <= $now->getTimestamp());
                if ($idleDeadlineReached) {
                    $idleEvent = new OnIdleEvent($this);
                    $this->dispatchEvent(OnIdleEvent::NAME, $idleEvent);

                    if ($idleEvent->isForceStop()) {
                        if (null !== $this->idleTimeoutExitCode) {
                            return $this->idleTimeoutExitCode;
                        }

                        // No exit code configured: surface the timeout to the caller.
                        throw $e;
                    }
                }
            }
        }

        return 0;
    }
257
258
    /**
     * Runs the queue callback for the given messages and, unless the queue is
     * consumed in no-ack mode, hands the replies to handleProcessMessages().
     *
     * @param AMQPMessage[] $messages
     * @param QueueConsuming $queueConsuming
     * @return int[]|RpcReponse[]|RpcResponseException[]|bool[] replies indexed by delivery tag
     *
     * @throws \InvalidArgumentException when $messages is empty, or holds several
     *                                   messages while the strategy handles only one
     * @throws AMQPRuntimeException when batchExecute() returns an array whose size
     *                              differs from the number of messages
     */
    protected function processMessages(array $messages, QueueConsuming $queueConsuming)
    {
        $messageCount = count($messages);
        if (0 === $messageCount) {
            throw new \InvalidArgumentException('Messages can not be empty');
        }

        $supportsBatches = $this->getExecuteCallbackStrategy($queueConsuming)->canPrecessMultiMessages();
        if ($messageCount !== 1 && !$supportsBatches) {
            throw new \InvalidArgumentException('Strategy is not supported process of multi messages');
        }

        $toDeliveryTag = fn ($message) => $message->getDeliveryTag();

        /** @var int[]|RpcReponse[]|RpcResponseException[]|bool[] $replies */
        $replies = [];
        if ($queueConsuming->callback instanceof BatchConsumerInterface) {
            $replies = $queueConsuming->callback->batchExecute($messages);
            if (is_array($replies)) {
                if (count($replies) !== $messageCount) {
                    throw new AMQPRuntimeException(
                        'Method batchExecute() should return an array with elements equal with the number of messages processed'
                    );
                }
            } else {
                // A single flag was returned: apply it to every message of the batch.
                $replies = array_fill_keys(array_map($toDeliveryTag, $messages), $replies);
            }
        } else {
            $onlyMessage = $messages[0];
            try {
                $reply = $queueConsuming->callback->execute($onlyMessage);
            } catch (Exception\RpcResponseException $e) {
                // An RPC error raised by the callback becomes the reply itself.
                $reply = $e;
            }
            $replies = [$onlyMessage->getDeliveryTag() => $reply];
        }

        if (!$queueConsuming->noAck) {
            $messagesByTag = array_combine(array_map($toDeliveryTag, $messages), $messages);
            $this->handleProcessMessages($messagesByTag, $replies, $queueConsuming);
        }

        return $replies;
    }
307
308
309
    /**
     * Acknowledges, rejects or nacks every processed message according to its reply.
     *
     * When $multiAck is enabled, more than one message was processed and every reply
     * is either null or MSG_ACK, a single cumulative basic_ack is sent for the highest
     * delivery tag. Otherwise each reply is handled individually.
     *
     * @param AMQPMessage[] $messages messages indexed by delivery tag
     * @param int[]|RpcReponse[]|RpcResponseException[]|bool[] $replies replies indexed by delivery tag
     * @param QueueConsuming $queueConsuming
     *
     * @throws \InvalidArgumentException when multi ack is requested for messages that
     *                                   do not all share one channel
     * @throws AMQPRuntimeException when a reply refers to a delivery tag that has no message
     */
    private function handleProcessMessages($messages, array $replies, QueueConsuming $queueConsuming): void
    {
        $executeCallbackStrategy = $this->getExecuteCallbackStrategy($queueConsuming);

        // array_search() cannot take a predicate, so scan the replies explicitly.
        $allAcked = true;
        foreach ($replies as $reply) {
            if (null !== $reply && ConsumerInterface::MSG_ACK !== $reply) {
                $allAcked = false;
                break;
            }
        }

        if ($this->multiAck && count($messages) > 1 && $allAcked) {
            $sharedChannel = reset($messages)->getChannel();
            $highestDeliveryTag = null;
            foreach ($messages as $message) {
                // Delivery tags are scoped to a channel, so one cumulative ack is only
                // valid when every message arrived on the same channel.
                if ($message->getChannel() !== $sharedChannel) {
                    throw new \InvalidArgumentException('Messages can not be processed as multi ack with different channels');
                }

                $deliveryTag = $message->getDeliveryTag();
                if (null === $highestDeliveryTag || $deliveryTag > $highestDeliveryTag) {
                    $highestDeliveryTag = $deliveryTag;
                }
            }

            // multiple=true acknowledges everything up to and including this tag.
            $sharedChannel->basic_ack($highestDeliveryTag, true);
            $this->consumed += count($messages);

            foreach ($messages as $message) {
                $executeCallbackStrategy->onMessageProcessed($message);
            }

            return;
        }

        foreach ($replies as $deliveryTag => $reply) {
            $message = $messages[$deliveryTag] ?? null;
            if (null === $message) {
                throw new AMQPRuntimeException(sprintf('Unknown delivery_tag %d!', $deliveryTag));
            }

            $channel = $message->getChannel();
            if ($reply === ConsumerInterface::MSG_REJECT_REQUEUE || false === $reply) {
                $channel->basic_reject($deliveryTag, true); // Reject and requeue message to RabbitMQ
            } elseif ($reply === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE) {
                $channel->basic_nack($deliveryTag, false, true); // NACK and requeue message to RabbitMQ
            } elseif ($reply === ConsumerInterface::MSG_REJECT) {
                $channel->basic_reject($deliveryTag, false); // Reject and drop
            } elseif ($reply !== ConsumerInterface::MSG_ACK_SENT) {
                // MSG_ACK_SENT means the callback already acknowledged the message itself.
                if ($message->has('reply_to') && $message->has('correlation_id')) {
                    $this->sendRpcReply($message, $reply);
                }

                $channel->basic_ack($deliveryTag); // Remove message from queue only if callback return not false
            }

            $this->consumed++;

            $executeCallbackStrategy->onMessageProcessed($message);
        }
    }
362
363
    /**
     * Publishes the callback result to the queue named in the request's reply_to
     * property, tagged with the request's correlation_id. Results that are neither
     * an RpcReponse nor an RpcResponseException are logged as an error instead.
     *
     * @param AMQPMessage $message the RPC request being answered
     * @param mixed $result value returned (or exception raised) by the callback
     */
    protected function sendRpcReply(AMQPMessage $message, $result)
    {
        $isRpcResult = $result instanceof RpcReponse || $result instanceof RpcResponseException;
        if (!$isRpcResult) {
            $this->logger->error('Rpc call send msg to queue which have not rpc reponse', [
                'amqp' => ['message' => $message]
            ]);

            return;
        }

        $replyProperties = [
            'content_type' => 'text/plain',
            'correlation_id' => $message->get('correlation_id'),
        ];
        $replyMessage = new AMQPMessage($this->serializer->serialize($result), $replyProperties);

        $message->getChannel()->basic_publish($replyMessage, '', $message->get('reply_to'));
    }
378
379
    /**
     * Stops consuming when a stop was forced or the message target has been reached.
     *
     * A target of null or 0 means "no limit". The comparison uses >= because the
     * consumed counter can grow by more than one at a time and step over the target.
     */
    protected function maybeStopConsumer()
    {
        $targetReached = null !== $this->target
            && $this->target > 0
            && $this->consumed >= $this->target;

        if ($this->forceStop || $targetReached) {
            $this->stopConsuming();
        }
    }
385
386
    /**
     * Raises the force-stop flag; consume() leaves its loop the next time it checks it.
     */
    public function forceStopConsumer()
    {
        $this->forceStop = true;
    }
390
391
    /**
     * Cancels every consumer subscription created by setup().
     *
     * @param bool $immedietly when exactly false, each strategy's onStopConsuming()
     *                         is called before the subscriptions are cancelled
     */
    public function stopConsuming($immedietly = false)
    {
        $notifyStrategies = (false === $immedietly);
        if ($notifyStrategies) {
            foreach ($this->executeCallbackStrategies as $strategy) {
                $strategy->onStopConsuming();
            }
        }

        foreach ($this->consumerTags as $tag) {
            $this->channel->basic_cancel($tag, false, true);
        }

        // All subscriptions are cancelled; forget their tags.
        $this->consumerTags = [];
    }
405
406
    /**
     * Sets the graceful max execution deadline relative to the current time.
     *
     * @param int $secondsInTheFuture offset, in seconds, from now
     */
    public function setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($secondsInTheFuture)
    {
        $relativeOffset = sprintf('+%s seconds', $secondsInTheFuture);
        $this->gracefulMaxExecutionDateTime = new \DateTime($relativeOffset);
    }
413
414
    /**
     * Choose the timeout wait (in seconds) to use for the $this->getChannel()->wait() method.
     *
     * With a graceful max execution deadline the result is the remaining allowed
     * time, capped by $idleTimeout and $timeoutWait when they are set; a value of
     * 0 or less then means the deadline has passed. Without a deadline, 0 means
     * "no timeout", so a positive $timeoutWait is used instead of being discarded
     * by min() against 0.
     *
     * @return int seconds to pass to AMQPChannel::wait()
     */
    private function chooseWaitTimeout(): int
    {
        $hasTimeoutWait = null !== $this->timeoutWait && $this->timeoutWait > 0;

        if ($this->gracefulMaxExecutionDateTime) {
            $allowedExecutionSeconds = $this->gracefulMaxExecutionDateTime->getTimestamp() - time();

            /*
             * Respect the idle timeout if it's set and if it's less than
             * the remaining allowed execution.
             */
            $waitTimeout = $this->idleTimeout && $this->idleTimeout < $allowedExecutionSeconds
                ? $this->idleTimeout
                : $allowedExecutionSeconds;

            // An expired deadline (<= 0) stays the smaller value, so the caller still sees it.
            return $hasTimeoutWait ? min($waitTimeout, $this->timeoutWait) : $waitTimeout;
        }

        if (!$hasTimeoutWait) {
            return $this->idleTimeout;
        }

        // An idle timeout of 0 means "wait forever"; it must not win over a real bound.
        return $this->idleTimeout > 0
            ? min($this->idleTimeout, $this->timeoutWait)
            : $this->timeoutWait;
    }
438
}
439