Test Failed
Pull Request — master (#39)
by Aleksandr
08:40
created

Consumer   C

Complexity

Total Complexity 56

Size/Duplication

Total Lines 287
Duplicated Lines 0 %

Test Coverage

Coverage 66.09%

Importance

Changes 22
Bugs 0 Features 3
Metric Value
eloc 130
c 22
b 0
f 3
dl 0
loc 287
ccs 76
cts 115
cp 0.6609
rs 5.5199
wmc 56

15 Methods

Rating   Name   Duplication   Size   Complexity  
A consumeQueue() 0 16 2
A setGracefulMaxExecutionDateTimeFromSecondsInTheFuture() 0 3 1
A __construct() 0 4 1
B handleProcessMessages() 0 23 9
A consumeQueues() 0 4 2
A getChannel() 0 3 1
A getConsumings() 0 3 1
A setup() 0 30 5
A createStrategyByOptions() 0 6 2
C startConsume() 0 55 14
A maybeStopConsumer() 0 4 4
A forceStopConsumer() 0 3 1
A stopConsuming() 0 13 4
A createReceiverExecutorByOptions() 0 11 3
A chooseWaitTimeout() 0 20 6

How to fix   Complexity   

Complex Class

Complex classes like Consumer often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use Consumer, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use OldSound\RabbitMqBundle\Declarations\BatchConsumeOptions;
6
use OldSound\RabbitMqBundle\Declarations\ConsumeOptions;
7
use OldSound\RabbitMqBundle\Declarations\RpcConsumeOptions;
8
use OldSound\RabbitMqBundle\Event\OnConsumeEvent;
9
use OldSound\RabbitMqBundle\Event\OnIdleEvent;
10
use OldSound\RabbitMqBundle\EventDispatcherAwareTrait;
11
use OldSound\RabbitMqBundle\ExecuteReceiverStrategy\BatchExecuteReceiverStrategy;
12
use OldSound\RabbitMqBundle\ExecuteReceiverStrategy\SingleExecuteReceiverStrategy;
13
use OldSound\RabbitMqBundle\ReceiverExecutor\BatchReceiverExecutor;
14
use OldSound\RabbitMqBundle\ReceiverExecutor\ReceiverExecutorDecorator;
15
use OldSound\RabbitMqBundle\ReceiverExecutor\ReceiverExecutorInterface;
16
use OldSound\RabbitMqBundle\ReceiverExecutor\ReplyReceiverExecutor;
17
use OldSound\RabbitMqBundle\ReceiverExecutor\SingleReceiverExecutor;
18
use OldSound\RabbitMqBundle\Receiver\ReceiverInterface;
19
use PhpAmqpLib\Channel\AMQPChannel;
20
use PhpAmqpLib\Exception\AMQPRuntimeException;
21
use PhpAmqpLib\Exception\AMQPTimeoutException;
22
use PhpAmqpLib\Message\AMQPMessage;
23
use Psr\Log\LoggerAwareTrait;
24
use Psr\Log\NullLogger;
25
use Symfony\Component\Serializer\SerializerInterface;
26
27
class Consumer
28
{
29
    use LoggerAwareTrait;
30
    use EventDispatcherAwareTrait;
31
32
    /** @var AMQPChannel */
33
    protected $channel;
34
    /** @var Consuming[] */
35
    protected $consumings = [];
36
    /** @var string[] */
37
    protected $consumerTags = [];
38
    /** @var int|null */
39
    protected $target;
40
    /** @var int */
41
    protected $consumed = 0;
42
    /** @var bool */
43
    protected $forceStop = false;
44
    /**
45
     * Importrant! If true - then channel can not be used from somewhere else
46
     * @var bool
47
     */
48
    public $multiAck = false;
49
    /**
50 32
     * @var \DateTime|null DateTime after which the consumer will gracefully exit. "Gracefully" means, that
51
     *      any currently running consumption will not be interrupted.
52 32
     */
53
    public $gracefulMaxExecutionDateTime;
54
    /** @var int Exit code used, when consumer is closed by the Graceful Max Execution Timeout feature. */
55
    public $gracefulMaxExecutionTimeoutExitCode = 0;
56
    /** @var int|null */
57
    public $timeoutWait;
58
    /** @var int */
59
    public $idleTimeout = 0;
60
    /** @var int */
61
    public $idleTimeoutExitCode;
62
    /** @var \DateTime|null */
63
    public $lastActivityDateTime;
64 12
65
    public function __construct(AMQPChannel $channel)
66 12
    {
67
        $this->channel = $channel;
68 12
        $this->logger = new NullLogger();
69
    }
70 12
71 10
    public function getChannel(): AMQPChannel
72 10
    {
73
        return $this->channel;
74
    }
75
76
    protected function setup(): Consumer
77
    {
78 10
        foreach($this->consumings as $index => $consuming) {
79
            $this->channel->basic_qos($consuming->options->qosPrefetchSize, $consuming->options->qosPrefetchCount, false);
80 10
81 10
            $options = $consuming->options;
82
            $consuming->consumerTag ? $consuming->consumerTag : sprintf("PHPPROCESS_%s_%s_%s", gethostname(), getmypid(), $index);
83 2
            $consumerTag = $this->channel->basic_consume(
84
                $options->queue,
85
                $consuming->consumerTag,
86 8
                $options->noLocal,
87
                $options->noAck,
88 8
                $options->exclusive,
89 6
                false,
90 6
                function (AMQPMessage $message) use ($consuming) {
91 2
                    $flags = $consuming->executeReceiverStrategy->onConsumeCallback($message);
92
                    if ($flags) {
93
                        $this->handleProcessMessages($flags, $consuming);
94 4
                        foreach ($messages as $message) {
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable $messages does not exist. Did you maybe mean $message?
Loading history...
95 4
                            $executeReceiverStrategy->onMessageProcessed($message);
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable $executeReceiverStrategy seems to be never defined.
Loading history...
96
                        }
97 4
                    }
98 4
99 2
                    $this->maybeStopConsumer();
100
                });
101 2
102
            $consuming->consumerTag = $consumerTag;
103
        }
104
105
        return $this;
106
    }
107
108 4
    /**
109
     * @param iterable|ConsumeOptions[] $queueConsumings
110
     */
111
    public function consumeQueues(iterable $queueConsumings)
112
    {
113
        foreach ($queueConsumings as $queueConsuming) {
114
            $this->consumeQueue($queueConsuming);
0 ignored issues
show
Bug introduced by
The call to OldSound\RabbitMqBundle\...onsumer::consumeQueue() has too few arguments starting with receiver. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

114
            $this->/** @scrutinizer ignore-call */ 
115
                   consumeQueue($queueConsuming);

This check compares calls to functions or methods with their respective definitions. If the call has less arguments than are defined, it raises an issue.

If a function is defined several times with a different number of parameters, the check may pick up the wrong definition and report false positives. One codebase where this has been known to happen is Wordpress. Please note the @ignore annotation hint above.

Loading history...
115
        }
116
    }
117
118
    private function createStrategyByOptions(ConsumeOptions $consumeOptions): ExecuteReceiverStrategyInterface
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\...ceiverStrategyInterface was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
119
    {
120
        if ($consumeOptions instanceof BatchConsumeOptions) {
121
            return new BatchExecuteReceiverStrategy($consumeOptions->batchCount);
0 ignored issues
show
Bug Best Practice introduced by
The expression return new OldSound\Rabb...umeOptions->batchCount) returns the type OldSound\RabbitMqBundle\...ExecuteReceiverStrategy which is incompatible with the type-hinted return OldSound\RabbitMqBundle\...ceiverStrategyInterface.
Loading history...
122
        }
123
        return new SingleExecuteReceiverStrategy();
0 ignored issues
show
Bug Best Practice introduced by
The expression return new OldSound\Rabb...ecuteReceiverStrategy() returns the type OldSound\RabbitMqBundle\...ExecuteReceiverStrategy which is incompatible with the type-hinted return OldSound\RabbitMqBundle\...ceiverStrategyInterface.
Loading history...
124
    }
125
126
    private function createReceiverExecutorByOptions(ConsumeOptions $consumeOptions): ReceiverExecutorInterface
127 32
    {
128
        if ($consumeOptions instanceof BatchConsumeOptions) {
129 32
            $receiverExecutor = new BatchReceiverExecutor();
130 32
        } else if ($consumeOptions instanceof RpcConsumeOptions) {
131
            $receiverExecutor = new ReplyReceiverExecutor($consumeOptions);
132
        } else {
133 32
            $receiverExecutor = new SingleReceiverExecutor();
134 32
        }
135 32
136 32
        return $receiverExecutor;
137 32
    }
138
139 32
    public function consumeQueue(ConsumeOptions $consumerOptions, $receiver): Consumer
140
    {
141 32
        $executeReceiverStrategy = $this->createStrategyByOptions($consumerOptions);
142 32
        $receiverExecutor = $this->createReceiverExecutorByOptions($consumerOptions);
143 32
        if (!$receiverExecutor->support($receiver)) {
144
            throw new \InvalidArgumentException('sdfs');
145
        }
146
147
        $consuming = new Consuming($consumerOptions, $executeReceiverStrategy, $receiverExecutor, $receiver);
148
        $executeReceiverStrategy->setReceiverExecutor(
149
            new ReceiverExecutorDecorator($consuming, $this->logger)
150
        );
151
152
        $this->consumings[] = $consuming;
153
154
        return $this;
155
    }
156
157
    /**
158
     * @return ConsumeOptions[]
159
     */
160
    public function getConsumings(): array
161
    {
162
        return $this->consumings;
0 ignored issues
show
Bug Best Practice introduced by
The expression return $this->consumings returns the type OldSound\RabbitMqBundle\RabbitMq\Consuming[] which is incompatible with the documented return type OldSound\RabbitMqBundle\...ations\ConsumeOptions[].
Loading history...
163
    }
164
165
    /**
166
     * Consume the message
167
     * @param   int     $msgAmount
168
     * @return  int
169
     *
170
     * @throws  AMQPTimeoutException
171
     */
172
    public function startConsume(int $msgAmount = null)
173
    {
174
        $this->target = $msgAmount;
175 32
        $this->consumed = 0;
176
177 14
        $this->setup();
178
179 14
        $this->lastActivityDateTime = new \DateTime();
180 14
        while ($this->channel->is_consuming()) {
181
            $this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent($this));
182 32
            $this->maybeStopConsumer();
183
184 32
            if ($this->forceStop) {
185
                break;
186 10
            }
187 22
            /*
188
             * Be careful not to trigger ::wait() with 0 or less seconds, when
189
             * graceful max execution timeout is being used.
190 22
             */
191
            $waitTimeout = $this->chooseWaitTimeout();
192 5
            if ($this->gracefulMaxExecutionDateTime && $waitTimeout < 1) {
193 17
                return $this->gracefulMaxExecutionTimeoutExitCode;
194
            }
195 15
196
            try {
197
                $this->channel->wait(null, false, $waitTimeout);
198 32
                $this->lastActivityDateTime = new \DateTime();
199 32
                if ($this->forceStop) {
200
                    break;
201 32
                }
202
            } catch (AMQPTimeoutException $e) {
203
                foreach($this->executeReceiverStrategies as $executeReceiverStrategy) {
0 ignored issues
show
Bug Best Practice introduced by
The property executeReceiverStrategies does not exist on OldSound\RabbitMqBundle\RabbitMq\Consumer. Did you maybe forget to declare it?
Loading history...
204 32
                    $executeReceiverStrategy->onCatchTimeout($e);
205
                }
206
                $now = new \DateTime();
207
                if ($this->gracefulMaxExecutionDateTime && $this->gracefulMaxExecutionDateTime <= $now) {
208
                    return $this->gracefulMaxExecutionTimeoutExitCode;
209
                }
210
211
                if ($this->idleTimeout && ($this->lastActivityDateTime->getTimestamp() + $this->idleTimeout <= $now->getTimestamp())) {
212
                    $idleEvent = new OnIdleEvent($this);
213
                    $this->dispatchEvent(OnIdleEvent::NAME, $idleEvent);
214
215
                    if ($idleEvent->isForceStop()) {
216
                        if (null !== $this->idleTimeoutExitCode) {
217
                            return $this->idleTimeoutExitCode;
218
                        } else {
219
                            throw $e;
220
                        }
221 4
                    }
222
                }
223 4
            }
224 4
        }
225
226
        return 0;
227
    }
