QueueEntity   B
last analyzed

Complexity

Total Complexity 50

Size/Duplication

Total Lines 500
Duplicated Lines 0 %

Test Coverage

Coverage 46.02%

Importance

Changes 7
Bugs 0 Features 1
Metric Value
eloc 184
c 7
b 0
f 1
dl 0
loc 500
ccs 81
cts 176
cp 0.4602
rs 8.4
wmc 50

24 Methods

Rating   Name   Duplication   Size   Complexity  
A getConnection() 0 3 1
A delete() 0 3 1
B bind() 0 19 7
A reconnect() 0 3 1
A setPrefetchCount() 0 4 1
A __construct() 0 5 1
A getChannel() 0 3 1
A create() 0 23 4
A getAliasName() 0 3 1
A setGlobalPrefetch() 0 5 1
A setMessageProcessor() 0 4 1
A createQueue() 0 6 1
A getConsumerTag() 0 3 1
A handleKillSignals() 0 9 3
A stopConsuming() 0 6 2
A shouldStopConsuming() 0 31 4
A startConsuming() 0 26 5
A getMessageProcessor() 0 9 3
A catchKillSignal() 0 4 1
A registerShutdownHandler() 0 5 1
A setupChannelConsumer() 0 21 2
A consume() 0 18 2
A setupConsumer() 0 12 1
A publish() 0 32 4

How to fix   Complexity   

Complex Class

Complex classes like QueueEntity often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use QueueEntity, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace NeedleProject\LaravelRabbitMq\Entity;
4
5
use NeedleProject\LaravelRabbitMq\AMQPConnection;
6
use NeedleProject\LaravelRabbitMq\ConsumerInterface;
7
use NeedleProject\LaravelRabbitMq\Interpreter\EntityArgumentsInterpreter;
8
use NeedleProject\LaravelRabbitMq\Processor\AbstractMessageProcessor;
9
use NeedleProject\LaravelRabbitMq\Processor\MessageProcessorInterface;
10
use NeedleProject\LaravelRabbitMq\PublisherInterface;
11
use PhpAmqpLib\Channel\AMQPChannel;
12
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
13
use PhpAmqpLib\Exception\AMQPTimeoutException;
14
use PhpAmqpLib\Message\AMQPMessage;
15
use PhpAmqpLib\Wire\AMQPTable;
16
use Psr\Log\LoggerAwareInterface;
17
use Psr\Log\LoggerAwareTrait;
18
use PhpAmqpLib\Exception\AMQPChannelClosedException;
19
20
/**
21
 * Class QueueEntity
22
 *
23
 * @package NeedleProject\LaravelRabbitMq\Entity
24
 * @author  Adrian Tilita <[email protected]>
25
 */
