Test Failed
Pull Request — master (#39)
by Aleksandr
05:36
created

Consumer::maybeStopConsumer()   A

Complexity

Conditions 4
Paths 2

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 0
Metric Value
cc 4
eloc 2
c 0
b 0
f 0
nc 2
nop 0
dl 0
loc 4
ccs 0
cts 0
cp 0
crap 20
rs 10
1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use http\Exception\InvalidArgumentException;
6
use OldSound\RabbitMqBundle\Declarations\BatchConsumeOptions;
7
use OldSound\RabbitMqBundle\Declarations\ConsumeOptions;
8
use OldSound\RabbitMqBundle\Declarations\RpcConsumeOptions;
9
use OldSound\RabbitMqBundle\Event\AfterProcessingMessageEvent;
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\...rProcessingMessageEvent was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
10
use OldSound\RabbitMqBundle\Event\AfterProcessingMessagesEvent;
11
use OldSound\RabbitMqBundle\Event\AMQPEvent;
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\Event\AMQPEvent was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
12
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessageEvent;
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\...eProcessingMessageEvent was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
13
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessagesEvent;
14
use OldSound\RabbitMqBundle\Event\OnConsumeEvent;
15
use OldSound\RabbitMqBundle\Event\OnIdleEvent;
16
use OldSound\RabbitMqBundle\EventDispatcherAwareTrait;
17
use OldSound\RabbitMqBundle\ExecuteReceiverStrategy\BatchExecuteReceiverStrategy;
18
use OldSound\RabbitMqBundle\ExecuteReceiverStrategy\ExecuteReceiverStrategyInterface;
19
use OldSound\RabbitMqBundle\ExecuteReceiverStrategy\CallbackReceiverExecutorInterface;
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\...ceiverExecutorInterface was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
20
use OldSound\RabbitMqBundle\ExecuteReceiverStrategy\MessagesProcessorInterface;
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\...sagesProcessorInterface was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
21
use OldSound\RabbitMqBundle\ExecuteReceiverStrategy\SingleExecuteReceiverStrategy;
22
use OldSound\RabbitMqBundle\Logger\ExtraContextLogger;
23
use OldSound\RabbitMqBundle\ReceiverExecutor\BatchReceiverExecutor;
24
use OldSound\RabbitMqBundle\ReceiverExecutor\ReceiverExecutorDecorator;
25
use OldSound\RabbitMqBundle\ReceiverExecutor\ReceiverExecutorInterface;
26
use OldSound\RabbitMqBundle\ReceiverExecutor\ReplyReceiverExecutor;
27
use OldSound\RabbitMqBundle\ReceiverExecutor\SingleReceiverExecutor;
28
use OldSound\RabbitMqBundle\Producer\ProducerInterface;
29
use OldSound\RabbitMqBundle\Receiver\NotReadyReceiveException;
30
use OldSound\RabbitMqBundle\Receiver\ReceiverException;
31
use OldSound\RabbitMqBundle\Receiver\ReceiverInterface;
32
use OldSound\RabbitMqBundle\Receiver\ReplyReceiverInterface;
33
use PhpAmqpLib\Channel\AMQPChannel;
34
use PhpAmqpLib\Exception\AMQPRuntimeException;
35
use PhpAmqpLib\Exception\AMQPTimeoutException;
36
use PhpAmqpLib\Message\AMQPMessage;
37
use Psr\Log\LoggerAwareTrait;
38
use Psr\Log\NullLogger;
39
use Symfony\Component\Serializer\SerializerInterface;
40
41
class Consumer
42
{
43
    use LoggerAwareTrait;
44
    use EventDispatcherAwareTrait;
45
46
    /** @var AMQPChannel */
47
    protected $channel;
48
    /** @var Consuming[] */
49
    protected $consumings = [];
50 32
    /** @var string[] */
51
    protected $consumerTags = [];
52 32
    /** @var int|null */
53
    protected $target;
54
    /** @var int */
55
    protected $consumed = 0;
56
    /** @var bool */
57
    protected $forceStop = false;
58
    /**
59
     * Importrant! If true - then channel can not be used from somewhere else
60
     * @var bool
61
     */
62
    public $multiAck = false;
63
    /**
64 12
     * @var \DateTime|null DateTime after which the consumer will gracefully exit. "Gracefully" means, that
65
     *      any currently running consumption will not be interrupted.
66 12
     */
67
    public $gracefulMaxExecutionDateTime;
68 12
    /** @var int Exit code used, when consumer is closed by the Graceful Max Execution Timeout feature. */
69
    public $gracefulMaxExecutionTimeoutExitCode = 0;
70 12
    /** @var int|null */
71 10
    public $timeoutWait;
72 10
    /** @var int */
73
    public $idleTimeout = 0;
74
    /** @var int */
75
    public $idleTimeoutExitCode;
76
    /** @var \DateTime|null */
77
    public $lastActivityDateTime;
78 10
79
    public function __construct(AMQPChannel $channel)
80 10
    {
81 10
        $this->channel = $channel;
82
        $this->logger = new NullLogger();
83 2
    }
84
85
    public function getChannel(): AMQPChannel
86 8
    {
87
        return $this->channel;
88 8
    }
89 6
90 6
    protected function setup(): Consumer
91 2
    {
92
        foreach($this->consumings as $index => $consuming) {
93
            $this->channel->basic_qos($consuming->options->qosPrefetchSize, $consuming->options->qosPrefetchCount, false);
94 4
95 4
            $options = $consuming->options;
96
            $consuming->consumerTag ? $consuming->consumerTag : sprintf("PHPPROCESS_%s_%s_%s", gethostname(), getmypid(), $index);
97 4
            $consumerTag = $this->channel->basic_consume(
98 4
                $options->queue,
99 2
                $consuming->consumerTag,
100
                $options->noLocal,
101 2
                $options->noAck,
102
                $options->exclusive,
103
                false,
104
                function (AMQPMessage $message) use ($consuming) {
105
                    $consuming->executeReceiverStrategy->onConsumeCallback($message);
106
                    $this->maybeStopConsumer();
107
                });
108 4
109
            $consuming->consumerTag = $consumerTag;
110
        }
111
112
        return $this;
113
    }
114
115
    /**
116
     * @param iterable|ConsumeOptions[] $queueConsumings
117
     */
118
    public function consumeQueues(iterable $queueConsumings)
119
    {
120
        foreach ($queueConsumings as $queueConsuming) {
121
            $this->consumeQueue($queueConsuming);
0 ignored issues
show
Bug introduced by
The call to OldSound\RabbitMqBundle\...onsumer::consumeQueue() has too few arguments starting with receiver. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

121
            $this->/** @scrutinizer ignore-call */ 
122
                   consumeQueue($queueConsuming);

This check compares calls to functions or methods with their respective definitions. If the call has less arguments than are defined, it raises an issue.

If a function is defined several times with a different number of parameters, the check may pick up the wrong definition and report false positives. One codebase where this has been known to happen is Wordpress. Please note the @ignore annotation hint above.

Loading history...
122
        }
123
    }
124
125
    private function createStrategyByOptions(ConsumeOptions $consumeOptions): ExecuteReceiverStrategyInterface
126
    {
127 32
        if ($consumeOptions instanceof BatchConsumeOptions) {
128
            return new BatchExecuteReceiverStrategy($consumeOptions->batchCount);
129 32
        }
130 32
        return new SingleExecuteReceiverStrategy();
131
    }
132
133 32
    private function createReceiverExecutorByOptions(ConsumeOptions $consumeOptions): ReceiverExecutorInterface
134 32
    {
135 32
        if ($consumeOptions instanceof BatchConsumeOptions) {
136 32
            $receiverExecutor = new BatchReceiverExecutor();
137 32
        } else if ($consumeOptions instanceof RpcConsumeOptions) {
138
            $receiverExecutor = new ReplyReceiverExecutor($consumeOptions);
139 32
        } else {
140
            $receiverExecutor = new SingleReceiverExecutor();
141 32
        }
142 32
143 32
        return $receiverExecutor;
144
    }
145
146
    public function consumeQueue(ConsumeOptions $consumerOptions, $receiver): Consumer
147
    {
148
        $executeReceiverStrategy = $this->createStrategyByOptions($consumerOptions);
149
        $receiverExecutor = $this->createReceiverExecutorByOptions($consumerOptions);
150
        if (!$receiverExecutor->support($receiver)) {
151
            throw new \InvalidArgumentException('sdfs');
152
        }
153
154
        $consuming = new Consuming($consumerOptions, $executeReceiverStrategy, $receiverExecutor, $receiver);
155
        $executeReceiverStrategy->setMessagesProcessor(
156
            new ReceiverExecutorDecorator($consuming, $this->logger)
157
        );
158
159
        $this->consumings[] = $consuming;
160
161
        return $this;
162
    }
163
164
    /**
165
     * @return ConsumeOptions[]
166
     */
167
    public function getConsumings(): array
168
    {
169
        return $this->consumings;
0 ignored issues
show
Bug Best Practice introduced by
The expression return $this->consumings returns the type OldSound\RabbitMqBundle\RabbitMq\Consuming[] which is incompatible with the documented return type OldSound\RabbitMqBundle\...ations\ConsumeOptions[].
Loading history...
170
    }
171
172
    /**
173
     * Consume the message
174
     * @param   int     $msgAmount
175 32
     * @return  int
176
     *
177 14
     * @throws  AMQPTimeoutException
178
     */
179 14
    public function startConsume(int $msgAmount = null)
180 14
    {
181
        $this->target = $msgAmount;
182 32
        $this->consumed = 0;
183
184 32
        $this->setup();
185
186 10
        $this->lastActivityDateTime = new \DateTime();
187 22
        while ($this->channel->is_consuming()) {
188
            $this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent($this));
189
            $this->maybeStopConsumer();
190 22
191
            if ($this->forceStop) {
192 5
                break;
193 17
            }
194
            /*
195 15
             * Be careful not to trigger ::wait() with 0 or less seconds, when
196
             * graceful max execution timeout is being used.
197
             */
198 32
            $waitTimeout = $this->chooseWaitTimeout();
199 32
            if ($this->gracefulMaxExecutionDateTime && $waitTimeout < 1) {
200
                return $this->gracefulMaxExecutionTimeoutExitCode;
201 32
            }
202
203
            try {
204 32
                $this->channel->wait(null, false, $waitTimeout);
205
                $this->lastActivityDateTime = new \DateTime();
206
                if ($this->forceStop) {
207
                    break;
208
                }
209
            } catch (AMQPTimeoutException $e) {
210
                foreach($this->executeReceiverStrategies as $executeReceiverStrategy) {
0 ignored issues
show
Bug Best Practice introduced by
The property executeReceiverStrategies does not exist on OldSound\RabbitMqBundle\RabbitMq\Consumer. Did you maybe forget to declare it?
Loading history...
211
                    $executeReceiverStrategy->onCatchTimeout($e);
212
                }
213
                $now = new \DateTime();
214
                if ($this->gracefulMaxExecutionDateTime && $this->gracefulMaxExecutionDateTime <= $now) {
215
                    return $this->gracefulMaxExecutionTimeoutExitCode;
216
                }
217
218
                if ($this->idleTimeout && ($this->lastActivityDateTime->getTimestamp() + $this->idleTimeout <= $now->getTimestamp())) {
219
                    $idleEvent = new OnIdleEvent($this);
220
                    $this->dispatchEvent(OnIdleEvent::NAME, $idleEvent);
221 4
222
                    if ($idleEvent->isForceStop()) {
223 4
                        if (null !== $this->idleTimeoutExitCode) {
224 4
                            return $this->idleTimeoutExitCode;
225
                        } else {
226
                            throw $e;
227
                        }
228
                    }
229 4
                }
230
            }
231 4
        }
232 4
233
        return 0;
234
    }
235
236
    /**
237 2
     * @param AMQPMessage[] $messages
238
     * @param ConsumeOptions $queueConsuming
239 2
     */
240 2
    protected function processMessages(array $messages, ConsumeOptions $queueConsuming)
241
    {
242
        if (count($messages) === 0) {
243
            throw new \InvalidArgumentException('Messages can not be empty');
244
        }
245
246
        $canPrecessMultiMessages = $this->getExecuteReceiverStrategy($queueConsuming)->canPrecessMultiMessages();
0 ignored issues
show
Bug introduced by
The method getExecuteReceiverStrategy() does not exist on OldSound\RabbitMqBundle\RabbitMq\Consumer. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

246
        $canPrecessMultiMessages = $this->/** @scrutinizer ignore-call */ getExecuteReceiverStrategy($queueConsuming)->canPrecessMultiMessages();

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
247
        if (!$canPrecessMultiMessages && count($messages) !== 1) {
248
            throw new \InvalidArgumentException('Strategy is not supported process of multi messages');
249
        }
250
251
        /** @var int[]|int $flags */
252
        $flags = [];
253
        try {
254
            if ($queueConsuming->receiver instanceof ReceiverInterface) {
255
                $flags = $queueConsuming->receiver->execute($messages[0]);
256
            } else if ($queueConsuming->receiver instanceof BatchReceiverInterface) {
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\...\BatchReceiverInterface was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
257
                $flags = $queueConsuming->receiver->batchExecute($messages);
258
            } else if ($queueConsuming->receiver instanceof ReplyReceiverInterface) {
259
                $reply = $queueConsuming->receiver->execute($messages[0]);
260
                $isRpcCall = $messages[0]->has('reply_to') && $messages[0]->has('correlation_id');
261
                if ($isRpcCall) {
262
                    $this->sendRpcReply($messages[0], $reply);
0 ignored issues
show
Bug introduced by
The method sendRpcReply() does not exist on OldSound\RabbitMqBundle\RabbitMq\Consumer. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

262
                    $this->/** @scrutinizer ignore-call */ 
263
                           sendRpcReply($messages[0], $reply);

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
263
                    $flags = ReceiverInterface::MSG_ACK;
264
                } else {
265
                    $flags = ReceiverInterface::MSG_REJECT;
266
                    // logging
267 10
                }
268
            } else {
269 10
                throw new \InvalidArgumentException('TODO');
270 4
            }
271 4
        } catch (ReceiverException $exception) {
272 4
            $flags = $exception->getCode();
273 4
        } catch (NotReadyReceiveException $exception) {
274 4
            // TODO
275
            $this->forceStop = true;
276 4
            return;
277 2
        }
278
279
        if (!is_array($flags)) { // spread handle flag for each delivery tag
280
            $flag = $flags;
281
            $flags = [];
282
            foreach ($messages as $message) {
283
                $flags[$message->getDeliveryTag()] = $flag;
284
            }
285 4
        } else if (count($flags) !== count($messages)) {
286 4
            throw new AMQPRuntimeException(
287
                'Method batchExecute() should return an array with elements equal with the number of messages processed'
288
            );
289
        }
290
291
        if (!$queueConsuming->noAck) {
292
            $messages = array_combine(
293
                array_map(fn ($message) => $message->getDeliveryTag(), $messages),
294
                $messages
295 4
            );
296 4
297
            $this->handleProcessMessages($messages, $flags, $queueConsuming);
0 ignored issues
show
Bug introduced by
It seems like $messages can also be of type false; however, parameter $messages of OldSound\RabbitMqBundle\...handleProcessMessages() does only seem to accept PhpAmqpLib\Message\AMQPMessage[], maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

297
            $this->handleProcessMessages(/** @scrutinizer ignore-type */ $messages, $flags, $queueConsuming);
Loading history...
298
        }
299
    }
300
301 6
    /**
302 6
     * @param AMQPMessage[] $messages
303
     * @param int[]|RpcReponse[]|RpcResponseException[]|bool[] $replies
304
     */
305
    private function handleProcessMessages($messages, array $replies, ConsumeOptions $queueConsuming)
306
    {
307
        $executeReceiverStrategy = $this->getExecuteReceiverStrategy($queueConsuming);
308
309
        $ack = !array_search(fn ($reply) => $reply !== ReceiverInterface::MSG_ACK, $replies, true);
310
        if ($this->multiAck && count($messages) > 1 && $ack) {
311
            $channels = array_map(fn ($message) => $message->getChannel(), $messages);
312
            if (count($channels) !== array_unique($channels)) { // all messages have same channel
313
                throw new InvalidArgumentException('Messages can not be processed as multi ack with different channels');
314
            }
315
316
            $lastDeliveryTag = array_key_last($replies);
317
318
            $this->channel->basic_ack($lastDeliveryTag, true);
319
            $this->consumed = $this->consumed + count($messages);
320
            foreach ($messages as $message) {
321
                $executeReceiverStrategy->onMessageProcessed($message);
322
            }
323
        } else {
324
            foreach ($replies as $deliveryTag => $reply) {
325
                $message = $messages[$deliveryTag] ?? null;
326
                if (null === $message) {
327
                    throw new AMQPRuntimeException(sprintf('Unknown delivery_tag %d!', $deliveryTag));
328
                }
329
330
                $channel = $message->getChannel();
331
                $processFlag = $reply;
332
                if ($processFlag === ReceiverInterface::MSG_REJECT_REQUEUE || false === $processFlag) {
333
                    $channel->basic_reject($deliveryTag, true); // Reject and requeue message to RabbitMQ
334
                } else if ($processFlag === ReceiverInterface::MSG_SINGLE_NACK_REQUEUE) {
335
                    $channel->basic_nack($deliveryTag, false, true); // NACK and requeue message to RabbitMQ
336
                } else if ($processFlag === ReceiverInterface::MSG_REJECT) {
337
                    $channel->basic_reject($deliveryTag, false); // Reject and drop
338
                } else if ($processFlag !== ReceiverInterface::MSG_ACK_SENT) {
339
                    $channel->basic_ack($deliveryTag); // Remove message from queue only if callback return not false
340
                }
341
342
                $this->consumed++;
343
344
                $executeReceiverStrategy->onMessageProcessed($message);
345
            }
346
        }
347
    }
348
349
    protected function maybeStopConsumer()
350
    {
351
        if ($this->forceStop || ($this->target && $this->consumed == $this->target)) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $this->target of type integer|null is loosely compared to true; this is ambiguous if the integer can be 0. You might want to explicitly use !== null instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For integer values, zero is a special case, in particular the following results might be unexpected:

0   == false // true
0   == null  // true
123 == false // false
123 == null  // false

// It is often better to use strict comparison
0 === false // false
0 === null  // false
Loading history...
352
            $this->stopConsuming();
353
        }
354
    }
