Test Failed
Pull Request — master (#39)
by Aleksandr
08:40
created

Consumer::startConsume()   C

Complexity

Conditions 14
Paths 17

Size

Total Lines 55
Code Lines 32

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 19
CRAP Score 16.7094

Importance

Changes 0
Metric Value
cc 14
eloc 32
c 0
b 0
f 0
nc 17
nop 1
dl 0
loc 55
ccs 19
cts 25
cp 0.76
crap 16.7094
rs 6.2666

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use OldSound\RabbitMqBundle\Declarations\BatchConsumeOptions;
6
use OldSound\RabbitMqBundle\Declarations\ConsumeOptions;
7
use OldSound\RabbitMqBundle\Declarations\RpcConsumeOptions;
8
use OldSound\RabbitMqBundle\Event\OnConsumeEvent;
9
use OldSound\RabbitMqBundle\Event\OnIdleEvent;
10
use OldSound\RabbitMqBundle\EventDispatcherAwareTrait;
11
use OldSound\RabbitMqBundle\ExecuteReceiverStrategy\BatchExecuteReceiverStrategy;
12
use OldSound\RabbitMqBundle\ExecuteReceiverStrategy\SingleExecuteReceiverStrategy;
13
use OldSound\RabbitMqBundle\ReceiverExecutor\BatchReceiverExecutor;
14
use OldSound\RabbitMqBundle\ReceiverExecutor\ReceiverExecutorDecorator;
15
use OldSound\RabbitMqBundle\ReceiverExecutor\ReceiverExecutorInterface;
16
use OldSound\RabbitMqBundle\ReceiverExecutor\ReplyReceiverExecutor;
17
use OldSound\RabbitMqBundle\ReceiverExecutor\SingleReceiverExecutor;
18
use OldSound\RabbitMqBundle\Receiver\ReceiverInterface;
19
use PhpAmqpLib\Channel\AMQPChannel;
20
use PhpAmqpLib\Exception\AMQPRuntimeException;
21
use PhpAmqpLib\Exception\AMQPTimeoutException;
22
use PhpAmqpLib\Message\AMQPMessage;
23
use Psr\Log\LoggerAwareTrait;
24
use Psr\Log\NullLogger;
25
use Symfony\Component\Serializer\SerializerInterface;
26
27
class Consumer
28
{
29
    use LoggerAwareTrait;
30
    use EventDispatcherAwareTrait;
31
32
    /** @var AMQPChannel */
33
    protected $channel;
34
    /** @var Consuming[] */
35
    protected $consumings = [];
36
    /** @var string[] */
37
    protected $consumerTags = [];
38
    /** @var int|null */
39
    protected $target;
40
    /** @var int */
41
    protected $consumed = 0;
42
    /** @var bool */
43
    protected $forceStop = false;
44
    /**
45
     * Important! If true, the channel cannot be used from anywhere else.
46
     * @var bool
47
     */
48
    public $multiAck = false;
49
    /**
50 32
     * @var \DateTime|null DateTime after which the consumer will gracefully exit. "Gracefully" means, that
51
     *      any currently running consumption will not be interrupted.
52 32
     */
53
    public $gracefulMaxExecutionDateTime;
54
    /** @var int Exit code used, when consumer is closed by the Graceful Max Execution Timeout feature. */
55
    public $gracefulMaxExecutionTimeoutExitCode = 0;
56
    /** @var int|null */
57
    public $timeoutWait;
58
    /** @var int */
59
    public $idleTimeout = 0;
60
    /** @var int */
61
    public $idleTimeoutExitCode;
62
    /** @var \DateTime|null */
63
    public $lastActivityDateTime;
64 12
65
    public function __construct(AMQPChannel $channel)
66 12
    {
67
        $this->channel = $channel;
68 12
        $this->logger = new NullLogger();
69
    }
70 12
71 10
    public function getChannel(): AMQPChannel
72 10
    {
73
        return $this->channel;
74
    }
75
76
    /**
     * Applies the QoS settings of every registered consuming and subscribes a callback for its queue.
     *
     * The consumer tag returned by the channel is stored on the Consuming and appended to
     * $this->consumerTags, the list stopConsuming() iterates when cancelling consumers.
     *
     * @return Consumer this instance, for chaining
     */
    protected function setup(): Consumer
    {
        foreach ($this->consumings as $index => $consuming) {
            $options = $consuming->options;
            $this->channel->basic_qos($options->qosPrefetchSize, $options->qosPrefetchCount, false);

            // Fall back to a per-process tag when none is configured. Previously this expression was
            // evaluated and thrown away, so the generated tag was never passed to basic_consume().
            $requestedTag = $consuming->consumerTag
                ?: sprintf("PHPPROCESS_%s_%s_%s", gethostname(), getmypid(), $index);

            $consumerTag = $this->channel->basic_consume(
                $options->queue,
                $requestedTag,
                $options->noLocal,
                $options->noAck,
                $options->exclusive,
                false,
                function (AMQPMessage $message) use ($consuming) {
                    $flags = $consuming->executeReceiverStrategy->onConsumeCallback($message);
                    if ($flags) {
                        $this->handleProcessMessages($flags, $consuming);
                        // NOTE(review): this used to loop over an undefined $messages variable and call
                        // an undefined $executeReceiverStrategy, so onMessageProcessed() never ran.
                        // Notifying the strategy about the delivered message is the presumed intent;
                        // confirm against the strategy contract (especially for batch strategies).
                        $consuming->executeReceiverStrategy->onMessageProcessed($message);
                    }

                    $this->maybeStopConsumer();
                }
            );

            $consuming->consumerTag = $consumerTag;
            // Remember the tag so that stopConsuming() has something to cancel.
            $this->consumerTags[] = $consumerTag;
        }

        return $this;
    }
107
108 4
    /**
109
     * @param iterable|ConsumeOptions[] $queueConsumings
110
     */
111
    public function consumeQueues(iterable $queueConsumings)
112
    {
113
        foreach ($queueConsumings as $queueConsuming) {
114
            $this->consumeQueue($queueConsuming);
0 ignored issues
show
Bug introduced by
The call to OldSound\RabbitMqBundle\...onsumer::consumeQueue() has too few arguments starting with receiver. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

114
            $this->/** @scrutinizer ignore-call */ 
115
                   consumeQueue($queueConsuming);

This check compares calls to functions or methods with their respective definitions. If the call has less arguments than are defined, it raises an issue.

If a function is defined several times with a different number of parameters, the check may pick up the wrong definition and report false positives. One codebase where this has been known to happen is Wordpress. Please note the @ignore annotation hint above.

Loading history...
115
        }
116
    }
117
118
    private function createStrategyByOptions(ConsumeOptions $consumeOptions): ExecuteReceiverStrategyInterface
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\...ceiverStrategyInterface was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
119
    {
120
        if ($consumeOptions instanceof BatchConsumeOptions) {
121
            return new BatchExecuteReceiverStrategy($consumeOptions->batchCount);
0 ignored issues
show
Bug Best Practice introduced by
The expression return new OldSound\Rabb...umeOptions->batchCount) returns the type OldSound\RabbitMqBundle\...ExecuteReceiverStrategy which is incompatible with the type-hinted return OldSound\RabbitMqBundle\...ceiverStrategyInterface.
Loading history...
122
        }
123
        return new SingleExecuteReceiverStrategy();
0 ignored issues
show
Bug Best Practice introduced by
The expression return new OldSound\Rabb...ecuteReceiverStrategy() returns the type OldSound\RabbitMqBundle\...ExecuteReceiverStrategy which is incompatible with the type-hinted return OldSound\RabbitMqBundle\...ceiverStrategyInterface.
Loading history...
124
    }
125
126
    /**
     * Picks the receiver executor that matches the concrete type of the given consume options.
     *
     * Batch options are checked first, then RPC options; anything else gets the single executor.
     */
    private function createReceiverExecutorByOptions(ConsumeOptions $consumeOptions): ReceiverExecutorInterface
    {
        if ($consumeOptions instanceof BatchConsumeOptions) {
            return new BatchReceiverExecutor();
        }

        if ($consumeOptions instanceof RpcConsumeOptions) {
            return new ReplyReceiverExecutor($consumeOptions);
        }

        return new SingleReceiverExecutor();
    }
138
139 32
    /**
     * Registers a queue to be consumed with the given receiver.
     *
     * A strategy and a receiver executor are chosen from the options type; the executor must report
     * that it supports the receiver, otherwise nothing is registered.
     *
     * @param ConsumeOptions $consumerOptions options describing the queue to consume
     * @param mixed          $receiver        receiver handed to the executor
     *
     * @return Consumer this instance, for chaining
     *
     * @throws \InvalidArgumentException when the selected executor does not support the receiver
     */
    public function consumeQueue(ConsumeOptions $consumerOptions, $receiver): Consumer
    {
        $executeReceiverStrategy = $this->createStrategyByOptions($consumerOptions);
        $receiverExecutor = $this->createReceiverExecutorByOptions($consumerOptions);
        if (!$receiverExecutor->support($receiver)) {
            // Replaces a placeholder message with one that names the mismatching types.
            throw new \InvalidArgumentException(sprintf(
                'Receiver of type "%s" is not supported by "%s" (selected for options of type "%s").',
                is_object($receiver) ? get_class($receiver) : gettype($receiver),
                get_class($receiverExecutor),
                get_class($consumerOptions)
            ));
        }

        $consuming = new Consuming($consumerOptions, $executeReceiverStrategy, $receiverExecutor, $receiver);
        $executeReceiverStrategy->setReceiverExecutor(
            new ReceiverExecutorDecorator($consuming, $this->logger)
        );

        $this->consumings[] = $consuming;

        return $this;
    }
156
157
    /**
158
     * @return Consuming[]
159
     */
160
    public function getConsumings(): array
161
    {
162
        return $this->consumings;
0 ignored issues
show
Bug Best Practice introduced by
The expression return $this->consumings returns the type OldSound\RabbitMqBundle\RabbitMq\Consuming[] which is incompatible with the documented return type OldSound\RabbitMqBundle\...ations\ConsumeOptions[].
Loading history...
163
    }
164
165
    /**
166
     * Consume the message
167
     * @param   int     $msgAmount
168
     * @return  int
169
     *
170
     * @throws  AMQPTimeoutException
171
     */
172
    public function startConsume(?int $msgAmount = null)
    {
        $this->target = $msgAmount;
        $this->consumed = 0;

        $this->setup();

        $this->lastActivityDateTime = new \DateTime();
        while ($this->channel->is_consuming()) {
            $this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent($this));
            $this->maybeStopConsumer();

            if ($this->forceStop) {
                break;
            }

            /*
             * Be careful not to trigger ::wait() with 0 or less seconds, when
             * graceful max execution timeout is being used.
             */
            $waitTimeout = $this->chooseWaitTimeout();
            if ($this->gracefulMaxExecutionDateTime && $waitTimeout < 1) {
                return $this->gracefulMaxExecutionTimeoutExitCode;
            }

            try {
                $this->channel->wait(null, false, $waitTimeout);
                $this->lastActivityDateTime = new \DateTime();
                if ($this->forceStop) {
                    break;
                }
            } catch (AMQPTimeoutException $e) {
                // Let every registered strategy react to the timeout. This used to iterate an
                // undeclared $this->executeReceiverStrategies property, so no strategy was notified.
                foreach ($this->consumings as $consuming) {
                    $consuming->executeReceiverStrategy->onCatchTimeout($e);
                }

                $now = new \DateTime();
                if ($this->gracefulMaxExecutionDateTime && $this->gracefulMaxExecutionDateTime <= $now) {
                    return $this->gracefulMaxExecutionTimeoutExitCode;
                }

                $idleDeadlinePassed = $this->idleTimeout
                    && ($this->lastActivityDateTime->getTimestamp() + $this->idleTimeout <= $now->getTimestamp());

                if ($idleDeadlinePassed) {
                    $idleEvent = new OnIdleEvent($this);
                    $this->dispatchEvent(OnIdleEvent::NAME, $idleEvent);

                    if ($idleEvent->isForceStop()) {
                        // Without a configured idle exit code the timeout is propagated to the caller.
                        if (null === $this->idleTimeoutExitCode) {
                            throw $e;
                        }

                        return $this->idleTimeoutExitCode;
                    }
                }
            }
        }

        return 0;
    }