26
class QueueEntity implements PublisherInterface, ConsumerInterface, AMQPEntityInterface, LoggerAwareInterface
27
{
28
    use LoggerAwareTrait;
29
30
    /**
31
     * @const int   Retry count when a Channel Closed exeption is thrown
32
     */
33
    const MAX_RETRIES = 3;
34
35
    /**
36
     * @const array Default connections parameters
37
     */
38
    const DEFAULTS = [
39
        // Whether to check if it exists or to verify existance using argument types (Throws PRECONDITION_FAILED)
40
        'passive'                      => false,
41
        // Entities with durable will be re-created uppon server restart
42
        'durable'                      => false,
43
        // whether to use it by only one channel, then it gets deleted
44
        'exclusive'                    => false,
45
        // Whether to delete it when the queue has no event on it
46
        'auto_delete'                  => false,
47
        // Whether to receive a Declare confirmation
48
        'nowait'                       => false,
49
        // Additional arguments for queue creation
50
        'arguments'                    => [],
51
        // Whether to auto create the entity before publishing/consuming it
52
        'auto_create'                  => false,
53
        // whether to "hide" the exception on re-declare.
54
        // if the `passive` filter is set true, this is redundant
55
        'throw_exception_on_redeclare' => true,
56
        // whether to throw on exception when trying to
57
        // bind to an in-existent queue/exchange
58
        'throw_exception_on_bind_fail' => true,
59
        // no ideea what it represents - @todo - find a documentation that states it's role
60
        'ticket'                       => null
61
    ];
62
63
    /**
64
     * @var AMQPConnection
65
     */
66
    protected $connection;
67
68
    /**
69
     * @var string
70
     */
71
    protected $aliasName;
72
73
    /**
74
     * @var array
75
     */
76
    protected $attributes;
77
78
    /**
79
     * @var int
80
     */
81
    protected $prefetchCount = 1;
82
83
    /**
84
     * @var null|string|MessageProcessorInterface
85
     */
86
    protected $messageProcessor = null;
87
88
    /**
89
     * @var int
90
     */
91
    protected $limitMessageCount;
92
93
    /**
94
     * @var int
95
     */
96
    protected $limitSecondsUptime;
97
98
    /**
99
     * @var int
100
     */
101
    protected $limitMemoryConsumption;
102
103
    /**
104
     * @var double
105
     */
106
    protected $startTime = 0;
107
108
    /**
109
     * @var int
110
     */
111
    protected $retryCount = 0;
112
    /**
113
     * @var bool
114 19
     */
115
    protected $globalPrefetch = true;
116 19
117
    /**
118
     * @param AMQPConnection $connection
119 19
     * @param string $aliasName
120
     * @param array $queueDetails
121
     * @return QueueEntity
122
     */
123
    public static function createQueue(AMQPConnection $connection, string $aliasName, array $queueDetails)
124
    {
125
        return new static(
126 3
            $connection,
127
            $aliasName,
128 3
            array_merge(self::DEFAULTS, $queueDetails)
129
        );
130
    }
131
132
    /**
133
     * @return string
134
     */
135
    public function getAliasName(): string
136
    {
137
        return $this->aliasName;
138 19
    }
139
140 19
    /**
141 19
     * ExchangeEntity constructor.
142 19
     *
143 19
     * @param AMQPConnection $connection
144
     * @param string $aliasName
145
     * @param array $attributes
146
     */
147
    public function __construct(AMQPConnection $connection, string $aliasName, array $attributes = [])
148
    {
149 2
        $this->connection = $connection;
150
        $this->aliasName  = $aliasName;
151 2
        $this->attributes = $attributes;
152 2
    }
153
154
    /**
155
     * @param int $prefetchCount
156
     * @return ConsumerInterface
157
     */
158
    public function setPrefetchCount(int $prefetchCount): ConsumerInterface
159 2
    {
160
        $this->prefetchCount = $prefetchCount;
161 2
        return $this;
162 2
    }
163
164
    /**
165
     * @param string $messageProcessor
166
     * @return ConsumerInterface
167
     */
168 10
    public function setMessageProcessor(string $messageProcessor): ConsumerInterface
169
    {
170 10
        $this->messageProcessor = $messageProcessor;
171
        return $this;
172
    }
173
174
    /**
175
     * @param bool $globalPrefetch
176 10
     * @return ConsumerInterface
177
     */
178 10
    public function setGlobalPrefetch(bool $globalPrefetch): ConsumerInterface
179
    {
180
        $this->globalPrefetch = $globalPrefetch;
181
182
        return $this;
183
    }
184 4
185
    /**
186
     * @return AMQPConnection
187 4
     */
188 4
    protected function getConnection(): AMQPConnection
189 4
    {
190 4
        return $this->connection;
191 4
    }
192 4
193 4
    /**
194 4
     * @return AMQPChannel
195 4
     */
196
    protected function getChannel(): AMQPChannel
197 2
    {
198
        return $this->getConnection()->getChannel();
199 2
    }
200 1
201
    /**
202
     * Create the Queue
203 1
     */
204
    public function create()
205 3
    {
206
        try {
207 4
            $this->getChannel()
208
                ->queue_declare(
209 4
                    $this->attributes['name'],
210 1
                    $this->attributes['passive'],
211
                    $this->attributes['durable'],
212 3
                    $this->attributes['exclusive'],
213
                    $this->attributes['auto_delete'],
214 3
                    $this->attributes['nowait'],
215 3
                    EntityArgumentsInterpreter::interpretArguments(
216 3
                        $this->attributes['arguments']
217 3
                    ),
218 3
                    $this->attributes['ticket']
219
                );
220 1
        } catch (AMQPProtocolChannelException $e) {
221
            // 406 is a soft error triggered for precondition failure (when redeclaring with different parameters)
222 1
            if (true === $this->attributes['throw_exception_on_redeclare'] || $e->amqp_reply_code !== 406) {
223 1
                throw $e;
224
            }
225 2
            // a failure trigger channels closing process
226
            $this->reconnect();
227
        }
228 2
    }
229
230
    public function bind()
231
    {
232
        if (!isset($this->attributes['bind']) || empty($this->attributes['bind'])) {
233 1
            return;
234
        }
235 1
        foreach ($this->attributes['bind'] as $bindItem) {
236 1
            try {
237
                $this->getChannel()
238
                    ->queue_bind(
239
                        $this->attributes['name'],
240
                        $bindItem['exchange'],
241 1
                        $bindItem['routing_key'] ?? ''
242
                    );
243 1
            } catch (AMQPProtocolChannelException $e) {
244 1
                // 404 is the code for trying to bind to an non-existing entity
245
                if (true === $this->attributes['throw_exception_on_bind_fail'] || $e->amqp_reply_code !== 404) {
246
                    throw $e;
247
                }
248
                $this->reconnect();
249
            }
250
        }
251
    }
252
253
    /**
254 4
     * Delete the queue
255
     */
256 4
    public function delete()
257 1
    {
258 1
        $this->getChannel()->queue_delete($this->attributes['name']);
259
    }
260
261
    /**
262 4
     * {@inheritdoc}
263 4
     */
264 4
    public function reconnect()
265 4
    {
266 4
        $this->getConnection()->reconnect();
267 4
    }
268
269 3
    /**
270 2
     * Publish a message
271 2
     *
272
     * @param string $message
273 2
     * @param string $routingKey
274 2
     * @param array $properties
275 2
     * @return mixed|void
276 1
     * @throws AMQPProtocolChannelException
277
     */
278 1
    public function publish(string $message, string $routingKey = '', array $properties = [])
279
    {
280 3
        if ($this->attributes['auto_create'] === true) {
281
            $this->create();
282
            $this->bind();
283
        }
284
285
        try {
286
            $this->getChannel()
287
                ->basic_publish(
288
                    new AMQPMessage(
289
                        $message,
290
                        EntityArgumentsInterpreter::interpretProperties(
291
                            $this->attributes,
292
                            $properties
293
                        )
294
                    ),
295
                    '',
296
                    $this->attributes['name'],
297
                    true
298
                );
299
            $this->retryCount = 0;
300
        } catch (AMQPChannelClosedException $exception) {
301
            $this->retryCount++;
302
            // Retry publishing with re-connect
303
            if ($this->retryCount < self::MAX_RETRIES) {
304
                $this->getConnection()->reconnect();
305
                $this->publish($message, $routingKey);
306
307
                return;
308
            }
309
            throw $exception;
310
        }
311
    }
312
313
    /**
314
     * {@inheritdoc}
315
     *
316
     * @param int $messages
317
     * @param int $seconds
318
     * @param int $maxMemory
319
     * @return int
320
     */
321
    public function startConsuming(int $messages, int $seconds, int $maxMemory)
322
    {
323
        $this->setupConsumer($messages, $seconds, $maxMemory);
324
        while (false === $this->shouldStopConsuming()) {
325
            try {
326
                $this->getChannel()->wait(null, false, $seconds);
327
            } catch (AMQPTimeoutException $e) {
328
                if ($this->shouldStopConsuming()) {
329
                    break;
330
                }
331
                usleep(1000);
332
                $this->getConnection()->reconnect();
333
                $this->setupChannelConsumer();
334
            } catch (\Throwable $e) {
335
                // stop the consumer
336
                $this->stopConsuming();
337
                $this->logger->notice(sprintf(
0 ignored issues
show
Bug introduced by
The method notice() does not exist on null. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

337
                $this->logger->/** @scrutinizer ignore-call */ 
338
                               notice(sprintf(

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
338
                    "Stopped consuming: %s in %s:%d",
339
                    get_class($e) . ' - ' . $e->getMessage(),
340
                    (string)$e->getFile(),
341
                    (int)$e->getLine()
342
                ));
343
                return 1;
344
            }
345
        }
346
        return 0;
347
    }
348
349
    /**
350
     * @return bool
351
     */
352
    protected function shouldStopConsuming(): bool
353
    {
354
        if ((microtime(true) - $this->startTime) > $this->limitSecondsUptime) {
355
            $this->logger->debug(
356
                "Stopped consumer",
357
                [
358
                    'limit' => 'time_limit',
359
                    'value' => sprintf("%.2f", microtime(true) - $this->startTime)
360
                ]
361
            );
362
            return true;
363
        }
364
        if (memory_get_peak_usage(true) >= ($this->limitMemoryConsumption * 1048576)) {
365
            $this->logger->debug(
366
                "Stopped consumer",
367
                [
368
                    'limit' => 'memory_limit',
369
                    'value' => (int)round(memory_get_peak_usage(true) / 1048576, 2)
370
                ]
371
            );
372
            return true;
373
        }
374
375
        if ($this->getMessageProcessor()->getProcessedMessages() >= $this->limitMessageCount) {
376
            $this->logger->debug(
377
                "Stopped consumer",
378
                ['limit' => 'message_count', 'value' => (int)$this->getMessageProcessor()->getProcessedMessages()]
379
            );
380
            return true;
381
        }
382
        return false;
383
    }
384
385
    /**
386
     * Stop the consumer
387
     */
388
    public function stopConsuming()
389
    {
390
        try {
391
            $this->getChannel()->basic_cancel($this->getConsumerTag(), false, true);
392
        } catch (\Throwable $e) {
393
            $this->logger->notice("Got " . $e->getMessage() . " of type " . get_class($e));
394
        }
395
    }
396
397
    /**
398
     * Setup the consumer
399
     *
400
     * @param int $messages
401
     * @param int $seconds
402
     * @param int $maxMemory
403
     */
404
    protected function setupConsumer(int $messages, int $seconds, int $maxMemory)
405
    {
406
        $this->limitMessageCount = $messages;
407
        $this->limitSecondsUptime = $seconds;
408
        $this->limitMemoryConsumption = $maxMemory;
409
410
        $this->startTime = microtime(true);
0 ignored issues
show
Documentation Bug introduced by
It seems like microtime(true) can also be of type string. However, the property $startTime is declared as type double. Maybe add an additional type check?

Our type inference engine has found a suspicous assignment of a value to a property. This check raises an issue when a value that can be of a mixed type is assigned to a property that is type hinted more strictly.

For example, imagine you have a variable $accountId that can either hold an Id object or false (if there is no account id yet). Your code now assigns that value to the id property of an instance of the Account class. This class holds a proper account, so the id value must no longer be false.

Either this assignment is in error or a type check should be added for that assignment.

class Id
{
    public $id;

    public function __construct($id)
    {
        $this->id = $id;
    }

}

class Account
{
    /** @var  Id $id */
    public $id;
}

$account_id = false;

if (starsAreRight()) {
    $account_id = new Id(42);
}

$account = new Account();
if ($account instanceof Id)
{
    $account->id = $account_id;
}
Loading history...
411
412
        $this->setupChannelConsumer();
413
414
        $this->registerShutdownHandler();
415
        $this->handleKillSignals();
416
    }
417
418
    private function setupChannelConsumer()
419
    {
420
        if ($this->attributes['auto_create'] === true) {
421
            $this->create();
422
            $this->bind();
423
        }
424
425
        $this->getChannel()
426
             ->basic_qos(null, $this->prefetchCount, $this->globalPrefetch);
427
428
        $this->getChannel()
429
            ->basic_consume(
430
                $this->attributes['name'],
431
                $this->getConsumerTag(),
432
                false,
433
                false,
434
                false,
435
                false,
436
                [
437
                    $this,
438
                    'consume'
439
                ]
440
            );
441
    }
442
443
    /**
444
     * Handle shutdown - Usually in case "Allowed memory size of x bytes exhausted"
445
     */
446
    private function registerShutdownHandler()
447
    {
448
        $consumer = $this;
449
        register_shutdown_function(function () use ($consumer) {
450
            $consumer->stopConsuming();
451
        });
452
    }
453
454
    /**
455
     * Register signals
456
     */
457
    protected function handleKillSignals()
458
    {
459 1
        if (extension_loaded('pcntl')) {
460
            pcntl_signal(SIGTERM, [$this, 'catchKillSignal']);
461 1
            pcntl_signal(SIGINT, [$this, 'catchKillSignal']);
462
463
            if (function_exists('pcntl_signal_dispatch')) {
464
                // let the signal go forward
465
                pcntl_signal_dispatch();
466
            }
467 1
        }
468
    }
469
470
    /**
471
     * Handle Kill Signals
472
     * @param int $signalNumber
473
     */
474 1
    public function catchKillSignal(int $signalNumber)
475
    {
476
        $this->stopConsuming();
477 1
        $this->logger->debug(sprintf("Caught signal %d", $signalNumber));
478 1
    }
479
480
    /**
481
     * It is the tag that is listed in RabbitMQ UI as the consumer "name"
482
     *
483
     * @return string
484
     */
485
    private function getConsumerTag(): string
486
    {
487
        return sprintf("%s_%s_%s", $this->aliasName, gethostname(), getmypid());
488
    }
489
490
    /**
491
     * @return MessageProcessorInterface
492
     */
493 1
    private function getMessageProcessor(): MessageProcessorInterface
494
    {
495
        if (!($this->messageProcessor instanceof MessageProcessorInterface)) {
496
            $this->messageProcessor = app($this->messageProcessor);
0 ignored issues
show
Documentation Bug introduced by
It seems like app($this->messageProcessor) can also be of type Illuminate\Contracts\Foundation\Application or Illuminate\Foundation\Application. However, the property $messageProcessor is declared as type NeedleProject\LaravelRab...orInterface|null|string. Maybe add an additional type check?

Our type inference engine has found a suspicous assignment of a value to a property. This check raises an issue when a value that can be of a mixed type is assigned to a property that is type hinted more strictly.

For example, imagine you have a variable $accountId that can either hold an Id object or false (if there is no account id yet). Your code now assigns that value to the id property of an instance of the Account class. This class holds a proper account, so the id value must no longer be false.

Either this assignment is in error or a type check should be added for that assignment.

class Id
{
    public $id;

    public function __construct($id)
    {
        $this->id = $id;
    }

}

class Account
{
    /** @var  Id $id */
    public $id;
}

$account_id = false;

if (starsAreRight()) {
    $account_id = new Id(42);
}

$account = new Account();
if ($account instanceof Id)
{
    $account->id = $account_id;
}
Loading history...
497
            if ($this->messageProcessor instanceof AbstractMessageProcessor) {
498
                $this->messageProcessor->setLogger($this->logger);
0 ignored issues
show
Bug introduced by
It seems like $this->logger can also be of type null; however, parameter $logger of NeedleProject\LaravelRab...eProcessor::setLogger() does only seem to accept Psr\Log\LoggerInterface, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

498
                $this->messageProcessor->setLogger(/** @scrutinizer ignore-type */ $this->logger);
Loading history...
499
            }
500
        }
501
        return $this->messageProcessor;
502
    }
503
504
    /**
505
     * @param AMQPMessage $message
506
     * @throws \Throwable
507
     */
508
    public function consume(AMQPMessage $message)
509
    {
510
        try {
511
            $this->getMessageProcessor()->consume($message);
512
            $this->logger->debug("Consumed message", ['message' => $message->getBody()]);
513
        } catch (\Throwable $e) {
514
            $this->logger->notice(
515
                sprintf(
516
                    "Got %s from %s in %d",
517
                    $e->getMessage(),
518
                    (string)$e->getFile(),
519
                    (int)$e->getLine()
520
                )
521
            );
522
            // let the exception slide, the processor should handle
523
            // exception, this is just a notice that should not
524
            // ever appear
525
            throw $e;
526
        }
527
    }
528
}
529