Test Failed
Pull Request — master (#39)
by Aleksandr
06:37
created

Consumer::getChannel()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 1
c 0
b 0
f 0
nc 1
nop 0
dl 0
loc 3
ccs 3
cts 3
cp 1
crap 1
rs 10
1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use http\Exception\InvalidArgumentException;
6
use OldSound\RabbitMqBundle\Declarations\QueueConsuming;
7
use OldSound\RabbitMqBundle\Event\AfterProcessingMessageEvent;
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\...rProcessingMessageEvent was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
8
use OldSound\RabbitMqBundle\Event\AfterProcessingMessagesEvent;
9
use OldSound\RabbitMqBundle\Event\AMQPEvent;
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\Event\AMQPEvent was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
10
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessageEvent;
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\...eProcessingMessageEvent was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
11
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessagesEvent;
12
use OldSound\RabbitMqBundle\Event\OnConsumeEvent;
13
use OldSound\RabbitMqBundle\Event\OnIdleEvent;
14
use OldSound\RabbitMqBundle\EventDispatcherAwareTrait;
15
use OldSound\RabbitMqBundle\ExecuteCallbackStrategy\BatchExecuteCallbackStrategy;
16
use OldSound\RabbitMqBundle\ExecuteCallbackStrategy\FnMessagesProcessor;
17
use OldSound\RabbitMqBundle\ExecuteCallbackStrategy\MessagesProcessorInterface;
18
use OldSound\RabbitMqBundle\ExecuteCallbackStrategy\SimpleExecuteCallbackStrategy;
19
use OldSound\RabbitMqBundle\MemoryChecker\MemoryConsumptionChecker;
20
use OldSound\RabbitMqBundle\MemoryChecker\NativeMemoryUsageProvider;
21
use OldSound\RabbitMqBundle\RabbitMq\Exception\RpcResponseException;
22
use OldSound\RabbitMqBundle\Serializer\JsonMessageBodySerializer;
23
use OldSound\RabbitMqBundle\Serializer\MessageBodySerializerInterface;
24
use PhpAmqpLib\Channel\AMQPChannel;
25
use PhpAmqpLib\Exception\AMQPRuntimeException;
26
use PhpAmqpLib\Exception\AMQPTimeoutException;
27
use PhpAmqpLib\Message\AMQPMessage;
28
use Psr\Log\LoggerAwareTrait;
29
use Psr\Log\NullLogger;
30
use OldSound\RabbitMqBundle\ExecuteCallbackStrategy\ExecuteCallbackStrategyInterface;
31
use Symfony\Component\Serializer\SerializerInterface;
32
33
class Consumer
34
{
35
    use LoggerAwareTrait;
36
    use EventDispatcherAwareTrait;
37
38
    /** @var AMQPChannel */
39
    protected $channel;
40
    /** @var QueueConsuming[] */
41
    protected $queueConsumings = [];
42
    /** @var ExecuteCallbackStrategyInterface[] */
43
    protected $executeCallbackStrategies = [];
44
    /** @var MessageBodySerializerInterface */
45
    protected $serializer;
46
47
    /** @var string[] */
48
    protected $consumerTags = [];
49
    /** @var array */
50 32
    protected $basicProperties = [
51
        'content_type' => 'text/plain',
52 32
        'delivery_mode' => 2
53
    ];
54
    /** @var int|null */
55
    protected $target;
56
    /** @var int */
57
    protected $consumed = 0;
58
    /** @var bool */
59
    protected $forceStop = false;
60
    /**
61
     * Importrant! If true - then channel can not be used from somewhere else
62
     * @var bool
63
     */
64 12
    public $multiAck = false;
65
    /**
66 12
     * @var \DateTime|null DateTime after which the consumer will gracefully exit. "Gracefully" means, that
67
     *      any currently running consumption will not be interrupted.
68 12
     */
69
    public $gracefulMaxExecutionDateTime;
70 12
    /** @var int Exit code used, when consumer is closed by the Graceful Max Execution Timeout feature. */
71 10
    public $gracefulMaxExecutionTimeoutExitCode = 0;
72 10
    /** @var int|null */
73
    public $timeoutWait;
74
    /** @var int */
75
    public $idleTimeout = 0;
76
    /** @var int */
77
    public $idleTimeoutExitCode;
78 10
    /** @var \DateTime|null */
79
    public $lastActivityDateTime;
80 10
81 10
    public function __construct(AMQPChannel $channel)
82
    {
83 2
        $this->channel = $channel;
84
        $this->logger = new NullLogger();
85
        $this->serializer = new JsonMessageBodySerializer();
86 8
    }
87
88 8
    public function getChannel(): AMQPChannel
89 6
    {
90 6
        return $this->channel;
91 2
    }
92
93
    public function setSerializer(MessageBodySerializerInterface $serializer)
94 4
    {
95 4
        $this->serializer = $serializer;
96
    }
97 4
98 4
    protected function setup(): Consumer
99 2
    {
100
        foreach($this->queueConsumings as $index => $queueConsuming) {
101 2
            $this->channel->basic_qos($queueConsuming->qosPrefetchSize, $queueConsuming->qosPrefetchCount, false);
102
103
            $consumerTag = $this->channel->basic_consume(
104
                $queueConsuming->queueName,
105
                $queueConsuming->consumerTag ?
106
                    $queueConsuming->consumerTag :
107
                    sprintf("PHPPROCESS_%s_%s_%s", gethostname(), getmypid(), $index),
108 4
                $queueConsuming->noLocal,
109
                $queueConsuming->noAck,
110
                $queueConsuming->exclusive,
111
                $queueConsuming->nowait,
112
                function (AMQPMessage $message) use ($queueConsuming) {
113
                    $this->getExecuteCallbackStrategy($queueConsuming)->consumeCallback($message);
114
                });
115
116
            //$queueConsuming->consumerTag = $consumerTag;
117
            $this->consumerTags[] = $consumerTag;
118
        }
119
120
        return $this;
121
    }
122
123
    /**
124
     * @param iterable|QueueConsuming[] $queueConsumings
125
     */
126
    public function consumeQueues(iterable $queueConsumings)
127 32
    {
128
        foreach ($queueConsumings as $queueConsuming) {
129 32
            $this->consumeQueue($queueConsuming);
130 32
        }
131
    }
132
133 32
    public function consumeQueue(QueueConsuming $queueConsuming, ExecuteCallbackStrategyInterface $executeCallbackStrategy = null): Consumer
134 32
    {
135 32
        $this->queueConsumings[] = $queueConsuming;
136 32
        if (null === $executeCallbackStrategy) {
137 32
            $executeCallbackStrategy = null === $queueConsuming->batchCount ?
138
                new SimpleExecuteCallbackStrategy() :
139 32
                new BatchExecuteCallbackStrategy($queueConsuming->batchCount);
140
        }
141 32
142 32
        $executeCallbackStrategy->setMessagesProccessor(new FnMessagesProcessor(
143 32
            (function (array $messages) use ($queueConsuming) {
144
                $logAmqpContext = ['queue' => $queueConsuming->queueName];
145
                if ($this->getExecuteCallbackStrategy($queueConsuming)->canPrecessMultiMessages()) {
146
                    $logAmqpContext['messages'] = $messages;
147
                } else {
148
                    $logAmqpContext['message'] = $messages[0];
149
                }
150
151
                $this->dispatchEvent(BeforeProcessingMessagesEvent::NAME,
152
                    new BeforeProcessingMessagesEvent($this, $messages, $queueConsuming)
153
                );
154
155
                try {
156
                    $replies = $this->processMessages($messages, $queueConsuming);
0 ignored issues
show
Unused Code introduced by
The assignment to $replies is dead and can be removed.
Loading history...
157
                } catch (Exception\StopConsumerException $e) {
158
                    $this->logger->info('Consumer requested stop', [
159
                        'amqp' => $logAmqpContext,
160
                        'exception' => $e
161
                    ]);
162
163
                    $this->stopConsuming(true);
164
                    return;
165
                } catch (\Throwable $e) {
166
                    $this->logger->error('Throw exception while process messages', [
167
                        'amqp' => $logAmqpContext,
168
                        'exception' => $e
169
                    ]);
170
                    throw $e;
171
                }
172
173
                $this->logger->info('Queue messages processed', ['amqp' => $logAmqpContext]); // TODO add flag code
174
                $this->dispatchEvent(
175 32
                    AfterProcessingMessagesEvent::NAME,
176
                    new AfterProcessingMessagesEvent($this, $messages) // TODO add flag code
177 14
                );
178
179 14
                $this->maybeStopConsumer();
180 14
            })->bindTo($this)
181
        ));
182 32
183
        $canPrecessMultiMessages = $executeCallbackStrategy->canPrecessMultiMessages();
184 32
        if ($canPrecessMultiMessages) {
185
            if (!$queueConsuming->callback instanceof BatchConsumerInterface) {
186 10
                throw new \InvalidArgumentException('TODO '. $queueConsuming->queueName);
187 22
            }
188
        } else {
189
            if (!$queueConsuming->callback instanceof ConsumerInterface) {
190 22
                throw new \InvalidArgumentException('TODO '. $queueConsuming->queueName);
191
            }
192 5
        }
193 17
194
        $this->executeCallbackStrategies[] = $executeCallbackStrategy;
195 15
196
        return $this;
197
    }
198 32
199 32
    private function getExecuteCallbackStrategy(QueueConsuming $queueConsuming): ExecuteCallbackStrategyInterface
200
    {
201 32
        return $this->executeCallbackStrategies[array_search($queueConsuming, $this->queueConsumings, true)];
202
    }
203
204 32
    /**
205
     * @return QueueConsuming[]
206
     */
207
    public function getQueueConsumings(): array
208
    {
209
        return $this->queueConsumings;
210
    }
211
212
    /**
213
     * Consume the message
214
     * @param   int     $msgAmount
215
     * @return  int
216
     *
217
     * @throws  AMQPTimeoutException
218
     */
219
    public function consume(int $msgAmount = null)
220
    {
221 4
        $this->target = $msgAmount;
222
        $this->consumed = 0;
223 4
        
224 4
        $this->setup();
225
        
226
        $this->lastActivityDateTime = new \DateTime();
227
        while ($this->channel->is_consuming()) {
228
            $this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent($this));
229 4
            $this->maybeStopConsumer();
230
231 4
            if ($this->forceStop) {
232 4
                break;
233
            }
234
            /*
235
             * Be careful not to trigger ::wait() with 0 or less seconds, when
236
             * graceful max execution timeout is being used.
237 2
             */
238
            $waitTimeout = $this->chooseWaitTimeout();
239 2
            if ($this->gracefulMaxExecutionDateTime && $waitTimeout < 1) {
240 2
                return $this->gracefulMaxExecutionTimeoutExitCode;
241
            }
242
243
            try {
244
                $this->channel->wait(null, false, $waitTimeout);
245
                $this->lastActivityDateTime = new \DateTime();
246
                if ($this->forceStop) {
247
                    break;
248
                }
249
            } catch (AMQPTimeoutException $e) {
250
                foreach($this->executeCallbackStrategies as $executeCallbackStrategy) {
251
                    $executeCallbackStrategy->onCatchTimeout($e);
252
                }
253
                $now = new \DateTime();
254
                if ($this->gracefulMaxExecutionDateTime && $this->gracefulMaxExecutionDateTime <= $now) {
255
                    return $this->gracefulMaxExecutionTimeoutExitCode;
256
                }
257
258
                if ($this->idleTimeout && ($this->lastActivityDateTime->getTimestamp() + $this->idleTimeout <= $now->getTimestamp())) {
259
                    $idleEvent = new OnIdleEvent($this);
260
                    $this->dispatchEvent(OnIdleEvent::NAME, $idleEvent);
261
262
                    if ($idleEvent->isForceStop()) {
263
                        if (null !== $this->idleTimeoutExitCode) {
264
                            return $this->idleTimeoutExitCode;
265
                        } else {
266
                            throw $e;
267 10
                        }
268
                    }
269 10
                }
270 4
            }
271 4
        }
272 4
273 4
        return 0;
274 4
    }
275
276 4
    /**
277 2
     * @param AMQPMessage[] $messages
278
     * @param QueueConsuming $queueConsuming
279
     */
280
    protected function processMessages(array $messages, QueueConsuming $queueConsuming)
281
    {
282
        if (count($messages) === 0) {
283
            throw new \InvalidArgumentException('Messages can not be empty');
284
        }
285 4
286 4
        $canPrecessMultiMessages = $this->getExecuteCallbackStrategy($queueConsuming)->canPrecessMultiMessages();
287
        if (!$canPrecessMultiMessages && count($messages) !== 1) {
288
            throw new \InvalidArgumentException('Strategy is not supported process of multi messages');
289
        }
290
291
        /** @var int[]|RpcReponse[]|RpcResponseException[]|bool[] $replies */
292
        $replies = [];
293
        if ($queueConsuming->callback instanceof BatchConsumerInterface) {
294
            $replies = $queueConsuming->callback->batchExecute($messages);
295 4
            if (!is_array($replies)) {
296 4
                $processFlag = $replies;
297
                $replies = [];
298
                foreach ($messages as $message) {
299
                    $replies[$message->getDeliveryTag()] = $processFlag;
300
                }
301 6
            } else if (count($replies) !== count($messages)) {
302 6
                throw new AMQPRuntimeException(
303
                    'Method batchExecute() should return an array with elements equal with the number of messages processed'
304
                );
305
            }
306
        } else {
307
            try {
308
                $replies = [$messages[0]->getDeliveryTag() => $queueConsuming->callback->execute($messages[0])];
309
            } catch (Exception\RpcResponseException $e) {
310
                $replies = [$messages[0]->getDeliveryTag() => $e];
311
            }
312
        }
313
314
        if (!$queueConsuming->noAck) {
315
            $messages = array_combine(
316
                array_map(fn ($message) => $message->getDeliveryTag(), $messages),
317
                $messages
318
            );
319
320
            $this->handleProcessMessages($messages, $replies, $queueConsuming);
0 ignored issues
show
Bug introduced by
It seems like $messages can also be of type false; however, parameter $messages of OldSound\RabbitMqBundle\...handleProcessMessages() does only seem to accept PhpAmqpLib\Message\AMQPMessage[], maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

320
            $this->handleProcessMessages(/** @scrutinizer ignore-type */ $messages, $replies, $queueConsuming);
Loading history...
321
        }
322
323
        return $replies;
324
    }
325
326
327
    /**
328
     * @param AMQPMessage[] $messages
329
     * @param int[]|RpcReponse[]|RpcResponseException[]|bool[] $replies
330
     */
331
    private function handleProcessMessages($messages, array $replies, QueueConsuming $queueConsuming)
332
    {
333
        $executeCallbackStrategy = $this->getExecuteCallbackStrategy($queueConsuming);
334
335
        $ack = !array_search(fn ($reply) => $reply !== null && $reply !== ConsumerInterface::MSG_ACK, $replies, true);
336
        if ($this->multiAck && count($messages) > 1 && $ack) {
337
            $channels = array_map(fn ($message) => $message->getChannel(), $messages);
338
            if (count($channels) !== array_unique($channels)) { // all messages have same channel
339
                throw new InvalidArgumentException('Messages can not be processed as multi ack with different channels');
340
            }
341
342
            $this->channel->basic_ack(last($deliveryTag), true);
0 ignored issues
show
Bug introduced by
The function last was not found. Maybe you did not declare it correctly or list all dependencies? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

342
            $this->channel->basic_ack(/** @scrutinizer ignore-call */ last($deliveryTag), true);
Loading history...
Comprehensibility Best Practice introduced by
The variable $deliveryTag seems to be never defined.
Loading history...
343
            $this->consumed = $this->consumed + count($messages);
344
            $executeCallbackStrategy->onMessageProcessed($message);
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable $message does not exist. Did you maybe mean $messages?
Loading history...
345
346
            return array_combine(
347
                array_map(fn ($message) => $message->getDeliveryTag(), $messages),
348
                array_fill(0, count($messages), ConsumerInterface::MSG_ACK)
349
            );
350
        } else {
351
            foreach ($replies as $deliveryTag => $reply) {
352
                $message = $messages[$deliveryTag] ?? null;
353
                if (null === $message) {
354
                    throw new AMQPRuntimeException(sprintf('Unknown delivery_tag %d!', $deliveryTag));
355
                }
356
357
                $channel = $message->getChannel();
358
                $processFlag = $reply;
359
                if ($processFlag === ConsumerInterface::MSG_REJECT_REQUEUE || false === $processFlag) {
360
                    $channel->basic_reject($deliveryTag, true); // Reject and requeue message to RabbitMQ
361
                } else if ($processFlag === ConsumerInterface::MSG_SINGLE_NACK_REQUEUE) {
362
                    $channel->basic_nack($deliveryTag, false, true); // NACK and requeue message to RabbitMQ
363
                } else if ($processFlag === ConsumerInterface::MSG_REJECT) {
364
                    $channel->basic_reject($deliveryTag, false); // Reject and drop
365
                } else if ($processFlag !== ConsumerInterface::MSG_ACK_SENT) {
366
                    $isRpcCall = $message->has('reply_to') && $message->has('correlation_id');
367
                    if ($isRpcCall) {
368
                        $this->sendRpcReply($message, $reply);
369
                    }
370
371
                    $channel->basic_ack($deliveryTag); // Remove message from queue only if callback return not false
372
                }
373
374
                $this->consumed++;
375
376
                $executeCallbackStrategy->onMessageProcessed($message);
377
            }
378
        }
379
    }
380
381
    protected function sendRpcReply(AMQPMessage $message, $result)
382
    {
383
        if ($result instanceof RpcReponse || $result instanceof RpcResponseException) {
384
            $body = $this->serializer->serialize($result);
385
            $replayMessage = new AMQPMessage($body, [
386
                'content_type' => 'text/plain',
387
                'correlation_id' => $message->get('correlation_id'),
388
            ]);
389
            $message->getChannel()->basic_publish($replayMessage , '', $message->get('reply_to'));
0 ignored issues
show
Bug introduced by
It seems like $message->get('reply_to') can also be of type PhpAmqpLib\Channel\AMQPChannel; however, parameter $routing_key of PhpAmqpLib\Channel\AMQPChannel::basic_publish() does only seem to accept string, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

389
            $message->getChannel()->basic_publish($replayMessage , '', /** @scrutinizer ignore-type */ $message->get('reply_to'));
Loading history...
390
        } else {
391
            $this->logger->error('Rpc call send msg to queue which have not rpc reponse', [
392
                'amqp' => ['message' => $message]
393
            ]);
394
        }
395
    }
396
397
    protected function maybeStopConsumer()
398
    {
399
        if ($this->forceStop || ($this->target && $this->consumed == $this->target)) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $this->target of type integer|null is loosely compared to true; this is ambiguous if the integer can be 0. You might want to explicitly use !== null instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For integer values, zero is a special case, in particular the following results might be unexpected:

0   == false // true
0   == null  // true
123 == false // false
123 == null  // false

// It is often better to use strict comparison
0 === false // false
0 === null  // false
Loading history...
400
            $this->stopConsuming();
401
        }
402
    }