228
229 4
    /**
     * Sends the acknowledgement that corresponds to each processing flag and counts the messages.
     *
     * The keys of $flags are used as delivery tags. When multi-ack is enabled, more than one flag is
     * present and every flag is MSG_ACK, a single cumulative ack is sent for the last delivery tag.
     * Otherwise each delivery tag is answered individually.
     *
     * @param array     $flags     processing result flags indexed by delivery tag
     * @param Consuming $consuming consuming the flags belong to (currently not used)
     */
    private function handleProcessMessages(array $flags, Consuming $consuming)
    {
        // array_search() cannot take a predicate: it looked for the closure itself as a value, never
        // found it, and so treated every batch as fully acknowledged. Filter for non-ACK flags instead.
        $everyFlagIsAck = [] === array_filter(
            $flags,
            static fn ($flag) => $flag !== ReceiverInterface::MSG_ACK
        );

        if ($this->multiAck && count($flags) > 1 && $everyFlagIsAck) {
            $this->channel->basic_ack(array_key_last($flags), true);
            $this->consumed += count($flags);

            return;
        }

        foreach ($flags as $deliveryTag => $flag) {
            if ($flag === ReceiverInterface::MSG_REJECT_REQUEUE) {
                $this->channel->basic_reject($deliveryTag, true); // Reject and requeue message to RabbitMQ
            } elseif ($flag === ReceiverInterface::MSG_SINGLE_NACK_REQUEUE) {
                $this->channel->basic_nack($deliveryTag, false, true); // NACK and requeue message to RabbitMQ
            } elseif ($flag === ReceiverInterface::MSG_REJECT) {
                $this->channel->basic_reject($deliveryTag, false); // Reject and drop
            } elseif ($flag !== ReceiverInterface::MSG_ACK_SENT) {
                $this->channel->basic_ack($deliveryTag); // Any remaining flag is acknowledged
            }
            // MSG_ACK_SENT: no channel call is made here; the message is only counted.

            $this->consumed++;
        }
    }
