Completed
Pull Request — master (#39)
by Aleksandr
06:46
created

Consumer::getQueueConsumings()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.125

Importance

Changes 0
Metric Value
cc 1
eloc 1
c 0
b 0
f 0
nc 1
nop 0
dl 0
loc 3
ccs 1
cts 2
cp 0.5
crap 1.125
rs 10
1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use http\Exception\InvalidArgumentException;
6
use OldSound\RabbitMqBundle\Declarations\QueueConsuming;
7
use OldSound\RabbitMqBundle\Event\AfterProcessingMessageEvent;
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\...rProcessingMessageEvent was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
8
use OldSound\RabbitMqBundle\Event\AfterProcessingMessagesEvent;
9
use OldSound\RabbitMqBundle\Event\AMQPEvent;
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\Event\AMQPEvent was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
10
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessageEvent;
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\...eProcessingMessageEvent was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
11
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessagesEvent;
12
use OldSound\RabbitMqBundle\Event\OnConsumeEvent;
13
use OldSound\RabbitMqBundle\Event\OnIdleEvent;
14
use OldSound\RabbitMqBundle\EventDispatcherAwareTrait;
15
use OldSound\RabbitMqBundle\ExecuteCallbackStrategy\FnMessagesProcessor;
16
use OldSound\RabbitMqBundle\ExecuteCallbackStrategy\MessagesProcessorInterface;
17
use OldSound\RabbitMqBundle\MemoryChecker\MemoryConsumptionChecker;
18
use OldSound\RabbitMqBundle\MemoryChecker\NativeMemoryUsageProvider;
19
use OldSound\RabbitMqBundle\RabbitMq\Exception\RpcResponseException;
20
use OldSound\RabbitMqBundle\Serializer\JsonMessageBodySerializer;
21
use OldSound\RabbitMqBundle\Serializer\MessageBodySerializerInterface;
22
use PhpAmqpLib\Channel\AMQPChannel;
23
use PhpAmqpLib\Exception\AMQPRuntimeException;
24
use PhpAmqpLib\Exception\AMQPTimeoutException;
25
use PhpAmqpLib\Message\AMQPMessage;
26
use Psr\Log\LoggerAwareTrait;
27
use Psr\Log\NullLogger;
28
use OldSound\RabbitMqBundle\ExecuteCallbackStrategy\ExecuteCallbackStrategyInterface;
29
use Symfony\Component\Serializer\SerializerInterface;
30
31
class Consumer
32
{
33
    use LoggerAwareTrait;
34
    use EventDispatcherAwareTrait;
35
36
    /** @var AMQPChannel */
37
    protected $channel;
38
    /** @var QueueConsuming[] */
39
    protected $queueConsumings = [];
40
    /** @var ExecuteCallbackStrategyInterface[] */
41
    protected $executeCallbackStrategies = [];
42
    /** @var MessageBodySerializerInterface */
43
    protected $serializer;
44
45
    /** @var string[] */
46
    protected $consumerTags = [];
47
    /** @var array */
48
    protected $basicProperties = [
49
        'content_type' => 'text/plain',
50 32
        'delivery_mode' => 2
51
    ];
52 32
    /** @var int|null */
53
    protected $target;
54
    /** @var int */
55
    protected $consumed = 0;
56
    /** @var bool */
57
    protected $forceStop = false;
58
    /**
59
     * Importrant! If true - then channel can not be used from somewhere else
60
     * @var bool
61
     */
62
    public $multiAck = false;
63
    /**
64 12
     * @var \DateTime|null DateTime after which the consumer will gracefully exit. "Gracefully" means, that
65
     *      any currently running consumption will not be interrupted.
66 12
     */
67
    public $gracefulMaxExecutionDateTime;
68 12
    /** @var int Exit code used, when consumer is closed by the Graceful Max Execution Timeout feature. */
69
    public $gracefulMaxExecutionTimeoutExitCode = 0;
70 12
    /** @var int|null */
71 10
    public $timeoutWait;
72 10
    /** @var int */
73
    public $idleTimeout = 0;
74
    /** @var int */
75
    public $idleTimeoutExitCode;
76
    /** @var \DateTime|null */
77
    public $lastActivityDateTime;
78 10
79
    public function __construct(AMQPChannel $channel)
80 10
    {
81 10
        $this->channel = $channel;
82
        $this->logger = new NullLogger();
83 2
        $this->serializer = new JsonMessageBodySerializer();
84
    }
85
86 8
    public function getChannel(): AMQPChannel
87
    {
88 8
        return $this->channel;
89 6
    }
90 6
91 2
    public function setSerializer(MessageBodySerializerInterface $serializer)
92
    {
93
        $this->serializer = $serializer;
94 4
    }
95 4
96
    protected function setup(): Consumer
97 4
    {
98 4
        foreach($this->queueConsumings as $index => $queueConsuming) {
99 2
            $this->channel->basic_qos($queueConsuming->qosPrefetchSize, $queueConsuming->qosPrefetchCount, false);
100
101 2
            $consumerTag = $this->channel->basic_consume(
102
                $queueConsuming->queueName,
103
                $queueConsuming->consumerTag ?
104
                    $queueConsuming->consumerTag :
105
                    sprintf("PHPPROCESS_%s_%s_%s", gethostname(), getmypid(), $index),
106
                $queueConsuming->noLocal,
107
                $queueConsuming->noAck,
108 4
                $queueConsuming->exclusive,
109
                $queueConsuming->nowait,
110
                function (AMQPMessage $message) use ($queueConsuming) {
111
                    $this->getExecuteCallbackStrategy($queueConsuming)->consumeCallback($message);
112
                });
113
114
            //$queueConsuming->consumerTag = $consumerTag;
115
            $this->consumerTags[] = $consumerTag;
116
        }
117
118
        return $this;
119
    }
120
121
    public function consumeQueue(QueueConsuming $queueConsuming, ExecuteCallbackStrategyInterface $executeCallbackStrategy): Consumer
122
    {
123
        $this->queueConsumings[] = $queueConsuming;
124
        $executeCallbackStrategy->setMessagesProccessor(new FnMessagesProcessor(
125
            (function (array $messages) use ($queueConsuming) {
126
                $logAmqpContext = ['queue' => $queueConsuming->queueName];
127 32
                if ($this->getExecuteCallbackStrategy($queueConsuming)->canPrecessMultiMessages()) {
128
                    $logAmqpContext['messages'] = $messages;
129 32
                } else {
130 32
                    $logAmqpContext['message'] = $messages[0];
131
                }
132
133 32
                $this->dispatchEvent(BeforeProcessingMessagesEvent::NAME,
134 32
                    new BeforeProcessingMessagesEvent($this, $messages, $queueConsuming)
135 32
                );
136 32
137 32
                try {
138
                    $replies = $this->processMessages($messages, $queueConsuming);
0 ignored issues
show
Unused Code introduced by
The assignment to $replies is dead and can be removed.
Loading history...
139 32
                } catch (Exception\StopConsumerException $e) {
140
                    $this->logger->info('Consumer requested stop', [
141 32
                        'amqp' => $logAmqpContext,
142 32
                        'exception' => $e
143 32
                    ]);
144
145
                    $this->stopConsuming(true);
146
                    return;
147
                } catch (\Throwable $e) {
148
                    $this->logger->error('Throw exception while process messages', [
149
                        'amqp' => $logAmqpContext,
150
                        'exception' => $e
151
                    ]);
152
                    throw $e;
153
                }
154
155
                $this->logger->info('Queue messages processed', ['amqp' => $logAmqpContext]); // TODO add flag code
156
                $this->dispatchEvent(
157
                    AfterProcessingMessagesEvent::NAME,
158
                    new AfterProcessingMessagesEvent($this, $messages) // TODO add flag code
159
                );
160
161
                $this->maybeStopConsumer();
162
            })->bindTo($this)
163
        ));
164
165
        $canPrecessMultiMessages = $executeCallbackStrategy->canPrecessMultiMessages();
166
        if ($canPrecessMultiMessages) {
167
            if (!$queueConsuming->callback instanceof BatchConsumerInterface) {
168
                throw new \InvalidArgumentException('TODO '. $queueConsuming->queueName);
169
            }
170
        } else {
171
            if (!$queueConsuming->callback instanceof ConsumerInterface) {
172
                throw new \InvalidArgumentException('TODO '. $queueConsuming->queueName);
173
            }
174
        }
175 32
176
        $this->executeCallbackStrategies[] = $executeCallbackStrategy;
177 14
178
        return $this;
179 14
    }
180 14
181
    private function getExecuteCallbackStrategy(QueueConsuming $queueConsuming): ExecuteCallbackStrategyInterface
182 32
    {
183
        return $this->executeCallbackStrategies[array_search($queueConsuming, $this->queueConsumings, true)];
184 32
    }
185
186 10
    /**
187 22
     * @return QueueConsuming[]
188
     */
189
    public function getQueueConsumings(): array
190 22
    {
191
        return $this->queueConsumings;
192 5
    }
193 17
194
    /**
195 15
     * Consume the message
196
     * @param   int     $msgAmount
197
     * @return  int
198 32
     *
199 32
     * @throws  AMQPTimeoutException
200
     */
201 32
    public function consume(int $msgAmount = null)
202
    {
203
        $this->target = $msgAmount;
204 32
        $this->consumed = 0;
205
        
206
        $this->setup();
207
        
208
        $this->lastActivityDateTime = new \DateTime();
209
        while ($this->channel->is_consuming()) {
210
            $this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent($this));
211
            $this->maybeStopConsumer();
212
213
            if ($this->forceStop) {
214
                break;
215
            }
216
            /*
217
             * Be careful not to trigger ::wait() with 0 or less seconds, when
218
             * graceful max execution timeout is being used.
219
             */
220
            $waitTimeout = $this->chooseWaitTimeout();
221 4
            if ($this->gracefulMaxExecutionDateTime && $waitTimeout < 1) {
222
                return $this->gracefulMaxExecutionTimeoutExitCode;
223 4
            }
224 4
225
            try {
226
                $this->channel->wait(null, false, $waitTimeout);
227
                $this->lastActivityDateTime = new \DateTime();
228
                if ($this->forceStop) {
229 4
                    break;
230
                }
231 4
            } catch (AMQPTimeoutException $e) {
232 4
                foreach($this->executeCallbackStrategies as $executeCallbackStrategy) {
233
                    $executeCallbackStrategy->onCatchTimeout($e);
234
                }
235
                $now = new \DateTime();
236
                if ($this->gracefulMaxExecutionDateTime && $this->gracefulMaxExecutionDateTime <= $now) {
237 2
                    return $this->gracefulMaxExecutionTimeoutExitCode;
238
                }
239 2
240 2
                if ($this->idleTimeout && ($this->lastActivityDateTime->getTimestamp() + $this->idleTimeout <= $now->getTimestamp())) {
241
                    $idleEvent = new OnIdleEvent($this);
242
                    $this->dispatchEvent(OnIdleEvent::NAME, $idleEvent);
243
244
                    if ($idleEvent->isForceStop()) {
245
                        if (null !== $this->idleTimeoutExitCode) {
246
                            return $this->idleTimeoutExitCode;
247
                        } else {
248
                            throw $e;
249
                        }
250
                    }
251
                }
252
            }
253
        }
254
255
        return 0;
256
    }
257
258
    /**
259
     * @param AMQPMessage[] $messages
260
     * @param QueueConsuming $queueConsuming
261
     */
262
    protected function processMessages(array $messages, QueueConsuming $queueConsuming)
263
    {
264
        if (count($messages) === 0) {
265
            throw new \InvalidArgumentException('Messages can not be empty');
266
        }
267 10
268
        $canPrecessMultiMessages = $this->getExecuteCallbackStrategy($queueConsuming)->canPrecessMultiMessages();
269 10
        if (!$canPrecessMultiMessages && count($messages) !== 1) {
270 4
            throw new \InvalidArgumentException('Strategy is not supported process of multi messages');
271 4
        }
272 4
273 4
        /** @var int[]|RpcReponse[]|RpcResponseException[]|bool[] $replies */
274 4
        $replies = [];
275
        if ($queueConsuming->callback instanceof BatchConsumerInterface) {
276 4
            $replies = $queueConsuming->callback->batchExecute($messages);
277 2
            if (!is_array($replies)) {
278
                $processFlag = $replies;
279
                $replies = [];
280
                foreach ($messages as $message) {
281
                    $replies[$message->getDeliveryTag()] = $processFlag;
282
                }
283
            } else if (count($replies) !== count($messages)) {
284
                throw new AMQPRuntimeException(
285 4
                    'Method batchExecute() should return an array with elements equal with the number of messages processed'
286 4
                );
287
            }
288
        } else {
289
            try {
290
                $replies = [$messages[0]->getDeliveryTag() => $queueConsuming->callback->execute($messages[0])];
291
            } catch (Exception\RpcResponseException $e) {
292
                $replies = [$messages[0]->getDeliveryTag() => $e];
293
            }
294
        }
295 4
296 4
        if (!$queueConsuming->noAck) {
297
            $messages = array_combine(
298
                array_map(fn ($message) => $message->getDeliveryTag(), $messages),
299
                $messages
300
            );
301 6
302 6
            $this->handleProcessMessages($messages, $replies, $queueConsuming);
0 ignored issues
show
Bug introduced by
It seems like $messages can also be of type false; however, parameter $messages of OldSound\RabbitMqBundle\...handleProcessMessages() does only seem to accept PhpAmqpLib\Message\AMQPMessage[], maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

302
            $this->handleProcessMessages(/** @scrutinizer ignore-type */ $messages, $replies, $queueConsuming);
Loading history...
303
        }
304
305
        return $replies;
306
    }
307
308
309
    /**
310
     * @param AMQPMessage[] $messages
311
     * @param int[]|RpcReponse[]|RpcResponseException[]|bool[] $replies
312
     */
313
    private function handleProcessMessages($messages, array $replies, QueueConsuming $queueConsuming)
314
    {
315
        $executeCallbackStrategy = $this->getExecuteCallbackStrategy($queueConsuming);
316
317
        $ack = !array_search(fn ($reply) => $reply !== null && $reply !== ConsumerInterface::MSG_ACK, $replies, true);
318
        if ($this->multiAck && count($messages) > 1 && $ack) {
319
            $channels = array_map(fn ($message) => $message->getChannel(), $messages);
320
            if (count($channels) !== array_unique($channels)) { // all messages have same channel
321
                throw new InvalidArgumentException('Messages can not be processed as multi ack with different channels');
322
            }
323
324
            $this->channel->basic_ack(last($deliveryTag), true);
0 ignored issues
show
Bug introduced by
The function last was not found. Maybe you did not declare it correctly or list all dependencies? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

324
            $this->channel->basic_ack(/** @scrutinizer ignore-call */ last($deliveryTag), true);
Loading history...
Comprehensibility Best Practice introduced by
The variable $deliveryTag seems to be never defined.
Loading history...
325
            $this->consumed = $this->consumed + count($messages);
326
            $executeCallbackStrategy->onMessageProcessed($message);
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable $message does not exist. Did you maybe mean $messages?
Loading history...
327
328
            return array_combine(
329
                array_map(fn ($message) => $message->getDeliveryTag(), $messages),
330
                array_fill(0, count($messages), ConsumerInterface::MSG_ACK)
331
            );
332
        } else {
333
            foreach ($replies as $deliveryTag => $reply) {
334
                $message = $messages[$deliveryTag] ?? null;
335
                if (null === $message) {
336
                    throw new AMQPRuntimeException(sprintf('Unknown delivery_tag %d!', $deliveryTag));
337
                }
338
339
                $channel = $message->getChannel();
340
                $processFlag = $reply;
341
                if ($processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $processFlag) {
342
                    $channel->basic_reject($deliveryTag, true); // Reject and requeue message to RabbitMQ
343
                } else if ($processFlag === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE) {
344
                    $channel->basic_nack($deliveryTag, false, true); // NACK and requeue message to RabbitMQ
345
                } else if ($processFlag === ConsumerInterface::MSG_REJECT) {
346
                    $channel->basic_reject($deliveryTag, false); // Reject and drop
347
                } else if ($processFlag !== ConsumerInterface::MSG_ACK_SENT) {
348
                    $isRpcCall = $message->has('reply_to') && $message->has('correlation_id');
349
                    if ($isRpcCall) {
350
                        $this->sendRpcReply($message, $reply);
351
                    }
352
353
                    $channel->basic_ack($deliveryTag); // Remove message from queue only if callback return not false
354
                }
355
356
                $this->consumed++;
357
358
                $executeCallbackStrategy->onMessageProcessed($message);
359
            }
360
        }
361
    }
362
363
    protected function sendRpcReply(AMQPMessage $message, $result)
364
    {
365
        if ($result instanceof RpcReponse || $result instanceof RpcResponseException) {
366
            $body = $this->serializer->serialize($result);
367
            $replayMessage = new AMQPMessage($body, [
368
                'content_type' => 'text/plain',
369
                'correlation_id' => $message->get('correlation_id'),
370
            ]);
371
            $message->getChannel()->basic_publish($replayMessage , '', $message->get('reply_to'));
0 ignored issues
show
Bug introduced by
It seems like $message->get('reply_to') can also be of type PhpAmqpLib\Channel\AMQPChannel; however, parameter $routing_key of PhpAmqpLib\Channel\AMQPChannel::basic_publish() does only seem to accept string, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

371
            $message->getChannel()->basic_publish($replayMessage , '', /** @scrutinizer ignore-type */ $message->get('reply_to'));
Loading history...
372
        } else {
373
            $this->logger->error('Rpc call send msg to queue which have not rpc reponse', [
374
                'amqp' => ['message' => $message]
375
            ]);
376
        }
377
    }
378
379
    protected function maybeStopConsumer()
380
    {
381
        if ($this->forceStop || ($this->target && $this->consumed == $this->target)) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $this->target of type integer|null is loosely compared to true; this is ambiguous if the integer can be 0. You might want to explicitly use !== null instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For integer values, zero is a special case, in particular the following results might be unexpected:

0   == false // true
0   == null  // true
123 == false // false
123 == null  // false

// It is often better to use strict comparison
0 === false // false
0 === null  // false
Loading history...
382
            $this->stopConsuming();
383
        }
384
    }
