Completed
Push — master ( f58be8...855521 )
by Tilita
03:07
created

QueueEntity   B

Complexity

Total Complexity 45

Size/Duplication

Total Lines 430
Duplicated Lines 13.49 %

Coupling/Cohesion

Components 1
Dependencies 8

Test Coverage

Coverage 42.68%

Importance

Changes 0
Metric Value
wmc 45
lcom 1
cbo 8
dl 58
loc 430
ccs 70
cts 164
cp 0.4268
rs 8.8
c 0
b 0
f 0

22 Methods

Rating   Name   Duplication   Size   Complexity  
A createQueue() 0 8 1
A getAliasName() 0 4 1
A __construct() 0 6 1
A setPrefetchCount() 0 5 1
A setMessageProcessor() 0 5 1
A getConnection() 0 4 1
A getChannel() 0 4 1
A create() 22 22 4
B bind() 22 22 7
A delete() 0 4 1
A publish() 14 14 2
A startConsuming() 0 24 4
A shouldStopConsuming() 0 32 4
A stopConsuming() 0 8 2
A setupConsumer() 0 13 1
A setupChannelConsumer() 0 24 2
A registerShutdownHandler() 0 7 1
A handleKillSignals() 0 12 3
A catchKillSignal() 0 5 1
A getConsumerTag() 0 4 1
A getMessageProcessor() 0 10 3
A consume() 0 20 2


Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
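As a rough illustration of that rule: create() and bind() in the listing below both wrap a channel call in the same try/catch, rethrow unless one specific reply code is tolerated, and then reconnect. A minimal sketch of pulling that shared shape into a single helper might look like the following. The function name `tolerateReplyCode()` and the stand-in exception class are hypothetical; neither is part of the project or of php-amqplib.

```php
<?php
// Stand-in for AMQPProtocolChannelException; hypothetical, for illustration only.
class FakeChannelException extends \RuntimeException
{
    public $amqp_reply_code;

    public function __construct($replyCode, string $message = '')
    {
        parent::__construct($message);
        $this->amqp_reply_code = $replyCode;
    }
}

/**
 * Run $operation. A FakeChannelException is swallowed only when $throwOnFail
 * is false and its reply code equals $toleratedCode; in that case $recover runs.
 * Returns true when the operation succeeded, false when a failure was tolerated.
 */
function tolerateReplyCode(callable $operation, int $toleratedCode, bool $throwOnFail, callable $recover): bool
{
    try {
        $operation();
        return true;
    } catch (FakeChannelException $e) {
        // Cast so that a string reply code ("406") still matches the integer.
        if ($throwOnFail || (int) $e->amqp_reply_code !== $toleratedCode) {
            throw $e;
        }
        $recover();
        return false;
    }
}

// Usage: a redeclare failure (406) is tolerated and triggers the recovery step.
$reconnected = false;
$ok = tolerateReplyCode(
    function () {
        throw new FakeChannelException('406', 'PRECONDITION_FAILED');
    },
    406,
    false,
    function () use (&$reconnected) {
        $reconnected = true;
    }
);
```

With a helper of this shape, create() and bind() would each shrink to the channel call plus the reply code they tolerate (406 and 404 respectively).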

Common duplication problems, and corresponding solutions are:

Complex Class

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like QueueEntity often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields or methods that share the same prefixes or suffixes. You can also look at the cohesion graph to spot any unconnected or weakly connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use QueueEntity, and based on these observations, apply Extract Interface, too.
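In QueueEntity, the fields limitMessageCount, limitSecondsUptime and limitMemoryConsumption share a prefix and are only read together in shouldStopConsuming(), which makes them a plausible candidate for Extract Class. The sketch below is one possible shape, not the project's code: `StopConditionInterface` and `ConsumerLimits` are hypothetical names, and the nullable return type assumes PHP 7.1 or later.

```php
<?php
// Hypothetical interface; illustrates Extract Interface for the stop check.
interface StopConditionInterface
{
    /** Returns the name of the limit that was reached, or null if none. */
    public function reachedLimit(int $processedMessages, float $now, int $peakMemoryBytes): ?string;
}

// Hypothetical class; holds the three limits plus the start time.
final class ConsumerLimits implements StopConditionInterface
{
    private $maxMessages;
    private $maxSeconds;
    private $maxMemoryMb;
    private $startTime;

    public function __construct(int $maxMessages, int $maxSeconds, int $maxMemoryMb, float $startTime)
    {
        $this->maxMessages = $maxMessages;
        $this->maxSeconds  = $maxSeconds;
        $this->maxMemoryMb = $maxMemoryMb;
        $this->startTime   = $startTime;
    }

    public function reachedLimit(int $processedMessages, float $now, int $peakMemoryBytes): ?string
    {
        // Same order of checks as shouldStopConsuming(): time, memory, message count.
        if (($now - $this->startTime) > $this->maxSeconds) {
            return 'time_limit';
        }
        if ($peakMemoryBytes >= $this->maxMemoryMb * 1048576) {
            return 'memory_limit';
        }
        if ($processedMessages >= $this->maxMessages) {
            return 'message_count';
        }
        return null;
    }
}

// Usage with fixed inputs instead of microtime() and memory_get_peak_usage().
$limits = new ConsumerLimits(100, 60, 64, 1000.0);
$none = $limits->reachedLimit(10, 1030.0, 8 * 1048576); // within every limit
$time = $limits->reachedLimit(10, 1061.0, 8 * 1048576); // uptime exceeded
```

Passing the clock and memory readings in as arguments keeps the extracted class free of global state, so the stop conditions can be tested without running a consumer.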

<?php
namespace NeedleProject\LaravelRabbitMq\Entity;

use NeedleProject\LaravelRabbitMq\AMQPConnection;
use NeedleProject\LaravelRabbitMq\ConsumerInterface;
use NeedleProject\LaravelRabbitMq\Processor\AbstractMessageProcessor;
use NeedleProject\LaravelRabbitMq\Processor\MessageProcessorInterface;
use NeedleProject\LaravelRabbitMq\PublisherInterface;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;

/**
 * Class QueueEntity
 *
 * @package NeedleProject\LaravelRabbitMq\Entity
 * @author  Adrian Tilita <[email protected]>
 */
class QueueEntity implements PublisherInterface, ConsumerInterface, AMQPEntityInterface, LoggerAwareInterface
{
    use LoggerAwareTrait;

    /**
     * @const array Default queue attributes
     */
    const DEFAULTS = [
        'passive'                      => false,
        'durable'                      => false,
        'exclusive'                    => false,
        'auto_delete'                  => false,
        'internal'                     => false,
        'nowait'                       => false,
        'auto_create'                  => false,
        'throw_exception_on_redeclare' => true,
        'throw_exception_on_bind_fail' => true,
    ];

    /**
     * @var AMQPConnection
     */
    protected $connection;

    /**
     * @var string
     */
    protected $aliasName;

    /**
     * @var array
     */
    protected $attributes;

    /**
     * @var int
     */
    protected $prefetchCount = 1;

    /**
     * @var null|string|MessageProcessorInterface
     */
    protected $messageProcessor = null;

    /**
     * @var int
     */
    protected $limitMessageCount;

    /**
     * @var int
     */
    protected $limitSecondsUptime;

    /**
     * @var int
     */
    protected $limitMemoryConsumption;

    /**
     * @var double
     */
    protected $startTime = 0;

    /**
     * @param AMQPConnection $connection
     * @param string $aliasName
     * @param array $exchangeDetails
     * @return QueueEntity
     */
    public static function createQueue(AMQPConnection $connection, string $aliasName, array $exchangeDetails)
    {
        return new static(
            $connection,
            $aliasName,
            array_merge(self::DEFAULTS, $exchangeDetails)
        );
    }

    /**
     * @return string
     */
    public function getAliasName(): string
    {
        return $this->aliasName;
    }

    /**
     * QueueEntity constructor.
     *
     * @param AMQPConnection $connection
     * @param string $aliasName
     * @param array $attributes
     */
    public function __construct(AMQPConnection $connection, string $aliasName, array $attributes = [])
    {
        $this->connection = $connection;
        $this->aliasName  = $aliasName;
        $this->attributes = $attributes;
    }

    /**
     * @param int $prefetchCount
     * @return ConsumerInterface
     */
    public function setPrefetchCount(int $prefetchCount): ConsumerInterface
    {
        $this->prefetchCount = $prefetchCount;
        return $this;
    }

    /**
     * @param string $messageProcessor
     * @return ConsumerInterface
     */
    public function setMessageProcessor(string $messageProcessor): ConsumerInterface
    {
        $this->messageProcessor = $messageProcessor;
        return $this;
    }

    /**
     * @return AMQPConnection
     */
    protected function getConnection(): AMQPConnection
    {
        return $this->connection;
    }

    /**
     * @return AMQPChannel
     */
    protected function getChannel(): AMQPChannel
    {
        return $this->getConnection()->getChannel();
    }

    /**
     * Create the Queue
     *
     * Scrutinizer issue (Duplication): this method seems to be duplicated in your project.
     * Duplicated code is one of the most pungent code smells. If you need to duplicate the
     * same code in three or more different places, we strongly encourage you to look into
     * extracting the code into a single class or operation.
     */
    public function create()
    {
        try {
            // queue_declare() takes no "internal" flag (that parameter belongs to
            // exchange_declare()), so "nowait" goes directly after "auto_delete".
            $this->getChannel()
                ->queue_declare(
                    $this->attributes['name'],
                    $this->attributes['passive'],
                    $this->attributes['durable'],
                    $this->attributes['exclusive'],
                    $this->attributes['auto_delete'],
                    $this->attributes['nowait']
                );
        } catch (AMQPProtocolChannelException $e) {
            // 406 is a soft error triggered for precondition failure (when redeclaring with different parameters)
            // Scrutinizer issue (Bug): amqp_reply_code is inferred as a string, so the original
            // strict comparison `!== 406` was flagged as always true. Casting to int keeps the
            // comparison strict while accepting either type.
            if (true === $this->attributes['throw_exception_on_redeclare'] || (int) $e->amqp_reply_code !== 406) {
                throw $e;
            }
            // a failure triggers the channel closing process
            $this->getConnection()->reconnect();
        }
    }

    /**
     * Bind the queue to the configured exchanges
     *
     * Scrutinizer issue (Duplication): this method seems to be duplicated in your project.
     */
    public function bind()
    {
        // empty() already covers the "not set" case
        if (empty($this->attributes['bind'])) {
            return;
        }
        foreach ($this->attributes['bind'] as $bindItem) {
            try {
                $this->getChannel()
                    ->queue_bind(
                        $this->attributes['name'],
                        $bindItem['exchange'],
                        $bindItem['routing_key']
                    );
            } catch (AMQPProtocolChannelException $e) {
                // 404 is the code for trying to bind to a non-existing entity
                // Scrutinizer issue (Bug): amqp_reply_code is inferred as a string, so the original
                // strict comparison `!== 404` was flagged as always true. Casting to int keeps the
                // comparison strict while accepting either type.
                if (true === $this->attributes['throw_exception_on_bind_fail'] || (int) $e->amqp_reply_code !== 404) {
                    throw $e;
                }
                $this->getConnection()->reconnect();
            }
        }
    }

    /**
     * Delete the queue
     */
    public function delete()
    {
        $this->getChannel()->queue_delete($this->attributes['name']);
    }

    /**
     * Publish a message
     *
     * Scrutinizer issue (Duplication): this method seems to be duplicated in your project.
     *
     * @param string $message
     * @param string $routingKey Not used here: the message goes to the default exchange
     *                           with the queue name as the routing key
     * @return mixed|void
     * @throws AMQPProtocolChannelException
     */
    public function publish(string $message, string $routingKey = '')
    {
        if ($this->attributes['auto_create'] === true) {
            $this->create();
            $this->bind();
        }
        $this->getChannel()
            ->basic_publish(
                new AMQPMessage($message),
                '',
                $this->attributes['name'],
                true
            );
    }

    /**
     * {@inheritdoc}
     *
     * @param int $messages
     * @param int $seconds
     * @param int $maxMemory
     * @return int 0 when a limit stopped the consumer, 1 when it stopped on an unexpected error
     */
    public function startConsuming(int $messages, int $seconds, int $maxMemory)
    {
        $this->setupConsumer($messages, $seconds, $maxMemory);
        while (false === $this->shouldStopConsuming()) {
            try {
                $this->getChannel()->wait(null, false, 1);
            } catch (AMQPTimeoutException $e) {
                usleep(1000);
                $this->getConnection()->reconnect();
                $this->setupChannelConsumer();
            } catch (\Throwable $e) {
                // stop the consumer
                $this->stopConsuming();
                $this->logger->notice(sprintf(
                    "Stopped consuming: %s in %s:%d",
                    get_class($e) . ' - ' . $e->getMessage(),
                    (string)$e->getFile(),
                    (int)$e->getLine()
                ));
                return 1;
            }
        }
        return 0;
    }

    /**
     * @return bool
     */
    protected function shouldStopConsuming(): bool
    {
        if ((microtime(true) - $this->startTime) > $this->limitSecondsUptime) {
            $this->logger->debug(
                "Stopped consumer",
                [
                    'limit' => 'time_limit',
                    'value' => sprintf("%.2f", microtime(true) - $this->startTime)
                ]
            );
            return true;
        }
        if (memory_get_peak_usage(true) >= ($this->limitMemoryConsumption * 1048576)) {
            $this->logger->debug(
                "Stopped consumer",
                [
                    'limit' => 'memory_limit',
                    'value' => (int)round(memory_get_peak_usage(true) / 1048576, 2)
                ]
            );
            return true;
        }

        if ($this->getMessageProcessor()->getProcessedMessages() >= $this->limitMessageCount) {
            $this->logger->debug(
                "Stopped consumer",
                ['limit' => 'message_count', 'value' => (int)$this->getMessageProcessor()->getProcessedMessages()]
            );
            return true;
        }
        return false;
    }

    /**
     * Stop the consumer
     */
    public function stopConsuming()
    {
        try {
            $this->getChannel()->basic_cancel($this->getConsumerTag(), false, true);
        } catch (\Throwable $e) {
            $this->logger->notice("Got " . $e->getMessage() . " of type " . get_class($e));
        }
    }

    /**
     * Setup the consumer
     *
     * @param int $messages
     * @param int $seconds
     * @param int $maxMemory
     */
    protected function setupConsumer(int $messages, int $seconds, int $maxMemory)
    {
        $this->limitMessageCount = $messages;
        $this->limitSecondsUptime = $seconds;
        $this->limitMemoryConsumption = $maxMemory;

        $this->startTime = microtime(true);

        $this->setupChannelConsumer();

        $this->registerShutdownHandler();
        $this->handleKillSignals();
    }

    private function setupChannelConsumer()
    {
        if ($this->attributes['auto_create'] === true) {
            $this->create();
            $this->bind();
        }

        $this->getChannel()
            ->basic_qos(null, $this->prefetchCount, true);

        $this->getChannel()
            ->basic_consume(
                $this->attributes['name'],
                $this->getConsumerTag(),
                false,
                false,
                false,
                false,
                [
                    $this,
                    'consume'
                ]
            );
    }

    /**
     * Handle shutdown - Usually in case "Allowed memory size of x bytes exhausted"
     */
    private function registerShutdownHandler()
    {
        $consumer = $this;
        register_shutdown_function(function () use ($consumer) {
            $consumer->stopConsuming();
        });
    }

    /**
     * Register signals
     */
    protected function handleKillSignals()
    {
        if (extension_loaded('pcntl')) {
            pcntl_signal(SIGTERM, [$this, 'catchKillSignal']);
            pcntl_signal(SIGINT, [$this, 'catchKillSignal']);

            if (function_exists('pcntl_signal_dispatch')) {
                // let the signal go forward
                pcntl_signal_dispatch();
            }
        }
    }

    /**
     * Handle Kill Signals
     * @param int $signalNumber
     */
    public function catchKillSignal(int $signalNumber)
    {
        $this->stopConsuming();
        $this->logger->debug(sprintf("Caught signal %d", $signalNumber));
    }

    /**
     * It is the tag that is listed in RabbitMQ UI as the consumer "name"
     *
     * @return string
     */
    private function getConsumerTag(): string
    {
        return sprintf("%s_%s_%s", $this->aliasName, gethostname(), getmypid());
    }

    /**
     * @return MessageProcessorInterface
     */
    private function getMessageProcessor(): MessageProcessorInterface
    {
        if (!($this->messageProcessor instanceof MessageProcessorInterface)) {
            $this->messageProcessor = app($this->messageProcessor);
            if ($this->messageProcessor instanceof AbstractMessageProcessor) {
                $this->messageProcessor->setLogger($this->logger);
            }
        }
        return $this->messageProcessor;
    }

    /**
     * @param AMQPMessage $message
     * @throws \Throwable
     */
    public function consume(AMQPMessage $message)
    {
        try {
            $this->getMessageProcessor()->consume($message);
            $this->logger->debug("Consumed message", [$message->getBody()]);
        } catch (\Throwable $e) {
            $this->logger->notice(
                sprintf(
                    "Got %s from %s in %d",
                    $e->getMessage(),
                    (string)$e->getFile(),
                    (int)$e->getLine()
                )
            );
            // Rethrow: the processor is expected to handle its own exceptions,
            // so this notice should never actually appear.
            throw $e;
        }
    }
}
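
The strict-comparison warning raised on create() and bind() comes down to how PHP compares a string with an integer. The snippet below is a small, self-contained sketch of that behaviour. `replyCodeIs()` is a hypothetical helper, not part of the project, and whether `amqp_reply_code` really arrives as a string at runtime is an assumption taken from the analyzer's message.

```php
<?php
// Hypothetical helper: normalises a reply code that may arrive as an int
// or as a numeric string before comparing it strictly.
function replyCodeIs($actual, int $expected): bool
{
    return (int) $actual === $expected;
}

// A string and an integer are never identical, so `!==` is always true here.
$strictMismatch = ('406' !== 406);

// Loose comparison converts the numeric string first, so the values match.
$looseMismatch = ('406' != 406);

// Casting first keeps the comparison strict and works for both types.
$fromString = replyCodeIs('406', 406);
$fromInt    = replyCodeIs(406, 406);
```

Casting before a strict comparison avoids the looser coercion rules of `!=` while still matching when the library hands back either type.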