255
256
    /**
     * Stops consuming when a force stop was requested or the target message count has been reached.
     *
     * A null or non-positive target means "no limit". The counter is compared with >= because
     * handleProcessMessages() can advance it by more than one before this check runs, which an
     * equality check would miss.
     */
    protected function maybeStopConsumer()
    {
        $targetReached = null !== $this->target
            && $this->target > 0
            && $this->consumed >= $this->target;

        if ($this->forceStop || $targetReached) {
            $this->stopConsuming();
        }
    }
262
263
    public function forceStopConsumer()
264
    {
265
        $this->forceStop = true;
266
    }
267 10
268
    /**
     * Cancels every consumer tag held in $this->consumerTags and clears the list.
     *
     * @param bool $immediately when false (the default), each registered strategy is notified through
     *                          onStopConsuming() before the consumers are cancelled
     */
    public function stopConsuming($immediately = false)
    {
        if (false === $immediately) {
            // Strategies live on the registered consumings. This used to iterate an undeclared
            // $this->executeReceiverStrategies property, so no strategy was notified.
            foreach ($this->consumings as $consuming) {
                $consuming->executeReceiverStrategy->onStopConsuming();
            }
        }

        foreach ($this->consumerTags as $consumerTag) {
            $this->channel->basic_cancel($consumerTag, false, true);
        }

        $this->consumerTags = [];
    }
282
283
    /**
284
     * @param int $secondsInTheFuture
285 4
     */
286 4
    public function setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($secondsInTheFuture)
287
    {
288
        $this->gracefulMaxExecutionDateTime = new \DateTime("+{$secondsInTheFuture} seconds");
289
    }
290
291
    /**
292
     * Choose the timeout wait (in seconds) to use for the $this->getChannel()->wait() method.
293
     */
294
    private function chooseWaitTimeout(): int
    {
        // Start from the idle timeout; it is the result when no graceful deadline is set.
        $timeout = $this->idleTimeout;

        if ($this->gracefulMaxExecutionDateTime) {
            $remainingSeconds = $this->gracefulMaxExecutionDateTime->getTimestamp() - time();

            // The idle timeout only wins when it is set and shorter than the remaining
            // allowed execution time; otherwise the remaining time is used.
            if (!$this->idleTimeout || $this->idleTimeout >= $remainingSeconds) {
                $timeout = $remainingSeconds;
            }
        }

        // A positive explicit wait timeout caps whatever was chosen above.
        if (null !== $this->timeoutWait && $this->timeoutWait > 0) {
            $timeout = min($timeout, $this->timeoutWait);
        }

        return $timeout;
    }
315
}
316