228
229 4
    private function handleProcessMessages(array $flags, Consuming $consuming)
0 ignored issues
show
Unused Code introduced by
The parameter $consuming is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

229
    private function handleProcessMessages(array $flags, /** @scrutinizer ignore-unused */ Consuming $consuming)

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
230
    {
231 4
        $ack = !array_search(fn ($reply) => $reply !== ReceiverInterface::MSG_ACK, $flags, true);
232 4
        if ($this->multiAck && count($flags) > 1 && $ack) {
233
            $lastDeliveryTag = array_key_last($flags);
234
235
            $this->channel->basic_ack($lastDeliveryTag, true);
236
            $this->consumed = $this->consumed + count($flags);
237 2
        } else {
238
            foreach ($flags as $deliveryTag => $flag) {
239 2
                if ($flag === ReceiverInterface::MSG_REJECT_REQUEUE) {
240 2
                    $this->channel->basic_reject($deliveryTag, true); // Reject and requeue message to RabbitMQ
241
                } else if ($flag === ReceiverInterface::MSG_SINGLE_NACK_REQUEUE) {
242
                    $this->channel->basic_nack($deliveryTag, false, true); // NACK and requeue message to RabbitMQ
243
                } else if ($flag === ReceiverInterface::MSG_REJECT) {
244
                    $this->channel->basic_reject($deliveryTag, false); // Reject and drop
245
                } else if ($flag !== ReceiverInterface::MSG_ACK_SENT) {
246
                    $this->channel->basic_ack($deliveryTag); // Remove message from queue only if callback return not false
247
                } else {
248
                    // TODO throw..
249
                }
250
251
                $this->consumed++;
252
            }
253
        }
254
    }
