Test Failed
Pull Request — master (#39)
by Aleksandr
02:42
created

Consumer   B

Complexity

Total Complexity 46

Size/Duplication

Total Lines 313
Duplicated Lines 0 %

Test Coverage

Coverage 66.09%

Importance

Changes 17
Bugs 0 Features 2
Metric Value
eloc 116
dl 0
loc 313
ccs 76
cts 115
cp 0.6609
rs 8.72
c 17
b 0
f 2
wmc 46

How to fix   Complexity   

Complex Class

Complex classes like Consumer often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use Consumer, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use http\Exception\InvalidArgumentException;
6
use OldSound\RabbitMqBundle\Declarations\QueueConsuming;
7
use OldSound\RabbitMqBundle\Event\AfterProcessingMessageEvent;
8
use OldSound\RabbitMqBundle\Event\AfterProcessingMessagesEvent;
9
use OldSound\RabbitMqBundle\Event\AMQPEvent;
10
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessageEvent;
11
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessagesEvent;
12
use OldSound\RabbitMqBundle\Event\OnConsumeEvent;
13
use OldSound\RabbitMqBundle\Event\OnIdleEvent;
14
use OldSound\RabbitMqBundle\EventDispatcherAwareTrait;
15
use OldSound\RabbitMqBundle\ExecuteReceiverStrategy\BatchExecuteReceiverStrategy;
16
use OldSound\RabbitMqBundle\ExecuteReceiverStrategy\ExecuteReceiverStrategyInterface;
17
use OldSound\RabbitMqBundle\ExecuteReceiverStrategy\FnMessagesProcessor;
18
use OldSound\RabbitMqBundle\ExecuteReceiverStrategy\MessagesProcessorInterface;
19
use OldSound\RabbitMqBundle\ExecuteReceiverStrategy\SimpleExecuteReceiverStrategy;
20
use OldSound\RabbitMqBundle\MemoryChecker\MemoryConsumptionChecker;
21
use OldSound\RabbitMqBundle\MemoryChecker\NativeMemoryUsageProvider;
22
use OldSound\RabbitMqBundle\Producer\ProducerInterface;
23
use OldSound\RabbitMqBundle\Receiver\NotReadyReceiveException;
24
use OldSound\RabbitMqBundle\Receiver\ReceiverException;
25
use OldSound\RabbitMqBundle\Receiver\ReceiverInterface;
26
use OldSound\RabbitMqBundle\Receiver\ReplyReceiverInterface;
27
use OldSound\RabbitMqBundle\Serializer\JsonMessageBodySerializer;
28
use OldSound\RabbitMqBundle\Serializer\MessageBodySerializerInterface;
29
use PhpAmqpLib\Channel\AMQPChannel;
30
use PhpAmqpLib\Exception\AMQPRuntimeException;
31
use PhpAmqpLib\Exception\AMQPTimeoutException;
32
use PhpAmqpLib\Message\AMQPMessage;
33
use Psr\Log\LoggerAwareTrait;
34
use Psr\Log\NullLogger;
35
use OldSound\RabbitMqBundle\ExecuteReceiverStrategy\ExecuteReceiverStrategyInterface;
0 ignored issues
show
Bug introduced by
A parse error occurred: Cannot use OldSound\RabbitMqBundle\ExecuteReceiverStrategy\ExecuteReceiverStrategyInterface as ExecuteReceiverStrategyInterface because the name is already in use
Loading history...
36
use Symfony\Component\Serializer\SerializerInterface;
37
38
class Consumer
39
{
40
    use LoggerAwareTrait;
41
    use EventDispatcherAwareTrait;
42
43
    /** @var AMQPChannel */
44
    protected $channel;
45
    /** @var QueueConsuming[] */
46
    protected $queueConsumings = [];
47
    /** @var ExecuteReceiverStrategyInterface[] */
48
    protected $executeReceiverStrategies = [];
49
    /** @var MessageBodySerializerInterface */
50 32
    protected $serializer;
51
52 32
    /** @var string[] */
53
    protected $consumerTags = [];
54
    /** @var array */
55
    protected $basicProperties = [
56
        'content_type' => 'text/plain',
57
        'delivery_mode' => ProducerInterface::DELIVERY_MODE_PERSISTENT
58
    ];
59
    /** @var int|null */
60
    protected $target;
61
    /** @var int */
62
    protected $consumed = 0;
63
    /** @var bool */
64 12
    protected $forceStop = false;
65
    /**
66 12
     * Importrant! If true - then channel can not be used from somewhere else
67
     * @var bool
68 12
     */
69
    public $multiAck = false;
70 12
    /**
71 10
     * @var \DateTime|null DateTime after which the consumer will gracefully exit. "Gracefully" means, that
72 10
     *      any currently running consumption will not be interrupted.
73
     */
74
    public $gracefulMaxExecutionDateTime;
75
    /** @var int Exit code used, when consumer is closed by the Graceful Max Execution Timeout feature. */
76
    public $gracefulMaxExecutionTimeoutExitCode = 0;
77
    /** @var int|null */
78 10
    public $timeoutWait;
79
    /** @var int */
80 10
    public $idleTimeout = 0;
81 10
    /** @var int */
82
    public $idleTimeoutExitCode;
83 2
    /** @var \DateTime|null */
84
    public $lastActivityDateTime;
85
86 8
    public function __construct(AMQPChannel $channel)
87
    {
88 8
        $this->channel = $channel;
89 6
        $this->logger = new NullLogger();
90 6
        $this->serializer = new JsonMessageBodySerializer();
91 2
    }
92
93
    public function getChannel(): AMQPChannel
94 4
    {
95 4
        return $this->channel;
96
    }
97 4
98 4
    public function setSerializer(MessageBodySerializerInterface $serializer)
99 2
    {
100
        $this->serializer = $serializer;
101 2
    }
102
103
    protected function setup(): Consumer
104
    {
105
        foreach($this->queueConsumings as $index => $queueConsuming) {
106
            $this->channel->basic_qos($queueConsuming->qosPrefetchSize, $queueConsuming->qosPrefetchCount, false);
107
108 4
            $consumerTag = $this->channel->basic_consume(
109
                $queueConsuming->queueName,
110
                $queueConsuming->consumerTag ?
111
                    $queueConsuming->consumerTag :
112
                    sprintf("PHPPROCESS_%s_%s_%s", gethostname(), getmypid(), $index),
113
                $queueConsuming->noLocal,
114
                $queueConsuming->noAck,
115
                $queueConsuming->exclusive,
116
                $queueConsuming->nowait,
117
                function (AMQPMessage $message) use ($queueConsuming) {
118
                    $this->getExecuteReceiverStrategy($queueConsuming)->consumeCallback($message);
119
                });
120
121
            //$queueConsuming->consumerTag = $consumerTag;
122
            $this->consumerTags[] = $consumerTag;
123
        }
124
125
        return $this;
126
    }
127 32
128
    /**
129 32
     * @param iterable|QueueConsuming[] $queueConsumings
130 32
     */
131
    public function consumeQueues(iterable $queueConsumings)
132
    {
133 32
        foreach ($queueConsumings as $queueConsuming) {
134 32
            $this->consumeQueue($queueConsuming);
135 32
        }
136 32
    }
137 32
138
    public function consumeQueue(QueueConsuming $queueConsuming, ExecuteReceiverStrategyInterface $executeReceiverStrategy = null): Consumer
139 32
    {
140
        $this->queueConsumings[] = $queueConsuming;
141 32
        if (null === $executeReceiverStrategy) {
142 32
            $executeReceiverStrategy = null === $queueConsuming->batchCount ?
143 32
                new SimpleExecuteReceiverStrategy() :
144
                new BatchExecuteReceiverStrategy($queueConsuming->batchCount);
145
        }
146
147
        $executeReceiverStrategy->setMessagesProccessor(new FnMessagesProcessor(
148
            (function (array $messages) use ($queueConsuming) {
149
                $logAmqpContext = ['queue' => $queueConsuming->queueName];
150
                if ($this->getExecuteReceiverStrategy($queueConsuming)->canPrecessMultiMessages()) {
151
                    $logAmqpContext['messages'] = $messages;
152
                } else {
153
                    $logAmqpContext['message'] = $messages[0];
154
                }
155
156
                $this->dispatchEvent(BeforeProcessingMessagesEvent::NAME,
157
                    new BeforeProcessingMessagesEvent($this, $messages, $queueConsuming)
158
                );
159
160
                try {
161
                    $this->processMessages($messages, $queueConsuming);
162
                } catch (Exception\StopConsumerException $e) {
163
                    $this->logger->info('Consumer requested stop', [
164
                        'amqp' => $logAmqpContext,
165
                        'exception' => $e
166
                    ]);
167
168
                    $this->stopConsuming(true);
169
                    return;
170
                } catch (\Throwable $e) {
171
                    $this->logger->error('Throw exception while process messages', [
172
                        'amqp' => $logAmqpContext,
173
                        'exception' => $e
174
                    ]);
175 32
                    throw $e;
176
                }
177 14
178
                $this->logger->info('Queue messages processed', ['amqp' => $logAmqpContext]); // TODO add flag code
179 14
                $this->dispatchEvent(
180 14
                    AfterProcessingMessagesEvent::NAME,
181
                    new AfterProcessingMessagesEvent($this, $messages) // TODO add flag code
182 32
                );
183
184 32
                $this->maybeStopConsumer();
185
            })->bindTo($this)
186 10
        ));
187 22
188
        $canPrecessMultiMessages = $executeReceiverStrategy->canPrecessMultiMessages();
189
        if ($canPrecessMultiMessages) {
190 22
            if (!$queueConsuming->receiver instanceof BatchReceiverInterface) {
191
                throw new \InvalidArgumentException('TODO '. $queueConsuming->queueName);
192 5
            }
193 17
        } else {
194
            if (!$queueConsuming->receiver instanceof ReceiverInterface) {
195 15
                throw new \InvalidArgumentException('TODO '. $queueConsuming->queueName);
196
            }
197
        }
198 32
199 32
        $this->executeReceiverStrategies[] = $executeReceiverStrategy;
200
201 32
        return $this;
202
    }
203
204 32
    private function getExecuteReceiverStrategy(QueueConsuming $queueConsuming): ExecuteReceiverStrategyInterface
205
    {
206
        return $this->executeReceiverStrategies[array_search($queueConsuming, $this->queueConsumings, true)];
207
    }
208
209
    /**
210
     * @return QueueConsuming[]
211
     */
212
    public function getQueueConsumings(): array
213
    {
214
        return $this->queueConsumings;
215
    }
216
217
    /**
218
     * Consume the message
219
     * @param   int     $msgAmount
220
     * @return  int
221 4
     *
222
     * @throws  AMQPTimeoutException
223 4
     */
224 4
    public function consume(int $msgAmount = null)
225
    {
226
        $this->target = $msgAmount;
227
        $this->consumed = 0;
228
229 4
        $this->setup();
230
231 4
        $this->lastActivityDateTime = new \DateTime();
232 4
        while ($this->channel->is_consuming()) {
233
            $this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent($this));
234
            $this->maybeStopConsumer();
235
236
            if ($this->forceStop) {
237 2
                break;
238
            }
239 2
            /*
240 2
             * Be careful not to trigger ::wait() with 0 or less seconds, when
241
             * graceful max execution timeout is being used.
242
             */
243
            $waitTimeout = $this->chooseWaitTimeout();
244
            if ($this->gracefulMaxExecutionDateTime && $waitTimeout < 1) {
245
                return $this->gracefulMaxExecutionTimeoutExitCode;
246
            }
247
248
            try {
249
                $this->channel->wait(null, false, $waitTimeout);
250
                $this->lastActivityDateTime = new \DateTime();
251
                if ($this->forceStop) {
252
                    break;
253
                }
254
            } catch (AMQPTimeoutException $e) {
255
                foreach($this->executeReceiverStrategies as $executeReceiverStrategy) {
256
                    $executeReceiverStrategy->onCatchTimeout($e);
257
                }
258
                $now = new \DateTime();
259
                if ($this->gracefulMaxExecutionDateTime && $this->gracefulMaxExecutionDateTime <= $now) {
260
                    return $this->gracefulMaxExecutionTimeoutExitCode;
261
                }
262
263
                if ($this->idleTimeout && ($this->lastActivityDateTime->getTimestamp() + $this->idleTimeout <= $now->getTimestamp())) {
264
                    $idleEvent = new OnIdleEvent($this);
265
                    $this->dispatchEvent(OnIdleEvent::NAME, $idleEvent);
266
267 10
                    if ($idleEvent->isForceStop()) {
268
                        if (null !== $this->idleTimeoutExitCode) {
269 10
                            return $this->idleTimeoutExitCode;
270 4
                        } else {
271 4
                            throw $e;
272 4
                        }
273 4
                    }
274 4
                }
275
            }
276 4
        }
277 2
278
        return 0;
279
    }
280
281
    /**
282
     * @param AMQPMessage[] $messages
283
     * @param QueueConsuming $queueConsuming
284
     */
285 4
    protected function processMessages(array $messages, QueueConsuming $queueConsuming)
286 4
    {
287
        if (count($messages) === 0) {
288
            throw new \InvalidArgumentException('Messages can not be empty');
289
        }
290
291
        $canPrecessMultiMessages = $this->getExecuteReceiverStrategy($queueConsuming)->canPrecessMultiMessages();
292
        if (!$canPrecessMultiMessages && count($messages) !== 1) {
293
            throw new \InvalidArgumentException('Strategy is not supported process of multi messages');
294
        }
295 4
296 4
        /** @var int[]|int $flags */
297
        $flags = [];
298
        try {
299
            if ($queueConsuming->receiver instanceof ReceiverInterface) {
300
                $flags = $queueConsuming->receiver->execute($messages[0]);
301 6
            } else if ($queueConsuming->receiver instanceof BatchReceiverInterface) {
302 6
                $flags = $queueConsuming->receiver->batchExecute($messages);
303
            } else if ($queueConsuming->receiver instanceof ReplyReceiverInterface) {
304
                $reply = $queueConsuming->receiver->execute($messages[0]);
305
                $isRpcCall = $messages[0]->has('reply_to') && $messages[0]->has('correlation_id');
306
                if ($isRpcCall) {
307
                    $this->sendRpcReply($messages[0], $reply);
308
                    $flags = ReceiverInterface::MSG_ACK;
309
                } else {
310
                    $flags = ReceiverInterface::MSG_REJECT;
311
                    // logging
312
                }
313
            } else {
314
                throw new \InvalidArgumentException('TODO');
315
            }
316
        } catch (ReceiverException $exception) {
317
            $flags = $exception->getCode();
318
        } catch (NotReadyReceiveException $exception) {
319
            // TODO
320
            $this->forceStop = true;
321
            return;
322
        }
323
324
        if (!is_array($flags)) { // spread handle flag for each delivery tag
325
            $flag = $flags;
326
            $flags = [];
327
            foreach ($messages as $message) {
328
                $flags[$message->getDeliveryTag()] = $flag;
329
            }
330
        } else if (count($flags) !== count($messages)) {
331
            throw new AMQPRuntimeException(
332
                'Method batchExecute() should return an array with elements equal with the number of messages processed'
333
            );
334
        }
335
336
        if (!$queueConsuming->noAck) {
337
            $messages = array_combine(
338
                array_map(fn ($message) => $message->getDeliveryTag(), $messages),
339
                $messages
340
            );
341
342
            $this->handleProcessMessages($messages, $flags, $queueConsuming);
343
        }
344
    }
345
346
    /**
347
     * @param AMQPMessage[] $messages
348
     * @param int[]|RpcReponse[]|RpcResponseException[]|bool[] $replies
349
     */
350
    private function handleProcessMessages($messages, array $replies, QueueConsuming $queueConsuming)
351
    {
352
        $executeReceiverStrategy = $this->getExecuteReceiverStrategy($queueConsuming);
353
354
        $ack = !array_search(fn ($reply) => $reply !== ReceiverInterface::MSG_ACK, $replies, true);
355
        if ($this->multiAck && count($messages) > 1 && $ack) {
356
            $channels = array_map(fn ($message) => $message->getChannel(), $messages);
357
            if (count($channels) !== array_unique($channels)) { // all messages have same channel
358
                throw new InvalidArgumentException('Messages can not be processed as multi ack with different channels');
359
            }
360
361
            $lastDeliveryTag = array_key_last($replies);
362
363
            $this->channel->basic_ack($lastDeliveryTag, true);
364
            $this->consumed = $this->consumed + count($messages);
365
            foreach ($messages as $message) {
366
                $executeReceiverStrategy->onMessageProcessed($message);
367
            }
368
        } else {
369
            foreach ($replies as $deliveryTag => $reply) {
370
                $message = $messages[$deliveryTag] ?? null;
371
                if (null === $message) {
372
                    throw new AMQPRuntimeException(sprintf('Unknown delivery_tag %d!', $deliveryTag));
373
                }
374
375
                $channel = $message->getChannel();
376
                $processFlag = $reply;
377
                if ($processFlag === ReceiverInterface::MSG_REJECT_REQUEUE || false === $processFlag) {
378
                    $channel->basic_reject($deliveryTag, true); // Reject and requeue message to RabbitMQ
379
                } else if ($processFlag === ReceiverInterface::MSG_SINGLE_NACK_REQUEUE) {
380
                    $channel->basic_nack($deliveryTag, false, true); // NACK and requeue message to RabbitMQ
381
                } else if ($processFlag === ReceiverInterface::MSG_REJECT) {
382
                    $channel->basic_reject($deliveryTag, false); // Reject and drop
383
                } else if ($processFlag !== ReceiverInterface::MSG_ACK_SENT) {
384
                    $channel->basic_ack($deliveryTag); // Remove message from queue only if callback return not false
385
                }
386
387
                $this->consumed++;
388
389
                $executeReceiverStrategy->onMessageProcessed($message);
390
            }
391
        }
392
    }
393
394
    protected function sendRpcReply(AMQPMessage $message, $result)
395
    {
396
        if ($result instanceof RpcReponse || $result instanceof RpcResponseException) {
397
            $body = $this->serializer->serialize($result);
398
            $replayMessage = new AMQPMessage($body, [
399
                'content_type' => 'text/plain',
400
                'correlation_id' => $message->get('correlation_id'),
401
            ]);
402
            $message->getChannel()->basic_publish($replayMessage , '', $message->get('reply_to'));
403
        } else {
404
            $this->logger->error('Rpc call send msg to queue which have not rpc reponse', [
405
                'amqp' => ['message' => $message]
406
            ]);
407
        }
408
    }
409
410
    protected function maybeStopConsumer()
411
    {
412
        if ($this->forceStop || ($this->target && $this->consumed == $this->target)) {
413
            $this->stopConsuming();
414
        }
415
    }
416
417
    public function forceStopConsumer()
418
    {
419
        $this->forceStop = true;
420
    }
421
422
    public function stopConsuming($immedietly = false)
423
    {
424
        if (false === $immedietly) {
425
            foreach ($this->executeReceiverStrategies as $executeReceiverStrategy) {
426
                $executeReceiverStrategy->onStopConsuming();
427
            }
428
        }
429
430
        foreach ($this->consumerTags as $consumerTag) {
431
            $this->channel->basic_cancel($consumerTag, false, true);
432
        }
433
434
        $this->consumerTags = [];
435
    }
436
437
    /**
438
     * @param int $secondsInTheFuture
439
     */
440
    public function setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($secondsInTheFuture)
441
    {
442
        $this->gracefulMaxExecutionDateTime = new \DateTime("+{$secondsInTheFuture} seconds");
443
    }
444
445
    /**
446
     * Choose the timeout wait (in seconds) to use for the $this->getChannel()->wait() method.
447
     */
448
    private function chooseWaitTimeout(): int
449
    {
450
        if ($this->gracefulMaxExecutionDateTime) {
451
            $allowedExecutionSeconds = $this->gracefulMaxExecutionDateTime->getTimestamp() - time();
452
453
            /*
454
             * Respect the idle timeout if it's set and if it's less than
455
             * the remaining allowed execution.
456
             */
457
            $waitTimeout = $this->idleTimeout && $this->idleTimeout < $allowedExecutionSeconds
458
                ? $this->idleTimeout
459
                : $allowedExecutionSeconds;
460
        } else {
461
            $waitTimeout = $this->idleTimeout;
462
        }
463
464
        if (!is_null($this->timeoutWait) && $this->timeoutWait > 0) {
465
            $waitTimeout = min($waitTimeout, $this->timeoutWait);
466
        }
467
        return $waitTimeout;
468
    }
469
}
470