403
404
    public function forceStopConsumer()
405
    {
406
        $this->forceStop = true;
407
    }
408
409
    public function stopConsuming($immedietly = false)
410
    {
411
        if (false === $immedietly) {
412
            foreach ($this->executeCallbackStrategies as $executeCallbackStrategy) {
413
                $executeCallbackStrategy->onStopConsuming();
414
            }
415
        }
416
417
        foreach ($this->consumerTags as $consumerTag) {
418
            $this->channel->basic_cancel($consumerTag, false, true);
419
        }
420
421
        $this->consumerTags = [];
422
    }
423
424
    /**
425
     * @param int $secondsInTheFuture
426
     */
427
    public function setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($secondsInTheFuture)
428
    {
429
        $this->gracefulMaxExecutionDateTime = new \DateTime("+{$secondsInTheFuture} seconds");
430
    }
431
432
    /**
433
     * Choose the timeout wait (in seconds) to use for the $this->getChannel()->wait() method.
434
     */
435
    private function chooseWaitTimeout(): int
436
    {
437
        if ($this->gracefulMaxExecutionDateTime) {
438
            $allowedExecutionSeconds = $this->gracefulMaxExecutionDateTime->getTimestamp() - time();
439
440
            /*
441
             * Respect the idle timeout if it's set and if it's less than
442
             * the remaining allowed execution.
443
             */
444
            $waitTimeout = $this->idleTimeout && $this->idleTimeout < $allowedExecutionSeconds
445
                ? $this->idleTimeout
446
                : $allowedExecutionSeconds;
447
        } else {
448
            $waitTimeout = $this->idleTimeout;
449
        }
450
451
        if (!is_null($this->timeoutWait) && $this->timeoutWait > 0) {
452
            $waitTimeout = min($waitTimeout, $this->timeoutWait);
453
        }
454
        return $waitTimeout;
455
    }
456
}
457