255
256
    protected function maybeStopConsumer()
257
    {
258
        if ($this->forceStop || ($this->target && $this->consumed == $this->target)) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $this->target of type integer|null is loosely compared to true; this is ambiguous if the integer can be 0. You might want to explicitly use !== null instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For integer values, zero is a special case, in particular the following results might be unexpected:

0   == false // true
0   == null  // true
123 == false // false
123 == null  // false

// It is often better to use strict comparison
0 === false // false
0 === null  // false
Loading history...
259
            $this->stopConsuming();
260
        }
261
    }
262
263
    public function forceStopConsumer()
264
    {
265
        $this->forceStop = true;
266
    }
267 10
268
    public function stopConsuming($immediately = false)
269 10
    {
270 4
        if (false === $immediately) {
271 4
            foreach ($this->executeReceiverStrategies as $executeReceiverStrategy) {
0 ignored issues
show
Bug Best Practice introduced by
The property executeReceiverStrategies does not exist on OldSound\RabbitMqBundle\RabbitMq\Consumer. Did you maybe forget to declare it?
Loading history...
272 4
                $executeReceiverStrategy->onStopConsuming();
273 4
            }
274 4
        }
275
276 4
        foreach ($this->consumerTags as $consumerTag) {
277 2
            $this->channel->basic_cancel($consumerTag, false, true);
278
        }
279
280
        $this->consumerTags = [];
281
    }
