Completed
Push — master ( 1d2662...30c418 )
by Tilita
19:22
created

QueueEntity   A

Complexity

Total Complexity 35

Size/Duplication

Total Lines 399
Duplicated Lines 6.52 %

Coupling/Cohesion

Components 1
Dependencies 7

Test Coverage

Coverage 34%

Importance

Changes 0
Metric Value
wmc 35
lcom 1
cbo 7
dl 26
loc 399
ccs 51
cts 150
cp 0.34
rs 9.6
c 0
b 0
f 0

22 Methods

Rating   Name   Duplication   Size   Complexity  
A createQueue() 0 8 1
A getAliasName() 0 4 1
A __construct() 0 6 1
A setPrefetchCount() 0 5 1
A setMessageProcessor() 0 5 1
A getConnection() 0 4 1
A getChannel() 0 4 1
A create() 13 13 1
A bind() 13 13 3
A delete() 0 4 1
A publish() 0 10 1
A startConsuming() 0 25 4
A shouldStopConsuming() 0 32 4
A stopConsuming() 0 5 1
A setupConsumer() 0 13 1
A setupChannelConsumer() 0 19 1
A registerShutdownHandler() 0 7 1
A handleKillSignals() 0 13 3
A catchKillSignal() 0 5 1
A getConsumerTag() 0 4 1
A getMessageProcessor() 0 10 3
A consume() 0 20 2

How to fix   Duplicated Code   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

1
<?php
2
namespace NeedleProject\LaravelRabbitMq\Entity;
3
4
use NeedleProject\LaravelRabbitMq\AMQPConnection;
5
use NeedleProject\LaravelRabbitMq\ConsumerInterface;
6
use NeedleProject\LaravelRabbitMq\Processor\AbstractMessageProcessor;
7
use NeedleProject\LaravelRabbitMq\Processor\MessageProcessorInterface;
8
use NeedleProject\LaravelRabbitMq\PublisherInterface;
9
use PhpAmqpLib\Channel\AMQPChannel;
10
use PhpAmqpLib\Exception\AMQPTimeoutException;
11
use PhpAmqpLib\Message\AMQPMessage;
12
use Psr\Log\LoggerAwareInterface;
13
use Psr\Log\LoggerAwareTrait;
14
15
/**
16
 * Class QueueEntity
17
 *
18
 * @package NeedleProject\LaravelRabbitMq\Entity
19
 * @author  Adrian Tilita <[email protected]>
20
 */