385
386
    public function forceStopConsumer()
387
    {
388
        $this->forceStop = true;
389
    }
390
391
    public function stopConsuming($immedietly = false)
392
    {
393
        if (false === $immedietly) {
394
            foreach ($this->executeCallbackStrategies as $executeCallbackStrategy) {
395
                $executeCallbackStrategy->onStopConsuming();
396
            }
397
        }
398
399
        foreach ($this->consumerTags as $consumerTag) {
400
            $this->channel->basic_cancel($consumerTag, false, true);
401
        }
402
403
        $this->consumerTags = [];
404
    }
405
406
    /**
407
     * @param int $secondsInTheFuture
408
     */
409
    public function setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($secondsInTheFuture)
410
    {
411
        $this->gracefulMaxExecutionDateTime = new \DateTime("+{$secondsInTheFuture} seconds");
412
    }
413
414
    /**
415
     * Choose the timeout wait (in seconds) to use for the $this->getChannel()->wait() method.
416
     */
417
    private function chooseWaitTimeout(): int
418
    {
419
        if ($this->gracefulMaxExecutionDateTime) {
420
            $allowedExecutionSeconds = $this->gracefulMaxExecutionDateTime->getTimestamp() - time();
421
422
            /*
423
             * Respect the idle timeout if it's set and if it's less than
424
             * the remaining allowed execution.
425
             */
426
            $waitTimeout = $this->idleTimeout && $this->idleTimeout < $allowedExecutionSeconds
427
                ? $this->idleTimeout
428
                : $allowedExecutionSeconds;
429
        } else {
430
            $waitTimeout = $this->idleTimeout;
431
        }
432
433
        if (!is_null($this->timeoutWait) && $this->timeoutWait > 0) {
434
            $waitTimeout = min($waitTimeout, $this->timeoutWait);
435
        }
436
        return $waitTimeout;
437
    }
438
}
439