Completed
Push — master ( f58be8...855521 )
by Tilita
03:07
created

QueueEntity::consume()   A

Complexity

Conditions 2
Paths 3

Size

Total Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 3.0309

Importance

Changes 0
Metric Value
dl 0
loc 20
ccs 4
cts 11
cp 0.3636
rs 9.6
c 0
b 0
f 0
cc 2
nc 3
nop 1
crap 3.0309
1
<?php
2
namespace NeedleProject\LaravelRabbitMq\Entity;
3
4
use NeedleProject\LaravelRabbitMq\AMQPConnection;
5
use NeedleProject\LaravelRabbitMq\ConsumerInterface;
6
use NeedleProject\LaravelRabbitMq\Processor\AbstractMessageProcessor;
7
use NeedleProject\LaravelRabbitMq\Processor\MessageProcessorInterface;
8
use NeedleProject\LaravelRabbitMq\PublisherInterface;
9
use PhpAmqpLib\Channel\AMQPChannel;
10
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
11
use PhpAmqpLib\Exception\AMQPTimeoutException;
12
use PhpAmqpLib\Message\AMQPMessage;
13
use Psr\Log\LoggerAwareInterface;
14
use Psr\Log\LoggerAwareTrait;
15
16
/**
17
 * Class QueueEntity
18
 *
19
 * @package NeedleProject\LaravelRabbitMq\Entity
20
 * @author  Adrian Tilita <[email protected]>
21
 */
