Test Failed
Pull Request — master (#32)
by Bogdan
15:55
created

QueueEntity   B

Complexity

Total Complexity 48

Size/Duplication

Total Lines 472
Duplicated Lines 15.04 %

Coupling/Cohesion

Components 1
Dependencies 9

Test Coverage

Coverage 46.02%

Importance

Changes 0
Metric Value
wmc 48
lcom 1
cbo 9
dl 71
loc 472
ccs 81
cts 176
cp 0.4602
rs 8.5599
c 0
b 0
f 0

23 Methods

Rating   Name   Duplication   Size   Complexity  
A createQueue() 0 8 1
A getAliasName() 0 4 1
A __construct() 0 6 1
A setPrefetchCount() 0 5 1
A setMessageProcessor() 0 5 1
A getConnection() 0 4 1
A getChannel() 0 4 1
A create() 22 22 4
B bind() 22 22 7
A delete() 0 4 1
A reconnect() 0 4 1
A publish() 27 27 4
A startConsuming() 0 24 4
A shouldStopConsuming() 0 32 4
A stopConsuming() 0 8 2
A setupConsumer() 0 13 1
A setupChannelConsumer() 0 24 2
A handleKillSignals() 0 12 3
A catchKillSignal() 0 5 1
A getConsumerTag() 0 4 1
A getMessageProcessor() 0 10 3
A consume() 0 20 2
A registerShutdownHandler() 0 7 1

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:
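One duplication that is visible inside this class is the `auto_create` block, which appears in both `publish()` and `setupChannelConsumer()`. A minimal sketch of an Extract Method refactoring for it follows. The class `AutoCreateSketch` and the helper name `createIfConfigured()` are hypothetical, and the AMQP calls are replaced by a recorded call list so the example runs on its own.

```php
<?php
// Hypothetical, stripped-down stand-in for QueueEntity: only the
// auto_create logic is modelled, and calls are recorded instead of
// being sent to a broker.
class AutoCreateSketch
{
    /** @var array */
    private $attributes;

    /** @var string[] */
    public $calls = [];

    public function __construct(array $attributes)
    {
        $this->attributes = $attributes;
    }

    public function create(): void
    {
        $this->calls[] = 'create';
    }

    public function bind(): void
    {
        $this->calls[] = 'bind';
    }

    // Extracted helper (hypothetical name): the single home for the
    // "declare and bind before use" block.
    private function createIfConfigured(): void
    {
        if (($this->attributes['auto_create'] ?? false) === true) {
            $this->create();
            $this->bind();
        }
    }

    public function publish(string $message): void
    {
        $this->createIfConfigured();
        $this->calls[] = 'publish';
    }

    public function setupChannelConsumer(): void
    {
        $this->createIfConfigured();
        $this->calls[] = 'consume';
    }
}

$entity = new AutoCreateSketch(['auto_create' => true]);
$entity->publish('hello');
print_r($entity->calls); // create, bind, publish
```

If the same block also exists in a sibling entity class, a shared trait or base class would be the next step; that is not shown here because the sibling code is not part of this listing.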

Complex Class

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like QueueEntity often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes. You can also have a look at the cohesion graph to spot any un-connected, or weakly-connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
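In QueueEntity, the fields `limitMessageCount`, `limitSecondsUptime` and `limitMemoryConsumption` share the `limit` prefix and are only read together in `shouldStopConsuming()`, so they are one plausible candidate for Extract Class. The sketch below is an illustration under that assumption; `ConsumerLimits` and `exceededBy()` are hypothetical names, and the current time and memory usage are passed in so the logic can be exercised without a running consumer.

```php
<?php
// Hypothetical value object holding the three consumer limits.
final class ConsumerLimits
{
    private $maxMessages;
    private $maxSeconds;
    private $maxMemoryMb;
    private $startTime;

    public function __construct(int $maxMessages, int $maxSeconds, int $maxMemoryMb, float $startTime)
    {
        $this->maxMessages = $maxMessages;
        $this->maxSeconds  = $maxSeconds;
        $this->maxMemoryMb = $maxMemoryMb;
        $this->startTime   = $startTime;
    }

    /**
     * Returns the name of the first limit that was hit, or null if the
     * consumer may keep running. The checks mirror the order and the
     * comparisons used in shouldStopConsuming().
     */
    public function exceededBy(int $processedMessages, float $now, int $peakMemoryBytes): ?string
    {
        if (($now - $this->startTime) > $this->maxSeconds) {
            return 'time_limit';
        }
        if ($peakMemoryBytes >= $this->maxMemoryMb * 1048576) {
            return 'memory_limit';
        }
        if ($processedMessages >= $this->maxMessages) {
            return 'message_count';
        }
        return null;
    }
}

// 100 messages, 60 seconds, 128 MB, started at t = 0.0
$limits = new ConsumerLimits(100, 60, 128, 0.0);
var_dump($limits->exceededBy(5, 10.0, 1048576));   // NULL
var_dump($limits->exceededBy(5, 61.0, 1048576));   // string(10) "time_limit"
```

With such a class in place, `shouldStopConsuming()` would shrink to one call plus logging, and the three `limit*` properties would leave QueueEntity.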

While breaking up the class, it is a good idea to analyze how other classes use QueueEntity, and based on these observations, apply Extract Interface, too.
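QueueEntity already implements `PublisherInterface` and `ConsumerInterface`, whose contents are not shown on this page. The sketch below therefore uses a hypothetical interface, `PublishesMessages`, to illustrate the idea behind Extract Interface: a caller that only publishes depends on a narrow contract and can be tested with an in-memory implementation.

```php
<?php
// Hypothetical narrow contract: only what publishing callers need.
interface PublishesMessages
{
    public function publish(string $message, string $routingKey = '');
}

// In-memory implementation used here in place of a real queue entity.
final class InMemoryPublisher implements PublishesMessages
{
    /** @var array[] list of [routingKey, message] pairs */
    public $sent = [];

    public function publish(string $message, string $routingKey = '')
    {
        $this->sent[] = [$routingKey, $message];
    }
}

// Hypothetical caller: it knows nothing about channels or consumers.
final class OrderNotifier
{
    private $publisher;

    public function __construct(PublishesMessages $publisher)
    {
        $this->publisher = $publisher;
    }

    public function orderPlaced(int $orderId): void
    {
        $this->publisher->publish(json_encode(['order_id' => $orderId]), 'orders.placed');
    }
}

$publisher = new InMemoryPublisher();
(new OrderNotifier($publisher))->orderPlaced(42);
print_r($publisher->sent);
```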

<?php
namespace NeedleProject\LaravelRabbitMq\Entity;

use NeedleProject\LaravelRabbitMq\AMQPConnection;
use NeedleProject\LaravelRabbitMq\ConsumerInterface;
use NeedleProject\LaravelRabbitMq\Processor\AbstractMessageProcessor;
use NeedleProject\LaravelRabbitMq\Processor\MessageProcessorInterface;
use NeedleProject\LaravelRabbitMq\PublisherInterface;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use PhpAmqpLib\Exception\AMQPChannelClosedException;

/**
 * Class QueueEntity
 *
 * @package NeedleProject\LaravelRabbitMq\Entity
 * @author  Adrian Tilita <[email protected]>
 */
class QueueEntity implements PublisherInterface, ConsumerInterface, AMQPEntityInterface, LoggerAwareInterface
{
    use LoggerAwareTrait;

    /**
     * @const int   Retry count when a Channel Closed exception is thrown
     */
    const MAX_RETRIES = 3;

    /**
     * @const array Default connection parameters
     */
    const DEFAULTS = [
        // Whether to check if it exists or to verify existence using argument types (Throws PRECONDITION_FAILED)
        'passive'                      => false,
        // Entities with durable will be re-created upon server restart
        'durable'                      => false,
        // whether to use it by only one channel, then it gets deleted
        'exclusive'                    => false,
        // Whether to delete it when the queue has no event on it
        'auto_delete'                  => false,
        // Whether the exchange can be used by a publisher or block it (declared just for internal "wiring")
        'internal'                     => false,
        // Whether to receive a Declare confirmation
        'nowait'                       => false,
        // Whether to auto create the entity before publishing/consuming it
        'auto_create'                  => false,
        // whether to "hide" the exception on re-declare.
        // if the `passive` filter is set true, this is redundant
        'throw_exception_on_redeclare' => true,
        // whether to throw an exception when trying to
        // bind to a nonexistent queue/exchange
        'throw_exception_on_bind_fail' => true,
    ];

    /**
     * @var AMQPConnection
     */
    protected $connection;

    /**
     * @var string
     */
    protected $aliasName;

    /**
     * @var array
     */
    protected $attributes;

    /**
     * @var int
     */
    protected $prefetchCount = 1;

    /**
     * @var null|string|MessageProcessorInterface
     */
    protected $messageProcessor = null;

    /**
     * @var int
     */
    protected $limitMessageCount;

    /**
     * @var int
     */
    protected $limitSecondsUptime;

    /**
     * @var int
     */
    protected $limitMemoryConsumption;

    /**
     * @var double
     */
    protected $startTime = 0;

    /**
     * @var int
     */
    protected $retryCount = 0;

    /**
     * @param AMQPConnection $connection
     * @param string $aliasName
     * @param array $exchangeDetails
     * @return QueueEntity
     */
    public static function createQueue(AMQPConnection $connection, string $aliasName, array $exchangeDetails)
    {
        return new static(
            $connection,
            $aliasName,
            array_merge(self::DEFAULTS, $exchangeDetails)
        );
    }

    /**
     * @return string
     */
    public function getAliasName(): string
    {
        return $this->aliasName;
    }

    /**
     * QueueEntity constructor.
     *
     * @param AMQPConnection $connection
     * @param string $aliasName
     * @param array $attributes
     */
    public function __construct(AMQPConnection $connection, string $aliasName, array $attributes = [])
    {
        $this->connection = $connection;
        $this->aliasName  = $aliasName;
        $this->attributes = $attributes;
    }

    /**
     * @param int $prefetchCount
     * @return ConsumerInterface
     */
    public function setPrefetchCount(int $prefetchCount): ConsumerInterface
    {
        $this->prefetchCount = $prefetchCount;
        return $this;
    }

    /**
     * @param string $messageProcessor
     * @return ConsumerInterface
     */
    public function setMessageProcessor(string $messageProcessor): ConsumerInterface
    {
        $this->messageProcessor = $messageProcessor;
        return $this;
    }

    /**
     * @return AMQPConnection
     */
    protected function getConnection(): AMQPConnection
    {
        return $this->connection;
    }

    /**
     * @return AMQPChannel
     */
    protected function getChannel(): AMQPChannel
    {
        return $this->getConnection()->getChannel();
    }

    /**
     * Create the Queue
     */
    public function create()

Duplication: This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

    {
        try {
            $this->getChannel()
                ->queue_declare(
                    $this->attributes['name'],
                    $this->attributes['passive'],
                    $this->attributes['durable'],
                    $this->attributes['exclusive'],
                    $this->attributes['auto_delete'],
                    $this->attributes['internal'],
                    $this->attributes['nowait']
                );
        } catch (AMQPProtocolChannelException $e) {
            // 406 is a soft error triggered for precondition failure (when redeclaring with different parameters)
            if (true === $this->attributes['throw_exception_on_redeclare'] || $e->amqp_reply_code !== 406) {

Bug (Unused Code): The strict comparison !== seems to always evaluate to true as the types of $e->amqp_reply_code (string) and 406 (integer) can never be identical. Maybe you want to use a loose comparison != instead?

                throw $e;
            }
            // a failure triggers the channel closing process
            $this->reconnect();
        }
    }

    public function bind()

Duplication: This method seems to be duplicated in your project.

    {
        if (!isset($this->attributes['bind']) || empty($this->attributes['bind'])) {
            return;
        }
        foreach ($this->attributes['bind'] as $bindItem) {
            try {
                $this->getChannel()
                    ->queue_bind(
                        $this->attributes['name'],
                        $bindItem['exchange'],
                        $bindItem['routing_key']
                    );
            } catch (AMQPProtocolChannelException $e) {
                // 404 is the code for trying to bind to a non-existing entity
                if (true === $this->attributes['throw_exception_on_bind_fail'] || $e->amqp_reply_code !== 404) {

Bug (Unused Code): The strict comparison !== seems to always evaluate to true as the types of $e->amqp_reply_code (string) and 404 (integer) can never be identical. Maybe you want to use a loose comparison != instead?

                    throw $e;
                }
                $this->reconnect();
            }
        }
    }

    /**
     * Delete the queue
     */
    public function delete()
    {
        $this->getChannel()->queue_delete($this->attributes['name']);
    }

    /**
     * {@inheritdoc}
     */
    public function reconnect()
    {
        $this->getConnection()->reconnect();
    }

    /**
     * Publish a message
     *
     * @param string $message
     * @param string $routingKey
     * @return mixed|void
     * @throws AMQPProtocolChannelException
     */
    public function publish(string $message, string $routingKey = '')

Duplication: This method seems to be duplicated in your project.

    {
        if ($this->attributes['auto_create'] === true) {
            $this->create();
            $this->bind();
        }

        try {
            $this->getChannel()
                ->basic_publish(
                    new AMQPMessage($message),
                    '',
                    $this->attributes['name'],
                    true
                );
            $this->retryCount = 0;
        } catch (AMQPChannelClosedException $exception) {
            $this->retryCount++;
            // Retry publishing with re-connect
            if ($this->retryCount < self::MAX_RETRIES) {
                $this->getConnection()->reconnect();
                $this->publish($message, $routingKey);
                return;
            }
            throw $exception;
        }
    }

    /**
     * {@inheritdoc}
     *
     * @param int $messages
     * @param int $seconds
     * @param int $maxMemory
     * @return int
     */
    public function startConsuming(int $messages, int $seconds, int $maxMemory)
    {
        $this->setupConsumer($messages, $seconds, $maxMemory);
        while (false === $this->shouldStopConsuming()) {
            try {
                $this->getChannel()->wait(null, false, 1);
            } catch (AMQPTimeoutException $e) {
                usleep(1000);
                $this->getConnection()->reconnect();
                $this->setupChannelConsumer();
            } catch (\Throwable $e) {
                // stop the consumer
                $this->stopConsuming();
                $this->logger->notice(sprintf(
                    "Stopped consuming: %s in %s:%d",
                    get_class($e) . ' - ' . $e->getMessage(),
                    (string)$e->getFile(),
                    (int)$e->getLine()
                ));
                return 1;
            }
        }
        return 0;
    }

    /**
     * @return bool
     */
    protected function shouldStopConsuming(): bool
    {
        if ((microtime(true) - $this->startTime) > $this->limitSecondsUptime) {
            $this->logger->debug(
                "Stopped consumer",
                [
                    'limit' => 'time_limit',
                    'value' => sprintf("%.2f", microtime(true) - $this->startTime)
                ]
            );
            return true;
        }
        if (memory_get_peak_usage(true) >= ($this->limitMemoryConsumption * 1048576)) {
            $this->logger->debug(
                "Stopped consumer",
                [
                    'limit' => 'memory_limit',
                    'value' => (int)round(memory_get_peak_usage(true) / 1048576, 2)
                ]
            );
            return true;
        }

        if ($this->getMessageProcessor()->getProcessedMessages() >= $this->limitMessageCount) {
            $this->logger->debug(
                "Stopped consumer",
                ['limit' => 'message_count', 'value' => (int)$this->getMessageProcessor()->getProcessedMessages()]
            );
            return true;
        }
        return false;
    }

    /**
     * Stop the consumer
     */
    public function stopConsuming()
    {
        try {
            $this->getChannel()->basic_cancel($this->getConsumerTag(), false, true);
        } catch (\Throwable $e) {
            $this->logger->notice("Got " . $e->getMessage() . " of type " . get_class($e));
        }
    }

    /**
     * Setup the consumer
     *
     * @param int $messages
     * @param int $seconds
     * @param int $maxMemory
     */
    protected function setupConsumer(int $messages, int $seconds, int $maxMemory)
    {
        $this->limitMessageCount = $messages;
        $this->limitSecondsUptime = $seconds;
        $this->limitMemoryConsumption = $maxMemory;

        $this->startTime = microtime(true);

        $this->setupChannelConsumer();

        $this->registerShutdownHandler();
        $this->handleKillSignals();
    }

    private function setupChannelConsumer()
    {
        if ($this->attributes['auto_create'] === true) {
            $this->create();
            $this->bind();
        }

        $this->getChannel()
            ->basic_qos(null, $this->prefetchCount, true);

        $this->getChannel()
            ->basic_consume(
                $this->attributes['name'],
                $this->getConsumerTag(),
                false,
                false,
                false,
                false,
                [
                    $this,
                    'consume'
                ]
            );
    }

    /**
     * Handle shutdown - Usually in case "Allowed memory size of x bytes exhausted"
     */
    private function registerShutdownHandler()
    {
        $consumer = $this;
        register_shutdown_function(function () use ($consumer) {
            $consumer->stopConsuming();
        });
    }

    /**
     * Register signals
     */
    protected function handleKillSignals()
    {
        if (extension_loaded('pcntl')) {
            pcntl_signal(SIGTERM, [$this, 'catchKillSignal']);
            pcntl_signal(SIGINT, [$this, 'catchKillSignal']);

            if (function_exists('pcntl_signal_dispatch')) {
                // let the signal go forward
                pcntl_signal_dispatch();
            }
        }
    }

    /**
     * Handle Kill Signals
     * @param int $signalNumber
     */
    public function catchKillSignal(int $signalNumber)
    {
        $this->stopConsuming();
        $this->logger->debug(sprintf("Caught signal %d", $signalNumber));
    }

    /**
     * It is the tag that is listed in RabbitMQ UI as the consumer "name"
     *
     * @return string
     */
    private function getConsumerTag(): string
    {
        return sprintf("%s_%s_%s", $this->aliasName, gethostname(), getmypid());
    }

    /**
     * @return MessageProcessorInterface
     */
    private function getMessageProcessor(): MessageProcessorInterface
    {
        if (!($this->messageProcessor instanceof MessageProcessorInterface)) {
            $this->messageProcessor = app($this->messageProcessor);
            if ($this->messageProcessor instanceof AbstractMessageProcessor) {
                $this->messageProcessor->setLogger($this->logger);
            }
        }
        return $this->messageProcessor;
    }

    /**
     * @param AMQPMessage $message
     * @throws \Throwable
     */
    public function consume(AMQPMessage $message)
    {
        try {
            $this->getMessageProcessor()->consume($message);
            $this->logger->debug("Consumed message", [$message->getBody()]);
        } catch (\Throwable $e) {
            $this->logger->notice(
                sprintf(
                    "Got %s from %s in %d",
                    $e->getMessage(),
                    (string)$e->getFile(),
                    (int)$e->getLine()
                )
            );
            // let the exception slide, the processor should handle
            // exception, this is just a notice that should not
            // ever appear
            throw $e;
        }
    }
}
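
The strict-comparison issue reported for `create()` and `bind()` can be reproduced in isolation. The sketch below assumes, as the analyzer's message states, that the reply code arrives as a string; whether that holds depends on the library version in use. The helper `isReplyCode()` is a hypothetical name, shown as one possible way to make the check independent of the type.

```php
<?php
// Assumption taken from the analyzer's message: the reply code is a string.
$replyCode = '406';

// A string and an integer are never identical, so this is always true.
var_dump($replyCode !== 406);       // bool(true)

// Casting first (or using a loose !=) compares the numeric values.
var_dump((int) $replyCode !== 406); // bool(false)
var_dump($replyCode != 406);        // bool(false)

// Hypothetical helper: normalise the type once, then compare strictly.
function isReplyCode($replyCode, int $expected): bool
{
    return (int) $replyCode === $expected;
}

var_dump(isReplyCode('406', 406));  // bool(true)
var_dump(isReplyCode(404, 406));    // bool(false)
```

With a check like this, the condition in `create()` would rethrow only when the flag is set or the code really differs from 406, which appears to be what the surrounding comments intend.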