21
class QueueEntity implements PublisherInterface, ConsumerInterface, LoggerAwareInterface
22
{
23
    use LoggerAwareTrait;
24
25
    /**
26
     * @const array Default connections parameters
27
     */
28
    const DEFAULTS = [
29
        'passive'   => false,
30
        'durable'   => false,
31
        'exclusive' => false,
32
        'auto_delete' => false,
33
        'internal'  => false,
34
        'nowait'    => false,
35
    ];
36
37
    /**
38
     * @var AMQPConnection
39
     */
40
    protected $connection;
41
42
    /**
43
     * @var string
44
     */
45
    protected $aliasName;
46
47
    /**
48
     * @var array
49
     */
50
    protected $attributes;
51
52
    /**
53
     * @var int
54
     */
55
    protected $prefetchCount = 1;
56
57
    /**
58
     * @var null|string|MessageProcessorInterface
59
     */
60
    protected $messageProcessor = null;
61
62
    /**
63
     * @var int
64
     */
65
    protected $limitMessageCount;
66
67
    /**
68
     * @var int
69
     */
70
    protected $limitSecondsUptime;
71
72
    /**
73
     * @var int
74
     */
75
    protected $limitMemoryConsumption;
76
77
    /**
78
     * @var double
79
     */
80
    protected $startTime = 0;
81
82
    /**
83
     * @param AMQPConnection $connection
84
     * @param string $aliasName
85
     * @param array $exchangeDetails
86
     * @return QueueEntity
87
     */
88 11
    public static function createQueue(AMQPConnection $connection, string $aliasName, array $exchangeDetails)
89
    {
90 11
        return new static(
91
            $connection,
92
            $aliasName,
93 11
            array_merge(self::DEFAULTS, $exchangeDetails)
94
        );
95
    }
96
97
    /**
98
     * @return string
99
     */
100 3
    public function getAliasName(): string
101
    {
102 3
        return $this->aliasName;
103
    }
104
105
    /**
106
     * ExchangeEntity constructor.
107
     *
108
     * @param AMQPConnection $connection
109
     * @param string $aliasName
110
     * @param array $attributes
111
     */
112 11
    public function __construct(AMQPConnection $connection, string $aliasName, array $attributes = [])
113
    {
114 11
        $this->connection = $connection;
115 11
        $this->aliasName  = $aliasName;
116 11
        $this->attributes = $attributes;
117 11
    }
118
119
    /**
120
     * @param int $prefetchCount
121
     * @return ConsumerInterface
122
     */
123 2
    public function setPrefetchCount(int $prefetchCount): ConsumerInterface
124
    {
125 2
        $this->prefetchCount = $prefetchCount;
126 2
        return $this;
127
    }
128
129
    /**
130
     * @param string $messageProcessor
131
     * @return ConsumerInterface
132
     */
133 2
    public function setMessageProcessor(string $messageProcessor): ConsumerInterface
134
    {
135 2
        $this->messageProcessor = $messageProcessor;
136 2
        return $this;
137
    }
138
139
    /**
140
     * @return AMQPConnection
141
     */
142 4
    protected function getConnection(): AMQPConnection
143
    {
144 4
        return $this->connection;
145
    }
146
147
    /**
148
     * @return AMQPChannel
149
     */
150 4
    protected function getChannel(): AMQPChannel
151
    {
152 4
        return $this->getConnection()->getChannel();
153
    }
154
155
    /**
156
     * Create the Queue
157
     */
158 1 View Code Duplication
    public function create()
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
159
    {
160 1
        $this->getChannel()
161 1
            ->queue_declare(
162 1
                $this->attributes['name'],
163 1
                $this->attributes['passive'],
164 1
                $this->attributes['durable'],
165 1
                $this->attributes['exclusive'],
166 1
                $this->attributes['auto_delete'],
167 1
                $this->attributes['internal'],
168 1
                $this->attributes['nowait']
169
            );
170 1
    }
171
172 1 View Code Duplication
    public function bind()
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
173
    {
174 1
        if (isset($this->attributes['bind'])) {
175 1
            foreach ($this->attributes['bind'] as $bindItem) {
176 1
                $this->getChannel()
177 1
                    ->queue_bind(
178 1
                        $this->attributes['name'],
179 1
                        $bindItem['exchange'],
180 1
                        $bindItem['routing_key']
181
                    );
182
            }
183
        }
184 1
    }
185
186
    /**
187
     * Delete the queue
188
     */
189 1
    public function delete()
190
    {
191 1
        $this->getChannel()->queue_delete($this->attributes['name']);
192 1
    }
193
194
195
    /**
196
     * Publish a message
197
     *
198
     * @param string $message
199
     * @param string $routingKey
200
     * @return void
201
     */
202 1
    public function publish(string $message, string $routingKey = '')
203
    {
204 1
        $this->getChannel()
205 1
            ->basic_publish(
206 1
                new AMQPMessage($message),
207 1
                '',
208 1
                $this->attributes['name'],
209 1
                true
210
            );
211 1
    }
212
213
    /**
214
     * {@inheritdoc}
215
     *
216
     * @param int $messages
217
     * @param int $seconds
218
     * @param int $maxMemory
219
     * @return int
220
     */
221
    public function startConsuming(int $messages, int $seconds, int $maxMemory)
222
    {
223
        $this->setupConsumer($messages, $seconds, $maxMemory);
224
        while (false === $this->shouldStopConsuming()) {
225
            try {
226
                $this->getChannel()->wait(null, false, 1);
227
            } catch (AMQPTimeoutException $e) {
228
                $this->logger->debug("Timeout exceeded, reconnecting!");
229
                usleep(1000);
230
                $this->getConnection()->reconnect();
231
                $this->setupChannelConsumer();
232
            } catch (\Throwable $e) {
233
                // stop the consumer
234
                $this->stopConsuming();
235
                $this->logger->critical(sprintf(
236
                    "Stopped consuming: %s in %s:%d",
237
                    $e->getMessage(),
238
                    (string)$e->getFile(),
239
                    (int)$e->getLine()
240
                ));
241
                return 1;
242
            }
243
        }
244
        return 0;
245
    }
246
247
    /**
248
     * @return bool
249
     */
250
    protected function shouldStopConsuming(): bool
251
    {
252
        if ((microtime(true) - $this->startTime) > $this->limitSecondsUptime) {
253
            $this->logger->debug(
254
                "Stopped consumer",
255
                [
256
                    'limit' => 'time_limit',
257
                    'value' => sprintf("%.2f", microtime(true) - $this->startTime)
258
                ]
259
            );
260
            return true;
261
        }
262
        if (memory_get_peak_usage(true) >= ($this->limitMemoryConsumption * 1048576)) {
263
            $this->logger->debug(
264
                "Stopped consumer",
265
                [
266
                    'limit' => 'memory_limit',
267
                    'value' => (int)round(memory_get_peak_usage(true) / 1048576, 2)
268
                ]
269
            );
270
            return true;
271
        }
272
273
        if ($this->getMessageProcessor()->getProcessedMessages() >= $this->limitMessageCount) {
274
            $this->logger->debug(
275
                "Stopped consumer",
276
                ['limit' => 'message_count', 'value' => (int)$this->getMessageProcessor()->getProcessedMessages()]
277
            );
278
            return true;
279
        }
280
        return false;
281
    }
282
283
    /**
284
     * Stop the consumer
285
     */
286
    public function stopConsuming()
287
    {
288
        $this->logger->debug("Stopping consumer!");
289
        $this->getChannel()->basic_cancel($this->getConsumerTag(), false, true);
290
    }
291
292
    /**
293
     * Setup the consumer
294
     *
295
     * @param int $messages
296
     * @param int $seconds
297
     * @param int $maxMemory
298
     */
299
    protected function setupConsumer(int $messages, int $seconds, int $maxMemory)
300
    {
301
        $this->limitMessageCount = $messages;
302
        $this->limitSecondsUptime = $seconds;
303
        $this->limitMemoryConsumption = $maxMemory;
304
305
        $this->startTime = microtime(true);
306
307
        $this->setupChannelConsumer();
308
309
        $this->registerShutdownHandler();
310
        $this->handleKillSignals();
311
    }
312
313
    private function setupChannelConsumer()
314
    {
315
        $this->getChannel()
316
            ->basic_qos(null, $this->prefetchCount, true);
317
318
        $this->getChannel()
319
            ->basic_consume(
320
                $this->attributes['name'],
321
                $this->getConsumerTag(),
322
                false,
323
                false,
324
                false,
325
                false,
326
                [
327
                    $this,
328
                    'consume'
329
                ]
330
            );
331
    }
332
333
    /**
334
     * Handle shutdown - Usually in case "Allowed memory size of x bytes exhausted"
335
     */
336
    private function registerShutdownHandler()
337
    {
338
        $consumer = $this;
339
        register_shutdown_function(function () use ($consumer) {
340
            $consumer->stopConsuming();
341
        });
342
    }
343
344
    /**
345
     * Register signals
346
     */
347
    private function handleKillSignals()
348
    {
349
        if (extension_loaded('pcntl')) {
350
            pcntl_signal(SIGTERM, [$this, 'catchKillSignal']);
351
            pcntl_signal(SIGKILL, [$this, 'catchKillSignal']);
352
            pcntl_signal(SIGSTOP, [$this, 'catchKillSignal']);
353
354
            if (function_exists('pcntl_signal_dispatch')) {
355
                // let the signal go forward
356
                pcntl_signal_dispatch();
357
            }
358
        }
359
    }
360
361
    /**
362
     * Handle Kill Signals
363
     * @param int $signalNumber
364
     */
365
    private function catchKillSignal(int $signalNumber)
366
    {
367
        $this->stopConsuming();
368
        $this->logger->debug(sprintf("Caught signal %d", $signalNumber));
369
    }
370
371
    /**
372
     * It is the tag that is listed in RabbitMQ UI as the consumer "name"
373
     *
374
     * @return string
375
     */
376
    private function getConsumerTag(): string
377
    {
378
        return sprintf("%s_%s_%s", $this->aliasName, gethostname(), getmypid());
379
    }
380
381
    /**
382
     * @return MessageProcessorInterface
383
     */
384
    private function getMessageProcessor(): MessageProcessorInterface
385
    {
386
        if (!($this->messageProcessor instanceof MessageProcessorInterface)) {
387
            $this->messageProcessor = app($this->messageProcessor);
388
            if ($this->messageProcessor instanceof AbstractMessageProcessor) {
389
                $this->messageProcessor->setLogger($this->logger);
390
            }
391
        }
392
        return $this->messageProcessor;
393
    }
394
395
    /**
396
     * @param AMQPMessage $message
397
     * @throws \Throwable
398
     */
399
    public function consume(AMQPMessage $message)
400
    {
401
        try {
402
            $this->getMessageProcessor()->consume($message);
403
            $this->logger->debug("Consumed message", [$message->getBody()]);
404
        } catch (\Throwable $e) {
405
            $this->logger->notice(
406
                sprintf(
407
                    "Got %s from %s in %d",
408
                    $e->getMessage(),
409
                    (string)$e->getFile(),
410
                    (int)$e->getLine()
411
                )
412
            );
413
            // let the exception slide, the processor should handle
414
            // exception, this is just a notice that should not
415
            // ever appear
416
            throw $e;
417
        }
418
    }
419
}
420