Passed
Push — master ( de28ef...ff0ea2 )
by Tilita
02:12
created

QueueEntity::reconnect()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 3
cts 3
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 0
crap 1
1
<?php
2
namespace NeedleProject\LaravelRabbitMq\Entity;
3
4
use NeedleProject\LaravelRabbitMq\AMQPConnection;
5
use NeedleProject\LaravelRabbitMq\ConsumerInterface;
6
use NeedleProject\LaravelRabbitMq\Processor\AbstractMessageProcessor;
7
use NeedleProject\LaravelRabbitMq\Processor\MessageProcessorInterface;
8
use NeedleProject\LaravelRabbitMq\PublisherInterface;
9
use PhpAmqpLib\Channel\AMQPChannel;
10
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
11
use PhpAmqpLib\Exception\AMQPTimeoutException;
12
use PhpAmqpLib\Message\AMQPMessage;
13
use Psr\Log\LoggerAwareInterface;
14
use Psr\Log\LoggerAwareTrait;
15
16
/**
17
 * Class QueueEntity
18
 *
19
 * @package NeedleProject\LaravelRabbitMq\Entity
20
 * @author  Adrian Tilita <[email protected]>
21
 */
22
class QueueEntity implements PublisherInterface, ConsumerInterface, AMQPEntityInterface, LoggerAwareInterface
23
{
24
    use LoggerAwareTrait;
25
26
    /**
27
     * @const array Default connections parameters
28
     */
29
    const DEFAULTS = [
30
        // Whether to check if it exists or to verify existance using argument types (Throws PRECONDITION_FAILED)
31
        'passive'                      => false,
32
        // Entities with durable will be re-created uppon server restart
33
        'durable'                      => false,
34
        // whether to use it by only one channel, then it gets deleted
35
        'exclusive'                    => false,
36
        // Whether to delete it when the queue has no event on it
37
        'auto_delete'                  => false,
38
        // Whether the exchange can be used by a publisher or block it (declared just for internal "wiring")
39
        'internal'                     => false,
40
        // Whether to receive a Declare confirmation
41
        'nowait'                       => false,
42
        // Whether to auto create the entity before publishing/consuming it
43
        'auto_create'                  => false,
44
        // whether to "hide" the exception on re-declare.
45
        // if the `passive` filter is set true, this is redundant
46
        'throw_exception_on_redeclare' => true,
47
        // whether to throw on exception when trying to
48
        // bind to an in-existent queue/exchange
49
        'throw_exception_on_bind_fail' => true,
50
    ];
51
52
    /**
53
     * @var AMQPConnection
54
     */
55
    protected $connection;
56
57
    /**
58
     * @var string
59
     */
60
    protected $aliasName;
61
62
    /**
63
     * @var array
64
     */
65
    protected $attributes;
66
67
    /**
68
     * @var int
69
     */
70
    protected $prefetchCount = 1;
71
72
    /**
73
     * @var null|string|MessageProcessorInterface
74
     */
75
    protected $messageProcessor = null;
76
77
    /**
78
     * @var int
79
     */
80
    protected $limitMessageCount;
81
82
    /**
83
     * @var int
84
     */
85
    protected $limitSecondsUptime;
86
87
    /**
88
     * @var int
89
     */
90
    protected $limitMemoryConsumption;
91
92
    /**
93
     * @var double
94
     */
95
    protected $startTime = 0;
96
97
    /**
98
     * @param AMQPConnection $connection
99
     * @param string $aliasName
100
     * @param array $exchangeDetails
101
     * @return QueueEntity
102
     */
103 17
    public static function createQueue(AMQPConnection $connection, string $aliasName, array $exchangeDetails)
104
    {
105 17
        return new static(
106
            $connection,
107
            $aliasName,
108 17
            array_merge(self::DEFAULTS, $exchangeDetails)
109
        );
110
    }
111
112
    /**
113
     * @return string
114
     */
115 3
    public function getAliasName(): string
116
    {
117 3
        return $this->aliasName;
118
    }
119
120
    /**
121
     * ExchangeEntity constructor.
122
     *
123
     * @param AMQPConnection $connection
124
     * @param string $aliasName
125
     * @param array $attributes
126
     */
127 17
    public function __construct(AMQPConnection $connection, string $aliasName, array $attributes = [])
128
    {
129 17
        $this->connection = $connection;
130 17
        $this->aliasName  = $aliasName;
131 17
        $this->attributes = $attributes;
132 17
    }
133
134
    /**
135
     * @param int $prefetchCount
136
     * @return ConsumerInterface
137
     */
138 2
    public function setPrefetchCount(int $prefetchCount): ConsumerInterface
139
    {
140 2
        $this->prefetchCount = $prefetchCount;
141 2
        return $this;
142
    }
143
144
    /**
145
     * @param string $messageProcessor
146
     * @return ConsumerInterface
147
     */
148 2
    public function setMessageProcessor(string $messageProcessor): ConsumerInterface
149
    {
150 2
        $this->messageProcessor = $messageProcessor;
151 2
        return $this;
152
    }
153
154
    /**
155
     * @return AMQPConnection
156
     */
157 8
    protected function getConnection(): AMQPConnection
158
    {
159 8
        return $this->connection;
160
    }
161
162
    /**
163
     * @return AMQPChannel
164
     */
165 8
    protected function getChannel(): AMQPChannel
166
    {
167 8
        return $this->getConnection()->getChannel();
168
    }
169
170
    /**
171
     * Create the Queue
172
     */
173 4 View Code Duplication
    public function create()
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
174
    {
175
        try {
176 4
            $this->getChannel()
177 4
                ->queue_declare(
178 4
                    $this->attributes['name'],
179 4
                    $this->attributes['passive'],
180 4
                    $this->attributes['durable'],
181 4
                    $this->attributes['exclusive'],
182 4
                    $this->attributes['auto_delete'],
183 4
                    $this->attributes['internal'],
184 4
                    $this->attributes['nowait']
185
                );
186 2
        } catch (AMQPProtocolChannelException $e) {
187
            // 406 is a soft error triggered for precondition failure (when redeclaring with different parameters)
188 2
            if (true === $this->attributes['throw_exception_on_redeclare'] || $e->amqp_reply_code !== 406) {
0 ignored issues
show
Unused Code Bug introduced by
The strict comparison !== seems to always evaluate to true as the types of $e->amqp_reply_code (string) and 406 (integer) can never be identical. Maybe you want to use a loose comparison != instead?
Loading history...
189 1
                throw $e;
190
            }
191
            // a failure trigger channels closing process
192 1
            $this->reconnect();
193
        }
194 3
    }
195
196 4 View Code Duplication
    public function bind()
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
197
    {
198 4
        if (!isset($this->attributes['bind']) || empty($this->attributes['bind'])) {
199 1
            return;
200
        }
201 3
        foreach ($this->attributes['bind'] as $bindItem) {
202
            try {
203 3
                $this->getChannel()
204 3
                    ->queue_bind(
205 3
                        $this->attributes['name'],
206 3
                        $bindItem['exchange'],
207 3
                        $bindItem['routing_key']
208
                    );
209 1
            } catch (AMQPProtocolChannelException $e) {
210
                // 404 is the code for trying to bind to an non-existing entity
211 1
                if (true === $this->attributes['throw_exception_on_bind_fail'] || $e->amqp_reply_code !== 404) {
0 ignored issues
show
Unused Code Bug introduced by
The strict comparison !== seems to always evaluate to true as the types of $e->amqp_reply_code (string) and 404 (integer) can never be identical. Maybe you want to use a loose comparison != instead?
Loading history...
212 1
                    throw $e;
213
                }
214 2
                $this->reconnect();
215
            }
216
        }
217 2
    }
218
219
    /**
220
     * Delete the queue
221
     */
222 1
    public function delete()
223
    {
224 1
        $this->getChannel()->queue_delete($this->attributes['name']);
225 1
    }
226
227
    /**
228
     * {@inheritdoc}
229
     */
230 1
    public function reconnect()
231
    {
232 1
        $this->getConnection()->reconnect();
233 1
    }
234
235
    /**
236
     * Publish a message
237
     *
238
     * @param string $message
239
     * @param string $routingKey
240
     * @return mixed|void
241
     * @throws AMQPProtocolChannelException
242
     */
243 2 View Code Duplication
    public function publish(string $message, string $routingKey = '')
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
244
    {
245 2
        if ($this->attributes['auto_create'] === true) {
246 1
            $this->create();
247 1
            $this->bind();
248
        }
249 2
        $this->getChannel()
250 2
            ->basic_publish(
251 2
                new AMQPMessage($message),
252 2
                '',
253 2
                $this->attributes['name'],
254 2
                true
255
            );
256 2
    }
257
258
    /**
259
     * {@inheritdoc}
260
     *
261
     * @param int $messages
262
     * @param int $seconds
263
     * @param int $maxMemory
264
     * @return int
265
     */
266
    public function startConsuming(int $messages, int $seconds, int $maxMemory)
267
    {
268
        $this->setupConsumer($messages, $seconds, $maxMemory);
269
        while (false === $this->shouldStopConsuming()) {
270
            try {
271
                $this->getChannel()->wait(null, false, 1);
272
            } catch (AMQPTimeoutException $e) {
273
                usleep(1000);
274
                $this->getConnection()->reconnect();
275
                $this->setupChannelConsumer();
276
            } catch (\Throwable $e) {
277
                // stop the consumer
278
                $this->stopConsuming();
279
                $this->logger->notice(sprintf(
280
                    "Stopped consuming: %s in %s:%d",
281
                    get_class($e) . ' - ' . $e->getMessage(),
282
                    (string)$e->getFile(),
283
                    (int)$e->getLine()
284
                ));
285
                return 1;
286
            }
287
        }
288
        return 0;
289
    }
290
291
    /**
292
     * @return bool
293
     */
294
    protected function shouldStopConsuming(): bool
295
    {
296
        if ((microtime(true) - $this->startTime) > $this->limitSecondsUptime) {
297
            $this->logger->debug(
298
                "Stopped consumer",
299
                [
300
                    'limit' => 'time_limit',
301
                    'value' => sprintf("%.2f", microtime(true) - $this->startTime)
302
                ]
303
            );
304
            return true;
305
        }
306
        if (memory_get_peak_usage(true) >= ($this->limitMemoryConsumption * 1048576)) {
307
            $this->logger->debug(
308
                "Stopped consumer",
309
                [
310
                    'limit' => 'memory_limit',
311
                    'value' => (int)round(memory_get_peak_usage(true) / 1048576, 2)
312
                ]
313
            );
314
            return true;
315
        }
316
317
        if ($this->getMessageProcessor()->getProcessedMessages() >= $this->limitMessageCount) {
318
            $this->logger->debug(
319
                "Stopped consumer",
320
                ['limit' => 'message_count', 'value' => (int)$this->getMessageProcessor()->getProcessedMessages()]
321
            );
322
            return true;
323
        }
324
        return false;
325
    }
326
327
    /**
328
     * Stop the consumer
329
     */
330
    public function stopConsuming()
331
    {
332
        try {
333
            $this->getChannel()->basic_cancel($this->getConsumerTag(), false, true);
334
        } catch (\Throwable $e) {
335
            $this->logger->notice("Got " . $e->getMessage() . " of type " . get_class($e));
336
        }
337
    }
338
339
    /**
340
     * Setup the consumer
341
     *
342
     * @param int $messages
343
     * @param int $seconds
344
     * @param int $maxMemory
345
     */
346
    protected function setupConsumer(int $messages, int $seconds, int $maxMemory)
347
    {
348
        $this->limitMessageCount = $messages;
349
        $this->limitSecondsUptime = $seconds;
350
        $this->limitMemoryConsumption = $maxMemory;
351
352
        $this->startTime = microtime(true);
353
354
        $this->setupChannelConsumer();
355
356
        $this->registerShutdownHandler();
357
        $this->handleKillSignals();
358
    }
359
360
    private function setupChannelConsumer()
361
    {
362
        if ($this->attributes['auto_create'] === true) {
363
            $this->create();
364
            $this->bind();
365
        }
366
367
        $this->getChannel()
368
            ->basic_qos(null, $this->prefetchCount, true);
369
370
        $this->getChannel()
371
            ->basic_consume(
372
                $this->attributes['name'],
373
                $this->getConsumerTag(),
374
                false,
375
                false,
376
                false,
377
                false,
378
                [
379
                    $this,
380
                    'consume'
381
                ]
382
            );
383
    }
384
385
    /**
386
     * Handle shutdown - Usually in case "Allowed memory size of x bytes exhausted"
387
     */
388
    private function registerShutdownHandler()
389
    {
390
        $consumer = $this;
391
        register_shutdown_function(function () use ($consumer) {
392
            $consumer->stopConsuming();
393
        });
394
    }
395
396
    /**
397
     * Register signals
398
     */
399
    protected function handleKillSignals()
400
    {
401
        if (extension_loaded('pcntl')) {
402
            pcntl_signal(SIGTERM, [$this, 'catchKillSignal']);
403
            pcntl_signal(SIGINT, [$this, 'catchKillSignal']);
404
405
            if (function_exists('pcntl_signal_dispatch')) {
406
                // let the signal go forward
407
                pcntl_signal_dispatch();
408
            }
409
        }
410
    }
411
412
    /**
413
     * Handle Kill Signals
414
     * @param int $signalNumber
415
     */
416
    public function catchKillSignal(int $signalNumber)
417
    {
418
        $this->stopConsuming();
419
        $this->logger->debug(sprintf("Caught signal %d", $signalNumber));
420
    }
421
422
    /**
423
     * It is the tag that is listed in RabbitMQ UI as the consumer "name"
424
     *
425
     * @return string
426
     */
427
    private function getConsumerTag(): string
428
    {
429
        return sprintf("%s_%s_%s", $this->aliasName, gethostname(), getmypid());
430
    }
431
432
    /**
433
     * @return MessageProcessorInterface
434
     */
435 1
    private function getMessageProcessor(): MessageProcessorInterface
436
    {
437 1
        if (!($this->messageProcessor instanceof MessageProcessorInterface)) {
438
            $this->messageProcessor = app($this->messageProcessor);
439
            if ($this->messageProcessor instanceof AbstractMessageProcessor) {
440
                $this->messageProcessor->setLogger($this->logger);
441
            }
442
        }
443 1
        return $this->messageProcessor;
444
    }
445
446
    /**
447
     * @param AMQPMessage $message
448
     * @throws \Throwable
449
     */
450 1
    public function consume(AMQPMessage $message)
451
    {
452
        try {
453 1
            $this->getMessageProcessor()->consume($message);
454 1
            $this->logger->debug("Consumed message", [$message->getBody()]);
455
        } catch (\Throwable $e) {
456
            $this->logger->notice(
457
                sprintf(
458
                    "Got %s from %s in %d",
459
                    $e->getMessage(),
460
                    (string)$e->getFile(),
461
                    (int)$e->getLine()
462
                )
463
            );
464
            // let the exception slide, the processor should handle
465
            // exception, this is just a notice that should not
466
            // ever appear
467
            throw $e;
468
        }
469 1
    }
470
}
471