22
class QueueEntity implements PublisherInterface, ConsumerInterface, AMQPEntityInterface, LoggerAwareInterface
23
{
24
    use LoggerAwareTrait;
25
26
    /**
27
     * @const array Default connections parameters
28
     */
29
    const DEFAULTS = [
30
        'passive'                      => false,
31
        'durable'                      => false,
32
        'exclusive'                    => false,
33
        'auto_delete'                  => false,
34
        'internal'                     => false,
35
        'nowait'                       => false,
36
        'auto_create'                  => false,
37
        'throw_exception_on_redeclare' => true,
38
        'throw_exception_on_bind_fail' => true,
39
    ];
40
41
    /**
42
     * @var AMQPConnection
43
     */
44
    protected $connection;
45
46
    /**
47
     * @var string
48
     */
49
    protected $aliasName;
50
51
    /**
52
     * @var array
53
     */
54
    protected $attributes;
55
56
    /**
57
     * @var int
58
     */
59
    protected $prefetchCount = 1;
60
61
    /**
62
     * @var null|string|MessageProcessorInterface
63
     */
64
    protected $messageProcessor = null;
65
66
    /**
67
     * @var int
68
     */
69
    protected $limitMessageCount;
70
71
    /**
72
     * @var int
73
     */
74
    protected $limitSecondsUptime;
75
76
    /**
77
     * @var int
78
     */
79
    protected $limitMemoryConsumption;
80
81
    /**
82
     * @var double
83
     */
84
    protected $startTime = 0;
85
86
    /**
87
     * @param AMQPConnection $connection
88
     * @param string $aliasName
89
     * @param array $exchangeDetails
90
     * @return QueueEntity
91
     */
92 17
    public static function createQueue(AMQPConnection $connection, string $aliasName, array $exchangeDetails)
93
    {
94 17
        return new static(
95
            $connection,
96
            $aliasName,
97 17
            array_merge(self::DEFAULTS, $exchangeDetails)
98
        );
99
    }
100
101
    /**
102
     * @return string
103
     */
104 3
    public function getAliasName(): string
105
    {
106 3
        return $this->aliasName;
107
    }
108
109
    /**
110
     * ExchangeEntity constructor.
111
     *
112
     * @param AMQPConnection $connection
113
     * @param string $aliasName
114
     * @param array $attributes
115
     */
116 17
    public function __construct(AMQPConnection $connection, string $aliasName, array $attributes = [])
117
    {
118 17
        $this->connection = $connection;
119 17
        $this->aliasName  = $aliasName;
120 17
        $this->attributes = $attributes;
121 17
    }
122
123
    /**
124
     * @param int $prefetchCount
125
     * @return ConsumerInterface
126
     */
127 2
    public function setPrefetchCount(int $prefetchCount): ConsumerInterface
128
    {
129 2
        $this->prefetchCount = $prefetchCount;
130 2
        return $this;
131
    }
132
133
    /**
134
     * @param string $messageProcessor
135
     * @return ConsumerInterface
136
     */
137 2
    public function setMessageProcessor(string $messageProcessor): ConsumerInterface
138
    {
139 2
        $this->messageProcessor = $messageProcessor;
140 2
        return $this;
141
    }
142
143
    /**
144
     * @return AMQPConnection
145
     */
146 8
    protected function getConnection(): AMQPConnection
147
    {
148 8
        return $this->connection;
149
    }
150
151
    /**
152
     * @return AMQPChannel
153
     */
154 8
    protected function getChannel(): AMQPChannel
155
    {
156 8
        return $this->getConnection()->getChannel();
157
    }
158
159
    /**
160
     * Create the Queue
161
     */
162 4 View Code Duplication
    public function create()
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
163
    {
164
        try {
165 4
            $this->getChannel()
166 4
                ->queue_declare(
167 4
                    $this->attributes['name'],
168 4
                    $this->attributes['passive'],
169 4
                    $this->attributes['durable'],
170 4
                    $this->attributes['exclusive'],
171 4
                    $this->attributes['auto_delete'],
172 4
                    $this->attributes['internal'],
173 4
                    $this->attributes['nowait']
174
                );
175 2
        } catch (AMQPProtocolChannelException $e) {
176
            // 406 is a soft error triggered for precondition failure (when redeclaring with different parameters)
177 2
            if (true === $this->attributes['throw_exception_on_redeclare'] || $e->amqp_reply_code !== 406) {
0 ignored issues
show
Unused Code Bug introduced by
The strict comparison !== seems to always evaluate to true as the types of $e->amqp_reply_code (string) and 406 (integer) can never be identical. Maybe you want to use a loose comparison != instead?
Loading history...
178 1
                throw $e;
179
            }
180
            // a failure trigger channels closing process
181 1
            $this->getConnection()->reconnect();
182
        }
183 3
    }
184
185 4 View Code Duplication
    public function bind()
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
186
    {
187 4
        if (!isset($this->attributes['bind']) || empty($this->attributes['bind'])) {
188 1
            return;
189
        }
190 3
        foreach ($this->attributes['bind'] as $bindItem) {
191
            try {
192 3
                $this->getChannel()
193 3
                    ->queue_bind(
194 3
                        $this->attributes['name'],
195 3
                        $bindItem['exchange'],
196 3
                        $bindItem['routing_key']
197
                    );
198 1
            } catch (AMQPProtocolChannelException $e) {
199
                // 404 is the code for trying to bind to an non-existing entity
200 1
                if (true === $this->attributes['throw_exception_on_bind_fail'] || $e->amqp_reply_code !== 404) {
0 ignored issues
show
Unused Code Bug introduced by
The strict comparison !== seems to always evaluate to true as the types of $e->amqp_reply_code (string) and 404 (integer) can never be identical. Maybe you want to use a loose comparison != instead?
Loading history...
201 1
                    throw $e;
202
                }
203 2
                $this->getConnection()->reconnect();
204
            }
205
        }
206 2
    }
207
208
    /**
209
     * Delete the queue
210
     */
211 1
    public function delete()
212
    {
213 1
        $this->getChannel()->queue_delete($this->attributes['name']);
214 1
    }
215
216
    /**
217
     * Publish a message
218
     *
219
     * @param string $message
220
     * @param string $routingKey
221
     * @return mixed|void
222
     * @throws AMQPProtocolChannelException
223
     */
224 2 View Code Duplication
    public function publish(string $message, string $routingKey = '')
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
225
    {
226 2
        if ($this->attributes['auto_create'] === true) {
227 1
            $this->create();
228 1
            $this->bind();
229
        }
230 2
        $this->getChannel()
231 2
            ->basic_publish(
232 2
                new AMQPMessage($message),
233 2
                '',
234 2
                $this->attributes['name'],
235 2
                true
236
            );
237 2
    }
238
239
    /**
240
     * {@inheritdoc}
241
     *
242
     * @param int $messages
243
     * @param int $seconds
244
     * @param int $maxMemory
245
     * @return int
246
     */
247
    public function startConsuming(int $messages, int $seconds, int $maxMemory)
248
    {
249
        $this->setupConsumer($messages, $seconds, $maxMemory);
250
        while (false === $this->shouldStopConsuming()) {
251
            try {
252
                $this->getChannel()->wait(null, false, 1);
253
            } catch (AMQPTimeoutException $e) {
254
                usleep(1000);
255
                $this->getConnection()->reconnect();
256
                $this->setupChannelConsumer();
257
            } catch (\Throwable $e) {
258
                // stop the consumer
259
                $this->stopConsuming();
260
                $this->logger->notice(sprintf(
261
                    "Stopped consuming: %s in %s:%d",
262
                    get_class($e) . ' - ' . $e->getMessage(),
263
                    (string)$e->getFile(),
264
                    (int)$e->getLine()
265
                ));
266
                return 1;
267
            }
268
        }
269
        return 0;
270
    }
271
272
    /**
273
     * @return bool
274
     */
275
    protected function shouldStopConsuming(): bool
276
    {
277
        if ((microtime(true) - $this->startTime) > $this->limitSecondsUptime) {
278
            $this->logger->debug(
279
                "Stopped consumer",
280
                [
281
                    'limit' => 'time_limit',
282
                    'value' => sprintf("%.2f", microtime(true) - $this->startTime)
283
                ]
284
            );
285
            return true;
286
        }
287
        if (memory_get_peak_usage(true) >= ($this->limitMemoryConsumption * 1048576)) {
288
            $this->logger->debug(
289
                "Stopped consumer",
290
                [
291
                    'limit' => 'memory_limit',
292
                    'value' => (int)round(memory_get_peak_usage(true) / 1048576, 2)
293
                ]
294
            );
295
            return true;
296
        }
297
298
        if ($this->getMessageProcessor()->getProcessedMessages() >= $this->limitMessageCount) {
299
            $this->logger->debug(
300
                "Stopped consumer",
301
                ['limit' => 'message_count', 'value' => (int)$this->getMessageProcessor()->getProcessedMessages()]
302
            );
303
            return true;
304
        }
305
        return false;
306
    }
307
308
    /**
309
     * Stop the consumer
310
     */
311
    public function stopConsuming()
312
    {
313
        try {
314
            $this->getChannel()->basic_cancel($this->getConsumerTag(), false, true);
315
        } catch (\Throwable $e) {
316
            $this->logger->notice("Got " . $e->getMessage() . " of type " . get_class($e));
317
        }
318
    }
319
320
    /**
321
     * Setup the consumer
322
     *
323
     * @param int $messages
324
     * @param int $seconds
325
     * @param int $maxMemory
326
     */
327
    protected function setupConsumer(int $messages, int $seconds, int $maxMemory)
328
    {
329
        $this->limitMessageCount = $messages;
330
        $this->limitSecondsUptime = $seconds;
331
        $this->limitMemoryConsumption = $maxMemory;
332
333
        $this->startTime = microtime(true);
334
335
        $this->setupChannelConsumer();
336
337
        $this->registerShutdownHandler();
338
        $this->handleKillSignals();
339
    }
340
341
    private function setupChannelConsumer()
342
    {
343
        if ($this->attributes['auto_create'] === true) {
344
            $this->create();
345
            $this->bind();
346
        }
347
348
        $this->getChannel()
349
            ->basic_qos(null, $this->prefetchCount, true);
350
351
        $this->getChannel()
352
            ->basic_consume(
353
                $this->attributes['name'],
354
                $this->getConsumerTag(),
355
                false,
356
                false,
357
                false,
358
                false,
359
                [
360
                    $this,
361
                    'consume'
362
                ]
363
            );
364
    }
365
366
    /**
367
     * Handle shutdown - Usually in case "Allowed memory size of x bytes exhausted"
368
     */
369
    private function registerShutdownHandler()
370
    {
371
        $consumer = $this;
372
        register_shutdown_function(function () use ($consumer) {
373
            $consumer->stopConsuming();
374
        });
375
    }
376
377
    /**
378
     * Register signals
379
     */
380
    protected function handleKillSignals()
381
    {
382
        if (extension_loaded('pcntl')) {
383
            pcntl_signal(SIGTERM, [$this, 'catchKillSignal']);
384
            pcntl_signal(SIGINT, [$this, 'catchKillSignal']);
385
386
            if (function_exists('pcntl_signal_dispatch')) {
387
                // let the signal go forward
388
                pcntl_signal_dispatch();
389
            }
390
        }
391
    }
392
393
    /**
394
     * Handle Kill Signals
395
     * @param int $signalNumber
396
     */
397
    public function catchKillSignal(int $signalNumber)
398
    {
399
        $this->stopConsuming();
400
        $this->logger->debug(sprintf("Caught signal %d", $signalNumber));
401
    }
402
403
    /**
404
     * It is the tag that is listed in RabbitMQ UI as the consumer "name"
405
     *
406
     * @return string
407
     */
408
    private function getConsumerTag(): string
409
    {
410
        return sprintf("%s_%s_%s", $this->aliasName, gethostname(), getmypid());
411
    }
412
413
    /**
414
     * @return MessageProcessorInterface
415
     */
416 1
    private function getMessageProcessor(): MessageProcessorInterface
417
    {
418 1
        if (!($this->messageProcessor instanceof MessageProcessorInterface)) {
419
            $this->messageProcessor = app($this->messageProcessor);
420
            if ($this->messageProcessor instanceof AbstractMessageProcessor) {
421
                $this->messageProcessor->setLogger($this->logger);
422
            }
423
        }
424 1
        return $this->messageProcessor;
425
    }
426
427
    /**
428
     * @param AMQPMessage $message
429
     * @throws \Throwable
430
     */
431 1
    public function consume(AMQPMessage $message)
432
    {
433
        try {
434 1
            $this->getMessageProcessor()->consume($message);
435 1
            $this->logger->debug("Consumed message", [$message->getBody()]);
436
        } catch (\Throwable $e) {
437
            $this->logger->notice(
438
                sprintf(
439
                    "Got %s from %s in %d",
440
                    $e->getMessage(),
441
                    (string)$e->getFile(),
442
                    (int)$e->getLine()
443
                )
444
            );
445
            // let the exception slide, the processor should handle
446
            // exception, this is just a notice that should not
447
            // ever appear
448
            throw $e;
449
        }
450 1
    }
451
}
452