Test Failed
Pull Request — master (#39)
by Aleksandr
02:42
created

Consumer::consume()   C

Complexity

Conditions 12
Paths 14

Size

Total Lines 52
Code Lines 29

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 18
CRAP Score 12.8652

Importance

Changes 5
Bugs 0 Features 1
Metric Value
cc 12
eloc 29
c 5
b 0
f 1
nc 14
nop 1
dl 0
loc 52
ccs 18
cts 22
cp 0.8182
crap 12.8652
rs 6.9666

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use http\Exception\InvalidArgumentException;
6
use OldSound\RabbitMqBundle\Declarations\QueueConsuming;
7
use OldSound\RabbitMqBundle\Event\AfterProcessingMessageEvent;
8
use OldSound\RabbitMqBundle\Event\AfterProcessingMessagesEvent;
9
use OldSound\RabbitMqBundle\Event\AMQPEvent;
10
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessageEvent;
11
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessagesEvent;
12
use OldSound\RabbitMqBundle\Event\OnConsumeEvent;
13
use OldSound\RabbitMqBundle\Event\OnIdleEvent;
14
use OldSound\RabbitMqBundle\EventDispatcherAwareTrait;
15
use OldSound\RabbitMqBundle\ExecuteReceiverStrategy\BatchExecuteReceiverStrategy;
16
use OldSound\RabbitMqBundle\ExecuteReceiverStrategy\ExecuteReceiverStrategyInterface;
17
use OldSound\RabbitMqBundle\ExecuteReceiverStrategy\FnMessagesProcessor;
18
use OldSound\RabbitMqBundle\ExecuteReceiverStrategy\MessagesProcessorInterface;
19
use OldSound\RabbitMqBundle\ExecuteReceiverStrategy\SimpleExecuteReceiverStrategy;
20
use OldSound\RabbitMqBundle\MemoryChecker\MemoryConsumptionChecker;
21
use OldSound\RabbitMqBundle\MemoryChecker\NativeMemoryUsageProvider;
22
use OldSound\RabbitMqBundle\Producer\ProducerInterface;
23
use OldSound\RabbitMqBundle\Receiver\NotReadyReceiveException;
24
use OldSound\RabbitMqBundle\Receiver\ReceiverException;
25
use OldSound\RabbitMqBundle\Receiver\ReceiverInterface;
26
use OldSound\RabbitMqBundle\Receiver\ReplyReceiverInterface;
27
use OldSound\RabbitMqBundle\Serializer\JsonMessageBodySerializer;
28
use OldSound\RabbitMqBundle\Serializer\MessageBodySerializerInterface;
29
use PhpAmqpLib\Channel\AMQPChannel;
30
use PhpAmqpLib\Exception\AMQPRuntimeException;
31
use PhpAmqpLib\Exception\AMQPTimeoutException;
32
use PhpAmqpLib\Message\AMQPMessage;
33
use Psr\Log\LoggerAwareTrait;
34
use Psr\Log\NullLogger;
35
use OldSound\RabbitMqBundle\ExecuteReceiverStrategy\ExecuteReceiverStrategyInterface;
0 ignored issues
show
Bug introduced by
A parse error occurred: Cannot use OldSound\RabbitMqBundle\ExecuteReceiverStrategy\ExecuteReceiverStrategyInterface as ExecuteReceiverStrategyInterface because the name is already in use
Loading history...
36
use Symfony\Component\Serializer\SerializerInterface;
37
38
class Consumer
39
{
40
    use LoggerAwareTrait;
41
    use EventDispatcherAwareTrait;
42
43
    /** @var AMQPChannel */
44
    protected $channel;
45
    /** @var QueueConsuming[] */
46
    protected $queueConsumings = [];
47
    /** @var ExecuteReceiverStrategyInterface[] */
48
    protected $executeReceiverStrategies = [];
49
    /** @var MessageBodySerializerInterface */
50 32
    protected $serializer;
51
52 32
    /** @var string[] */
53
    protected $consumerTags = [];
54
    /** @var array */
55
    protected $basicProperties = [
56
        'content_type' => 'text/plain',
57
        'delivery_mode' => ProducerInterface::DELIVERY_MODE_PERSISTENT
58
    ];
59
    /** @var int|null */
60
    protected $target;
61
    /** @var int */
62
    protected $consumed = 0;
63
    /** @var bool */
64 12
    protected $forceStop = false;
65
    /**
66 12
     * Importrant! If true - then channel can not be used from somewhere else
67
     * @var bool
68 12
     */
69
    public $multiAck = false;
70 12
    /**
71 10
     * @var \DateTime|null DateTime after which the consumer will gracefully exit. "Gracefully" means, that
72 10
     *      any currently running consumption will not be interrupted.
73
     */
74
    public $gracefulMaxExecutionDateTime;
75
    /** @var int Exit code used, when consumer is closed by the Graceful Max Execution Timeout feature. */
76
    public $gracefulMaxExecutionTimeoutExitCode = 0;
77
    /** @var int|null */
78 10
    public $timeoutWait;
79
    /** @var int */
80 10
    public $idleTimeout = 0;
81 10
    /** @var int */
82
    public $idleTimeoutExitCode;
83 2
    /** @var \DateTime|null */
84
    public $lastActivityDateTime;
85
86 8
    public function __construct(AMQPChannel $channel)
87
    {
88 8
        $this->channel = $channel;
89 6
        $this->logger = new NullLogger();
90 6
        $this->serializer = new JsonMessageBodySerializer();
91 2
    }
92
93
    public function getChannel(): AMQPChannel
94 4
    {
95 4
        return $this->channel;
96
    }
97 4
98 4
    public function setSerializer(MessageBodySerializerInterface $serializer)
99 2
    {
100
        $this->serializer = $serializer;
101 2
    }
102
103
    protected function setup(): Consumer
104
    {
105
        foreach($this->queueConsumings as $index => $queueConsuming) {
106
            $this->channel->basic_qos($queueConsuming->qosPrefetchSize, $queueConsuming->qosPrefetchCount, false);
107
108 4
            $consumerTag = $this->channel->basic_consume(
109
                $queueConsuming->queueName,
110
                $queueConsuming->consumerTag ?
111
                    $queueConsuming->consumerTag :
112
                    sprintf("PHPPROCESS_%s_%s_%s", gethostname(), getmypid(), $index),
113
                $queueConsuming->noLocal,
114
                $queueConsuming->noAck,
115
                $queueConsuming->exclusive,
116
                $queueConsuming->nowait,
117
                function (AMQPMessage $message) use ($queueConsuming) {
118
                    $this->getExecuteReceiverStrategy($queueConsuming)->consumeCallback($message);
119
                });
120
121
            //$queueConsuming->consumerTag = $consumerTag;
122
            $this->consumerTags[] = $consumerTag;
123
        }
124
125
        return $this;
126
    }
127 32
128
    /**
129 32
     * @param iterable|QueueConsuming[] $queueConsumings
130 32
     */
131
    public function consumeQueues(iterable $queueConsumings)
132
    {
133 32
        foreach ($queueConsumings as $queueConsuming) {
134 32
            $this->consumeQueue($queueConsuming);
135 32
        }
136 32
    }
137 32
138
    public function consumeQueue(QueueConsuming $queueConsuming, ExecuteReceiverStrategyInterface $executeReceiverStrategy = null): Consumer
139 32
    {
140
        $this->queueConsumings[] = $queueConsuming;
141 32
        if (null === $executeReceiverStrategy) {
142 32
            $executeReceiverStrategy = null === $queueConsuming->batchCount ?
143 32
                new SimpleExecuteReceiverStrategy() :
144
                new BatchExecuteReceiverStrategy($queueConsuming->batchCount);
145
        }
146
147
        $executeReceiverStrategy->setMessagesProccessor(new FnMessagesProcessor(
148
            (function (array $messages) use ($queueConsuming) {
149
                $logAmqpContext = ['queue' => $queueConsuming->queueName];
150
                if ($this->getExecuteReceiverStrategy($queueConsuming)->canPrecessMultiMessages()) {
151
                    $logAmqpContext['messages'] = $messages;
152
                } else {
153
                    $logAmqpContext['message'] = $messages[0];
154
                }
155
156
                $this->dispatchEvent(BeforeProcessingMessagesEvent::NAME,
157
                    new BeforeProcessingMessagesEvent($this, $messages, $queueConsuming)
158
                );
159
160
                try {
161
                    $this->processMessages($messages, $queueConsuming);
162
                } catch (Exception\StopConsumerException $e) {
163
                    $this->logger->info('Consumer requested stop', [
164
                        'amqp' => $logAmqpContext,
165
                        'exception' => $e
166
                    ]);
167
168
                    $this->stopConsuming(true);
169
                    return;
170
                } catch (\Throwable $e) {
171
                    $this->logger->error('Throw exception while process messages', [
172
                        'amqp' => $logAmqpContext,
173
                        'exception' => $e
174
                    ]);
175 32
                    throw $e;
176
                }
177 14
178
                $this->logger->info('Queue messages processed', ['amqp' => $logAmqpContext]); // TODO add flag code
179 14
                $this->dispatchEvent(
180 14
                    AfterProcessingMessagesEvent::NAME,
181
                    new AfterProcessingMessagesEvent($this, $messages) // TODO add flag code
182 32
                );
183
184 32
                $this->maybeStopConsumer();
185
            })->bindTo($this)
186 10
        ));
187 22
188
        $canPrecessMultiMessages = $executeReceiverStrategy->canPrecessMultiMessages();
189
        if ($canPrecessMultiMessages) {
190 22
            if (!$queueConsuming->receiver instanceof BatchReceiverInterface) {
191
                throw new \InvalidArgumentException('TODO '. $queueConsuming->queueName);
192 5
            }
193 17
        } else {
194
            if (!$queueConsuming->receiver instanceof ReceiverInterface) {
195 15
                throw new \InvalidArgumentException('TODO '. $queueConsuming->queueName);
196
            }
197
        }
198 32
199 32
        $this->executeReceiverStrategies[] = $executeReceiverStrategy;
200
201 32
        return $this;
202
    }
203
204 32
    private function getExecuteReceiverStrategy(QueueConsuming $queueConsuming): ExecuteReceiverStrategyInterface
205
    {
206
        return $this->executeReceiverStrategies[array_search($queueConsuming, $this->queueConsumings, true)];
207
    }
208
209
    /**
210
     * @return QueueConsuming[]
211
     */
212
    public function getQueueConsumings(): array
213
    {
214
        return $this->queueConsumings;
215
    }
216
217
    /**
218
     * Consume the message
219
     * @param   int     $msgAmount
220
     * @return  int
221 4
     *
222
     * @throws  AMQPTimeoutException
223 4
     */
224 4
    public function consume(int $msgAmount = null)
225
    {
226
        $this->target = $msgAmount;
227
        $this->consumed = 0;
228
229 4
        $this->setup();
230
231 4
        $this->lastActivityDateTime = new \DateTime();
232 4
        while ($this->channel->is_consuming()) {
233
            $this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent($this));
234
            $this->maybeStopConsumer();
235
236
            if ($this->forceStop) {
237 2
                break;
238
            }
239 2
            /*
240 2
             * Be careful not to trigger ::wait() with 0 or less seconds, when
241
             * graceful max execution timeout is being used.
242
             */
243
            $waitTimeout = $this->chooseWaitTimeout();
244
            if ($this->gracefulMaxExecutionDateTime && $waitTimeout < 1) {
245
                return $this->gracefulMaxExecutionTimeoutExitCode;
246
            }
247
248
            try {
249
                $this->channel->wait(null, false, $waitTimeout);
250
                $this->lastActivityDateTime = new \DateTime();
251
                if ($this->forceStop) {
252
                    break;
253
                }
254
            } catch (AMQPTimeoutException $e) {
255
                foreach($this->executeReceiverStrategies as $executeReceiverStrategy) {
256
                    $executeReceiverStrategy->onCatchTimeout($e);
257
                }
258
                $now = new \DateTime();
259
                if ($this->gracefulMaxExecutionDateTime && $this->gracefulMaxExecutionDateTime <= $now) {
260
                    return $this->gracefulMaxExecutionTimeoutExitCode;
261
                }
262
263
                if ($this->idleTimeout && ($this->lastActivityDateTime->getTimestamp() + $this->idleTimeout <= $now->getTimestamp())) {
264
                    $idleEvent = new OnIdleEvent($this);
265
                    $this->dispatchEvent(OnIdleEvent::NAME, $idleEvent);
266
267 10
                    if ($idleEvent->isForceStop()) {
268
                        if (null !== $this->idleTimeoutExitCode) {
269 10
                            return $this->idleTimeoutExitCode;
270 4
                        } else {
271 4
                            throw $e;
272 4
                        }
273 4
                    }
274 4
                }
275
            }
276 4
        }
277 2
278
        return 0;
279
    }
280
281
    /**
282
     * @param AMQPMessage[] $messages
283
     * @param QueueConsuming $queueConsuming
284
     */
285 4
    protected function processMessages(array $messages, QueueConsuming $queueConsuming)
286 4
    {
287
        if (count($messages) === 0) {
288
            throw new \InvalidArgumentException('Messages can not be empty');
289
        }
290
291
        $canPrecessMultiMessages = $this->getExecuteReceiverStrategy($queueConsuming)->canPrecessMultiMessages();
292
        if (!$canPrecessMultiMessages && count($messages) !== 1) {
293
            throw new \InvalidArgumentException('Strategy is not supported process of multi messages');
294
        }
295 4
296 4
        /** @var int[]|int $flags */
297
        $flags = [];
298
        try {
299
            if ($queueConsuming->receiver instanceof ReceiverInterface) {
300
                $flags = $queueConsuming->receiver->execute($messages[0]);
301 6
            } else if ($queueConsuming->receiver instanceof BatchReceiverInterface) {
302 6
                $flags = $queueConsuming->receiver->batchExecute($messages);
303
            } else if ($queueConsuming->receiver instanceof ReplyReceiverInterface) {
304
                $reply = $queueConsuming->receiver->execute($messages[0]);
305
                $isRpcCall = $messages[0]->has('reply_to') && $messages[0]->has('correlation_id');
306
                if ($isRpcCall) {
307
                    $this->sendRpcReply($messages[0], $reply);
308
                    $flags = ReceiverInterface::MSG_ACK;
309
                } else {
310
                    $flags = ReceiverInterface::MSG_REJECT;
311
                    // logging
312
                }
313
            } else {
314
                throw new \InvalidArgumentException('TODO');
315
            }
316
        } catch (ReceiverException $exception) {
317
            $flags = $exception->getCode();
318
        } catch (NotReadyReceiveException $exception) {
319
            // TODO
320
            $this->forceStop = true;
321
            return;
322
        }
323
324
        if (!is_array($flags)) { // spread handle flag for each delivery tag
325
            $flag = $flags;
326
            $flags = [];
327
            foreach ($messages as $message) {
328
                $flags[$message->getDeliveryTag()] = $flag;
329
            }
330
        } else if (count($flags) !== count($messages)) {
331
            throw new AMQPRuntimeException(
332
                'Method batchExecute() should return an array with elements equal with the number of messages processed'
333
            );
334
        }
335
336
        if (!$queueConsuming->noAck) {
337
            $messages = array_combine(
338
                array_map(fn ($message) => $message->getDeliveryTag(), $messages),
339
                $messages
340
            );
341
342
            $this->handleProcessMessages($messages, $flags, $queueConsuming);
343
        }
344
    }
345
346
    /**
347
     * @param AMQPMessage[] $messages
348
     * @param int[]|RpcReponse[]|RpcResponseException[]|bool[] $replies
349
     */
350
    private function handleProcessMessages($messages, array $replies, QueueConsuming $queueConsuming)
351
    {
352
        $executeReceiverStrategy = $this->getExecuteReceiverStrategy($queueConsuming);
353
354
        $ack = !array_search(fn ($reply) => $reply !== ReceiverInterface::MSG_ACK, $replies, true);
355
        if ($this->multiAck && count($messages) > 1 && $ack) {
356
            $channels = array_map(fn ($message) => $message->getChannel(), $messages);
357
            if (count($channels) !== array_unique($channels)) { // all messages have same channel
358
                throw new InvalidArgumentException('Messages can not be processed as multi ack with different channels');
359
            }
360
361
            $lastDeliveryTag = array_key_last($replies);
362
363
            $this->channel->basic_ack($lastDeliveryTag, true);
364
            $this->consumed = $this->consumed + count($messages);
365
            foreach ($messages as $message) {
366
                $executeReceiverStrategy->onMessageProcessed($message);
367
            }
368
        } else {
369
            foreach ($replies as $deliveryTag => $reply) {
370
                $message = $messages[$deliveryTag] ?? null;
371
                if (null === $message) {
372
                    throw new AMQPRuntimeException(sprintf('Unknown delivery_tag %d!', $deliveryTag));
373
                }
374
375
                $channel = $message->getChannel();
376
                $processFlag = $reply;
377
                if ($processFlag === ReceiverInterface::MSG_REJECT_REQUEUE || false === $processFlag) {
378
                    $channel->basic_reject($deliveryTag, true); // Reject and requeue message to RabbitMQ
379
                } else if ($processFlag === ReceiverInterface::MSG_SINGLE_NACK_REQUEUE) {
380
                    $channel->basic_nack($deliveryTag, false, true); // NACK and requeue message to RabbitMQ
381
                } else if ($processFlag === ReceiverInterface::MSG_REJECT) {
382
                    $channel->basic_reject($deliveryTag, false); // Reject and drop
383
                } else if ($processFlag !== ReceiverInterface::MSG_ACK_SENT) {
384
                    $channel->basic_ack($deliveryTag); // Remove message from queue only if callback return not false
385
                }
386
387
                $this->consumed++;
388
389
                $executeReceiverStrategy->onMessageProcessed($message);
390
            }
391
        }
392
    }
393
394
    protected function sendRpcReply(AMQPMessage $message, $result)
395
    {
396
        if ($result instanceof RpcReponse || $result instanceof RpcResponseException) {
397
            $body = $this->serializer->serialize($result);
398
            $replayMessage = new AMQPMessage($body, [
399
                'content_type' => 'text/plain',
400
                'correlation_id' => $message->get('correlation_id'),
401
            ]);
402
            $message->getChannel()->basic_publish($replayMessage , '', $message->get('reply_to'));
403
        } else {
404
            $this->logger->error('Rpc call send msg to queue which have not rpc reponse', [
405
                'amqp' => ['message' => $message]
406
            ]);
407
        }
408
    }
409
410
    protected function maybeStopConsumer()
411
    {
412
        if ($this->forceStop || ($this->target && $this->consumed == $this->target)) {
413
            $this->stopConsuming();
414
        }
415
    }
416
417
    public function forceStopConsumer()
418
    {
419
        $this->forceStop = true;
420
    }
421
422
    public function stopConsuming($immedietly = false)
423
    {
424
        if (false === $immedietly) {
425
            foreach ($this->executeReceiverStrategies as $executeReceiverStrategy) {
426
                $executeReceiverStrategy->onStopConsuming();
427
            }
428
        }
429
430
        foreach ($this->consumerTags as $consumerTag) {
431
            $this->channel->basic_cancel($consumerTag, false, true);
432
        }
433
434
        $this->consumerTags = [];
435
    }
436
437
    /**
438
     * @param int $secondsInTheFuture
439
     */
440
    public function setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($secondsInTheFuture)
441
    {
442
        $this->gracefulMaxExecutionDateTime = new \DateTime("+{$secondsInTheFuture} seconds");
443
    }
444
445
    /**
446
     * Choose the timeout wait (in seconds) to use for the $this->getChannel()->wait() method.
447
     */
448
    private function chooseWaitTimeout(): int
449
    {
450
        if ($this->gracefulMaxExecutionDateTime) {
451
            $allowedExecutionSeconds = $this->gracefulMaxExecutionDateTime->getTimestamp() - time();
452
453
            /*
454
             * Respect the idle timeout if it's set and if it's less than
455
             * the remaining allowed execution.
456
             */
457
            $waitTimeout = $this->idleTimeout && $this->idleTimeout < $allowedExecutionSeconds
458
                ? $this->idleTimeout
459
                : $allowedExecutionSeconds;
460
        } else {
461
            $waitTimeout = $this->idleTimeout;
462
        }
463
464
        if (!is_null($this->timeoutWait) && $this->timeoutWait > 0) {
465
            $waitTimeout = min($waitTimeout, $this->timeoutWait);
466
        }
467
        return $waitTimeout;
468
    }
469
}
470