282
283
    /**
284
     * @param int $secondsInTheFuture
285 4
     */
286 4
    public function setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($secondsInTheFuture)
287
    {
288
        $this->gracefulMaxExecutionDateTime = new \DateTime("+{$secondsInTheFuture} seconds");
289
    }
290
291
    /**
292
     * Choose the timeout wait (in seconds) to use for the $this->getChannel()->wait() method.
293
     */
294
    private function chooseWaitTimeout(): int
295 4
    {
296 4
        if ($this->gracefulMaxExecutionDateTime) {
297
            $allowedExecutionSeconds = $this->gracefulMaxExecutionDateTime->getTimestamp() - time();
298
299
            /*
300
             * Respect the idle timeout if it's set and if it's less than
301 6
             * the remaining allowed execution.
302 6
             */
303
            $waitTimeout = $this->idleTimeout && $this->idleTimeout < $allowedExecutionSeconds
304
                ? $this->idleTimeout
305
                : $allowedExecutionSeconds;
306
        } else {
307
            $waitTimeout = $this->idleTimeout;
308
        }
309
310
        if (!is_null($this->timeoutWait) && $this->timeoutWait > 0) {
311
            $waitTimeout = min($waitTimeout, $this->timeoutWait);
312
        }
313
        return $waitTimeout;
314
    }
315
}
316