Test Failed
Pull Request — master (#39)
by Aleksandr
05:36
created

Consumer::processMessages()   C

Complexity

Conditions 15
Paths 80

Size

Total Lines 58
Code Lines 39

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 14
CRAP Score 21.075

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 15
eloc 39
c 2
b 0
f 0
nc 80
nop 2
dl 0
loc 58
ccs 14
cts 20
cp 0.7
crap 21.075
rs 5.9166

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace OldSound\RabbitMqBundle\RabbitMq;
4
5
use http\Exception\InvalidArgumentException;
6
use OldSound\RabbitMqBundle\Declarations\BatchConsumeOptions;
7
use OldSound\RabbitMqBundle\Declarations\ConsumeOptions;
8
use OldSound\RabbitMqBundle\Declarations\RpcConsumeOptions;
9
use OldSound\RabbitMqBundle\Event\AfterProcessingMessageEvent;
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\...rProcessingMessageEvent was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
10
use OldSound\RabbitMqBundle\Event\AfterProcessingMessagesEvent;
11
use OldSound\RabbitMqBundle\Event\AMQPEvent;
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\Event\AMQPEvent was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
12
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessageEvent;
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\...eProcessingMessageEvent was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
13
use OldSound\RabbitMqBundle\Event\BeforeProcessingMessagesEvent;
14
use OldSound\RabbitMqBundle\Event\OnConsumeEvent;
15
use OldSound\RabbitMqBundle\Event\OnIdleEvent;
16
use OldSound\RabbitMqBundle\EventDispatcherAwareTrait;
17
use OldSound\RabbitMqBundle\ExecuteReceiverStrategy\BatchExecuteReceiverStrategy;
18
use OldSound\RabbitMqBundle\ExecuteReceiverStrategy\ExecuteReceiverStrategyInterface;
19
use OldSound\RabbitMqBundle\ExecuteReceiverStrategy\CallbackReceiverExecutorInterface;
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\...ceiverExecutorInterface was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
20
use OldSound\RabbitMqBundle\ExecuteReceiverStrategy\MessagesProcessorInterface;
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\...sagesProcessorInterface was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
21
use OldSound\RabbitMqBundle\ExecuteReceiverStrategy\SingleExecuteReceiverStrategy;
22
use OldSound\RabbitMqBundle\Logger\ExtraContextLogger;
23
use OldSound\RabbitMqBundle\ReceiverExecutor\BatchReceiverExecutor;
24
use OldSound\RabbitMqBundle\ReceiverExecutor\ReceiverExecutorDecorator;
25
use OldSound\RabbitMqBundle\ReceiverExecutor\ReceiverExecutorInterface;
26
use OldSound\RabbitMqBundle\ReceiverExecutor\ReplyReceiverExecutor;
27
use OldSound\RabbitMqBundle\ReceiverExecutor\SingleReceiverExecutor;
28
use OldSound\RabbitMqBundle\Producer\ProducerInterface;
29
use OldSound\RabbitMqBundle\Receiver\NotReadyReceiveException;
30
use OldSound\RabbitMqBundle\Receiver\ReceiverException;
31
use OldSound\RabbitMqBundle\Receiver\ReceiverInterface;
32
use OldSound\RabbitMqBundle\Receiver\ReplyReceiverInterface;
33
use PhpAmqpLib\Channel\AMQPChannel;
34
use PhpAmqpLib\Exception\AMQPRuntimeException;
35
use PhpAmqpLib\Exception\AMQPTimeoutException;
36
use PhpAmqpLib\Message\AMQPMessage;
37
use Psr\Log\LoggerAwareTrait;
38
use Psr\Log\NullLogger;
39
use Symfony\Component\Serializer\SerializerInterface;
40
41
class Consumer
42
{
43
    use LoggerAwareTrait;
44
    use EventDispatcherAwareTrait;
45
46
    /** @var AMQPChannel */
47
    protected $channel;
48
    /** @var Consuming[] */
49
    protected $consumings = [];
50 32
    /** @var string[] */
51
    protected $consumerTags = [];
52 32
    /** @var int|null */
53
    protected $target;
54
    /** @var int */
55
    protected $consumed = 0;
56
    /** @var bool */
57
    protected $forceStop = false;
58
    /**
59
     * Importrant! If true - then channel can not be used from somewhere else
60
     * @var bool
61
     */
62
    public $multiAck = false;
63
    /**
64 12
     * @var \DateTime|null DateTime after which the consumer will gracefully exit. "Gracefully" means, that
65
     *      any currently running consumption will not be interrupted.
66 12
     */
67
    public $gracefulMaxExecutionDateTime;
68 12
    /** @var int Exit code used, when consumer is closed by the Graceful Max Execution Timeout feature. */
69
    public $gracefulMaxExecutionTimeoutExitCode = 0;
70 12
    /** @var int|null */
71 10
    public $timeoutWait;
72 10
    /** @var int */
73
    public $idleTimeout = 0;
74
    /** @var int */
75
    public $idleTimeoutExitCode;
76
    /** @var \DateTime|null */
77
    public $lastActivityDateTime;
78 10
79
    public function __construct(AMQPChannel $channel)
80 10
    {
81 10
        $this->channel = $channel;
82
        $this->logger = new NullLogger();
83 2
    }
84
85
    public function getChannel(): AMQPChannel
86 8
    {
87
        return $this->channel;
88 8
    }
89 6
90 6
    protected function setup(): Consumer
91 2
    {
92
        foreach($this->consumings as $index => $consuming) {
93
            $this->channel->basic_qos($consuming->options->qosPrefetchSize, $consuming->options->qosPrefetchCount, false);
94 4
95 4
            $options = $consuming->options;
96
            $consuming->consumerTag ? $consuming->consumerTag : sprintf("PHPPROCESS_%s_%s_%s", gethostname(), getmypid(), $index);
97 4
            $consumerTag = $this->channel->basic_consume(
98 4
                $options->queue,
99 2
                $consuming->consumerTag,
100
                $options->noLocal,
101 2
                $options->noAck,
102
                $options->exclusive,
103
                false,
104
                function (AMQPMessage $message) use ($consuming) {
105
                    $consuming->executeReceiverStrategy->onConsumeCallback($message);
106
                    $this->maybeStopConsumer();
107
                });
108 4
109
            $consuming->consumerTag = $consumerTag;
110
        }
111
112
        return $this;
113
    }
114
115
    /**
116
     * @param iterable|ConsumeOptions[] $queueConsumings
117
     */
118
    public function consumeQueues(iterable $queueConsumings)
119
    {
120
        foreach ($queueConsumings as $queueConsuming) {
121
            $this->consumeQueue($queueConsuming);
0 ignored issues
show
Bug introduced by
The call to OldSound\RabbitMqBundle\...onsumer::consumeQueue() has too few arguments starting with receiver. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

121
            $this->/** @scrutinizer ignore-call */ 
122
                   consumeQueue($queueConsuming);

This check compares calls to functions or methods with their respective definitions. If the call has less arguments than are defined, it raises an issue.

If a function is defined several times with a different number of parameters, the check may pick up the wrong definition and report false positives. One codebase where this has been known to happen is Wordpress. Please note the @ignore annotation hint above.

Loading history...
122
        }
123
    }
124
125
    private function createStrategyByOptions(ConsumeOptions $consumeOptions): ExecuteReceiverStrategyInterface
126
    {
127 32
        if ($consumeOptions instanceof BatchConsumeOptions) {
128
            return new BatchExecuteReceiverStrategy($consumeOptions->batchCount);
129 32
        }
130 32
        return new SingleExecuteReceiverStrategy();
131
    }
132
133 32
    private function createReceiverExecutorByOptions(ConsumeOptions $consumeOptions): ReceiverExecutorInterface
134 32
    {
135 32
        if ($consumeOptions instanceof BatchConsumeOptions) {
136 32
            $receiverExecutor = new BatchReceiverExecutor();
137 32
        } else if ($consumeOptions instanceof RpcConsumeOptions) {
138
            $receiverExecutor = new ReplyReceiverExecutor($consumeOptions);
139 32
        } else {
140
            $receiverExecutor = new SingleReceiverExecutor();
141 32
        }
142 32
143 32
        return $receiverExecutor;
144
    }
145
146
    public function consumeQueue(ConsumeOptions $consumerOptions, $receiver): Consumer
147
    {
148
        $executeReceiverStrategy = $this->createStrategyByOptions($consumerOptions);
149
        $receiverExecutor = $this->createReceiverExecutorByOptions($consumerOptions);
150
        if (!$receiverExecutor->support($receiver)) {
151
            throw new \InvalidArgumentException('sdfs');
152
        }
153
154
        $consuming = new Consuming($consumerOptions, $executeReceiverStrategy, $receiverExecutor, $receiver);
155
        $executeReceiverStrategy->setMessagesProcessor(
156
            new ReceiverExecutorDecorator($consuming, $this->logger)
157
        );
158
159
        $this->consumings[] = $consuming;
160
161
        return $this;
162
    }
163
164
    /**
165
     * @return ConsumeOptions[]
166
     */
167
    public function getConsumings(): array
168
    {
169
        return $this->consumings;
0 ignored issues
show
Bug Best Practice introduced by
The expression return $this->consumings returns the type OldSound\RabbitMqBundle\RabbitMq\Consuming[] which is incompatible with the documented return type OldSound\RabbitMqBundle\...ations\ConsumeOptions[].
Loading history...
170
    }
171
172
    /**
173
     * Consume the message
174
     * @param   int     $msgAmount
175 32
     * @return  int
176
     *
177 14
     * @throws  AMQPTimeoutException
178
     */
179 14
    public function startConsume(int $msgAmount = null)
180 14
    {
181
        $this->target = $msgAmount;
182 32
        $this->consumed = 0;
183
184 32
        $this->setup();
185
186 10
        $this->lastActivityDateTime = new \DateTime();
187 22
        while ($this->channel->is_consuming()) {
188
            $this->dispatchEvent(OnConsumeEvent::NAME, new OnConsumeEvent($this));
189
            $this->maybeStopConsumer();
190 22
191
            if ($this->forceStop) {
192 5
                break;
193 17
            }
194
            /*
195 15
             * Be careful not to trigger ::wait() with 0 or less seconds, when
196
             * graceful max execution timeout is being used.
197
             */
198 32
            $waitTimeout = $this->chooseWaitTimeout();
199 32
            if ($this->gracefulMaxExecutionDateTime && $waitTimeout < 1) {
200
                return $this->gracefulMaxExecutionTimeoutExitCode;
201 32
            }
202
203
            try {
204 32
                $this->channel->wait(null, false, $waitTimeout);
205
                $this->lastActivityDateTime = new \DateTime();
206
                if ($this->forceStop) {
207
                    break;
208
                }
209
            } catch (AMQPTimeoutException $e) {
210
                foreach($this->executeReceiverStrategies as $executeReceiverStrategy) {
0 ignored issues
show
Bug Best Practice introduced by
The property executeReceiverStrategies does not exist on OldSound\RabbitMqBundle\RabbitMq\Consumer. Did you maybe forget to declare it?
Loading history...
211
                    $executeReceiverStrategy->onCatchTimeout($e);
212
                }
213
                $now = new \DateTime();
214
                if ($this->gracefulMaxExecutionDateTime && $this->gracefulMaxExecutionDateTime <= $now) {
215
                    return $this->gracefulMaxExecutionTimeoutExitCode;
216
                }
217
218
                if ($this->idleTimeout && ($this->lastActivityDateTime->getTimestamp() + $this->idleTimeout <= $now->getTimestamp())) {
219
                    $idleEvent = new OnIdleEvent($this);
220
                    $this->dispatchEvent(OnIdleEvent::NAME, $idleEvent);
221 4
222
                    if ($idleEvent->isForceStop()) {
223 4
                        if (null !== $this->idleTimeoutExitCode) {
224 4
                            return $this->idleTimeoutExitCode;
225
                        } else {
226
                            throw $e;
227
                        }
228
                    }
229 4
                }
230
            }
231 4
        }
232 4
233
        return 0;
234
    }
235
236
    /**
237 2
     * @param AMQPMessage[] $messages
238
     * @param ConsumeOptions $queueConsuming
239 2
     */
240 2
    protected function processMessages(array $messages, ConsumeOptions $queueConsuming)
241
    {
242
        if (count($messages) === 0) {
243
            throw new \InvalidArgumentException('Messages can not be empty');
244
        }
245
246
        $canPrecessMultiMessages = $this->getExecuteReceiverStrategy($queueConsuming)->canPrecessMultiMessages();
0 ignored issues
show
Bug introduced by
The method getExecuteReceiverStrategy() does not exist on OldSound\RabbitMqBundle\RabbitMq\Consumer. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

246
        $canPrecessMultiMessages = $this->/** @scrutinizer ignore-call */ getExecuteReceiverStrategy($queueConsuming)->canPrecessMultiMessages();

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
247
        if (!$canPrecessMultiMessages && count($messages) !== 1) {
248
            throw new \InvalidArgumentException('Strategy is not supported process of multi messages');
249
        }
250
251
        /** @var int[]|int $flags */
252
        $flags = [];
253
        try {
254
            if ($queueConsuming->receiver instanceof ReceiverInterface) {
255
                $flags = $queueConsuming->receiver->execute($messages[0]);
256
            } else if ($queueConsuming->receiver instanceof BatchReceiverInterface) {
0 ignored issues
show
Bug introduced by
The type OldSound\RabbitMqBundle\...\BatchReceiverInterface was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
257
                $flags = $queueConsuming->receiver->batchExecute($messages);
258
            } else if ($queueConsuming->receiver instanceof ReplyReceiverInterface) {
259
                $reply = $queueConsuming->receiver->execute($messages[0]);
260
                $isRpcCall = $messages[0]->has('reply_to') && $messages[0]->has('correlation_id');
261
                if ($isRpcCall) {
262
                    $this->sendRpcReply($messages[0], $reply);
0 ignored issues
show
Bug introduced by
The method sendRpcReply() does not exist on OldSound\RabbitMqBundle\RabbitMq\Consumer. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

262
                    $this->/** @scrutinizer ignore-call */ 
263
                           sendRpcReply($messages[0], $reply);

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
263
                    $flags = ReceiverInterface::MSG_ACK;
264
                } else {
265
                    $flags = ReceiverInterface::MSG_REJECT;
266
                    // logging
267 10
                }
268
            } else {
269 10
                throw new \InvalidArgumentException('TODO');
270 4
            }
271 4
        } catch (ReceiverException $exception) {
272 4
            $flags = $exception->getCode();
273 4
        } catch (NotReadyReceiveException $exception) {
274 4
            // TODO
275
            $this->forceStop = true;
276 4
            return;
277 2
        }
278
279
        if (!is_array($flags)) { // spread handle flag for each delivery tag
280
            $flag = $flags;
281
            $flags = [];
282
            foreach ($messages as $message) {
283
                $flags[$message->getDeliveryTag()] = $flag;
284
            }
285 4
        } else if (count($flags) !== count($messages)) {
286 4
            throw new AMQPRuntimeException(
287
                'Method batchExecute() should return an array with elements equal with the number of messages processed'
288
            );
289
        }
290
291
        if (!$queueConsuming->noAck) {
292
            $messages = array_combine(
293
                array_map(fn ($message) => $message->getDeliveryTag(), $messages),
294
                $messages
295 4
            );
296 4
297
            $this->handleProcessMessages($messages, $flags, $queueConsuming);
0 ignored issues
show
Bug introduced by
It seems like $messages can also be of type false; however, parameter $messages of OldSound\RabbitMqBundle\...handleProcessMessages() does only seem to accept PhpAmqpLib\Message\AMQPMessage[], maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

297
            $this->handleProcessMessages(/** @scrutinizer ignore-type */ $messages, $flags, $queueConsuming);
Loading history...
298
        }
299
    }
300
301 6
    /**
302 6
     * @param AMQPMessage[] $messages
303
     * @param int[]|RpcReponse[]|RpcResponseException[]|bool[] $replies
304
     */
305
    private function handleProcessMessages($messages, array $replies, ConsumeOptions $queueConsuming)
306
    {
307
        $executeReceiverStrategy = $this->getExecuteReceiverStrategy($queueConsuming);
308
309
        $ack = !array_search(fn ($reply) => $reply !== ReceiverInterface::MSG_ACK, $replies, true);
310
        if ($this->multiAck && count($messages) > 1 && $ack) {
311
            $channels = array_map(fn ($message) => $message->getChannel(), $messages);
312
            if (count($channels) !== array_unique($channels)) { // all messages have same channel
313
                throw new InvalidArgumentException('Messages can not be processed as multi ack with different channels');
314
            }
315
316
            $lastDeliveryTag = array_key_last($replies);
317
318
            $this->channel->basic_ack($lastDeliveryTag, true);
319
            $this->consumed = $this->consumed + count($messages);
320
            foreach ($messages as $message) {
321
                $executeReceiverStrategy->onMessageProcessed($message);
322
            }
323
        } else {
324
            foreach ($replies as $deliveryTag => $reply) {
325
                $message = $messages[$deliveryTag] ?? null;
326
                if (null === $message) {
327
                    throw new AMQPRuntimeException(sprintf('Unknown delivery_tag %d!', $deliveryTag));
328
                }
329
330
                $channel = $message->getChannel();
331
                $processFlag = $reply;
332
                if ($processFlag === ReceiverInterface::MSG_REJECT_REQUEUE || false === $processFlag) {
333
                    $channel->basic_reject($deliveryTag, true); // Reject and requeue message to RabbitMQ
334
                } else if ($processFlag === ReceiverInterface::MSG_SINGLE_NACK_REQUEUE) {
335
                    $channel->basic_nack($deliveryTag, false, true); // NACK and requeue message to RabbitMQ
336
                } else if ($processFlag === ReceiverInterface::MSG_REJECT) {
337
                    $channel->basic_reject($deliveryTag, false); // Reject and drop
338
                } else if ($processFlag !== ReceiverInterface::MSG_ACK_SENT) {
339
                    $channel->basic_ack($deliveryTag); // Remove message from queue only if callback return not false
340
                }
341
342
                $this->consumed++;
343
344
                $executeReceiverStrategy->onMessageProcessed($message);
345
            }
346
        }
347
    }
348
349
    protected function maybeStopConsumer()
350
    {
351
        if ($this->forceStop || ($this->target && $this->consumed == $this->target)) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $this->target of type integer|null is loosely compared to true; this is ambiguous if the integer can be 0. You might want to explicitly use !== null instead.

In PHP, under loose comparison (like ==, or !=, or switch conditions), values of different types might be equal.

For integer values, zero is a special case, in particular the following results might be unexpected:

0   == false // true
0   == null  // true
123 == false // false
123 == null  // false

// It is often better to use strict comparison
0 === false // false
0 === null  // false
Loading history...
352
            $this->stopConsuming();
353
        }
354
    }
355
356
    public function forceStopConsumer()
357
    {
358
        $this->forceStop = true;
359
    }
360
361
    public function stopConsuming($immediately = false)
362
    {
363
        if (false === $immediately) {
364
            foreach ($this->executeReceiverStrategies as $executeReceiverStrategy) {
0 ignored issues
show
Bug Best Practice introduced by
The property executeReceiverStrategies does not exist on OldSound\RabbitMqBundle\RabbitMq\Consumer. Did you maybe forget to declare it?
Loading history...
365
                $executeReceiverStrategy->onStopConsuming();
366
            }
367
        }
368
369
        foreach ($this->consumerTags as $consumerTag) {
370
            $this->channel->basic_cancel($consumerTag, false, true);
371
        }
372
373
        $this->consumerTags = [];
374
    }
375
376
    /**
377
     * @param int $secondsInTheFuture
378
     */
379
    public function setGracefulMaxExecutionDateTimeFromSecondsInTheFuture($secondsInTheFuture)
380
    {
381
        $this->gracefulMaxExecutionDateTime = new \DateTime("+{$secondsInTheFuture} seconds");
382
    }
383
384
    /**
385
     * Choose the timeout wait (in seconds) to use for the $this->getChannel()->wait() method.
386
     */
387
    private function chooseWaitTimeout(): int
388
    {
389
        if ($this->gracefulMaxExecutionDateTime) {
390
            $allowedExecutionSeconds = $this->gracefulMaxExecutionDateTime->getTimestamp() - time();
391
392
            /*
393
             * Respect the idle timeout if it's set and if it's less than
394
             * the remaining allowed execution.
395
             */
396
            $waitTimeout = $this->idleTimeout && $this->idleTimeout < $allowedExecutionSeconds
397
                ? $this->idleTimeout
398
                : $allowedExecutionSeconds;
399
        } else {
400
            $waitTimeout = $this->idleTimeout;
401
        }
402
403
        if (!is_null($this->timeoutWait) && $this->timeoutWait > 0) {
404
            $waitTimeout = min($waitTimeout, $this->timeoutWait);
405
        }
406
        return $waitTimeout;
407
    }
408
}
409