355
356
    public function forceStopConsumer()
357
    {
358
        $this->forceStop = true;
359
    }
360
361
    public function stopConsuming($immediately = false)
362
    {
363
        if (false === $immediately) {
364
            foreach ($this->executeReceiverStrategies as $executeReceiverStrategy) {
0 ignored issues
show
Bug Best Practice introduced by
The property executeReceiverStrategies does not exist on OldSound\RabbitMqBundle\RabbitMq\Consumer. Did you maybe forget to declare it?
Loading history...
365
                $executeReceiverStrategy->onStopConsuming();
366
            }
367
        }
368
369
        foreach ($this->consumerTags as $consumerTag) {
370
            $this->channel->basic_cancel($consumerTag, false, true);
371
        }
372
373
        $this->consumerTags = [];
374
    }
375
376
    /**
377
     * @param int $secondsInTheFuture
378
     */
379
    public function setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($secondsInTheFuture)
380
    {
381
        $this->gracefulMaxExecutionDateTime = new \DateTime("+{$secondsInTheFuture} seconds");
382
    }
383
384
    /**
385
     * Choose the timeout wait (in seconds) to use for the $this->getChannel()->wait() method.
386
     */
387
    private function chooseWaitTimeout(): int
388
    {
389
        if ($this->gracefulMaxExecutionDateTime) {
390
            $allowedExecutionSeconds = $this->gracefulMaxExecutionDateTime->getTimestamp() - time();
391
392
            /*
393
             * Respect the idle timeout if it's set and if it's less than
394
             * the remaining allowed execution.
395
             */
396
            $waitTimeout = $this->idleTimeout && $this->idleTimeout < $allowedExecutionSeconds
397
                ? $this->idleTimeout
398
                : $allowedExecutionSeconds;
399
        } else {
400
            $waitTimeout = $this->idleTimeout;
401
        }
402
403
        if (!is_null($this->timeoutWait) && $this->timeoutWait > 0) {
404
            $waitTimeout = min($waitTimeout, $this->timeoutWait);
405
        }
406
        return $waitTimeout;
407
    }
408
}
409