Completed
Push — master ( debbc0...855ad3 )
by Tilita
02:28
created

QueueEntity::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 6
ccs 5
cts 5
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 3
crap 1
1
<?php
2
namespace NeedleProject\LaravelRabbitMq\Entity;
3
4
use NeedleProject\LaravelRabbitMq\AMQPConnection;
5
use NeedleProject\LaravelRabbitMq\ConsumerInterface;
6
use NeedleProject\LaravelRabbitMq\Processor\AbstractMessageProcessor;
7
use NeedleProject\LaravelRabbitMq\Processor\MessageProcessorInterface;
8
use NeedleProject\LaravelRabbitMq\PublisherInterface;
9
use PhpAmqpLib\Channel\AMQPChannel;
10
use PhpAmqpLib\Exception\AMQPTimeoutException;
11
use PhpAmqpLib\Message\AMQPMessage;
12
use Psr\Log\LoggerAwareInterface;
13
use Psr\Log\LoggerAwareTrait;
14
15
/**
16
 * Class QueueEntity
17
 *
18
 * @package NeedleProject\LaravelRabbitMq\Entity
19
 * @author  Adrian Tilita <[email protected]>
20
 */
21
class QueueEntity implements PublisherInterface, ConsumerInterface, LoggerAwareInterface
22
{
23
    use LoggerAwareTrait;
24
25
    /**
26
     * @const array Default queue declaration parameters
27
     */
28
    const DEFAULTS = [
29
        'passive'   => false,
30
        'durable'   => false,
31
        'exclusive' => false,
32
        'auto_delete' => false,
33
        'internal'  => false,
34
        'nowait'    => false,
35
    ];
36
37
    /**
38
     * @var AMQPConnection
39
     */
40
    protected $connection;
41
42
    /**
43
     * @var string
44
     */
45
    protected $aliasName;
46
47
    /**
48
     * @var array
49
     */
50
    protected $attributes;
51
52
    /**
53
     * @var int
54
     */
55
    protected $prefetchCount = 1;
56
57
    /**
58
     * @var null|string|MessageProcessorInterface
59
     */
60
    protected $messageProcessor = null;
61
62
    /**
63
     * @var int
64
     */
65
    protected $limitMessageCount;
66
67
    /**
68
     * @var int
69
     */
70
    protected $limitSecondsUptime;
71
72
    /**
73
     * @var int
74
     */
75
    protected $limitMemoryConsumption;
76
77
    /**
78
     * @var double
79
     */
80
    protected $startTime = 0;
81
82
    /**
     * Named constructor: build a queue entity from user supplied details.
     *
     * The supplied details are layered on top of self::DEFAULTS, so every key
     * present in $exchangeDetails overrides the matching default value.
     *
     * @param AMQPConnection $connection      Connection the queue operates on
     * @param string         $aliasName       Alias used to identify the queue
     * @param array          $exchangeDetails Queue attributes overriding the defaults
     * @return QueueEntity
     */
    public static function createQueue(AMQPConnection $connection, string $aliasName, array $exchangeDetails)
    {
        $attributes = array_merge(self::DEFAULTS, $exchangeDetails);

        return new static($connection, $aliasName, $attributes);
    }
96
97
    /**
     * Get the alias this queue entity was registered under.
     *
     * @return string
     */
    public function getAliasName(): string
    {
        return $this->aliasName;
    }
104
105
    /**
     * QueueEntity constructor.
     *
     * Stores the collaborators as-is; the attributes are NOT merged with
     * self::DEFAULTS here (createQueue() does that).
     *
     * @param AMQPConnection $connection Connection the queue operates on
     * @param string         $aliasName  Alias used to identify the queue
     * @param array          $attributes Queue attributes (name, flags, bindings)
     */
    public function __construct(AMQPConnection $connection, string $aliasName, array $attributes = [])
    {
        $this->attributes = $attributes;
        $this->aliasName  = $aliasName;
        $this->connection = $connection;
    }
118
119
    /**
     * Set the prefetch count handed to basic_qos() when the consumer is set up.
     *
     * @param int $prefetchCount
     * @return ConsumerInterface The same instance, for fluent chaining
     */
    public function setPrefetchCount(int $prefetchCount): ConsumerInterface
    {
        $this->prefetchCount = $prefetchCount;

        return $this;
    }
128
129
    /**
     * Set the message processor identifier.
     *
     * The value is kept as a string and only resolved to an instance through
     * app() the first time the processor is needed.
     *
     * @param string $messageProcessor Identifier later passed to app()
     * @return ConsumerInterface The same instance, for fluent chaining
     */
    public function setMessageProcessor(string $messageProcessor): ConsumerInterface
    {
        $this->messageProcessor = $messageProcessor;

        return $this;
    }
138
139
    /**
     * Get the connection wrapper this queue was created with.
     *
     * @return AMQPConnection
     */
    protected function getConnection(): AMQPConnection
    {
        return $this->connection;
    }
146
147
    /**
     * Get the channel exposed by the underlying connection.
     *
     * @return AMQPChannel
     */
    protected function getChannel(): AMQPChannel
    {
        $connection = $this->getConnection();

        return $connection->getChannel();
    }
154
155
    /**
     * Create (declare) the queue on the broker.
     *
     * Parameters are forwarded in the positional order expected by
     * AMQPChannel::queue_declare(): name, passive, durable, exclusive,
     * auto_delete, nowait, arguments. Queues have no "internal" flag (that one
     * belongs to exchange_declare()), so the "internal" attribute is not
     * forwarded; previously it was passed in the "nowait" position and
     * "nowait" ended up in the "arguments" position.
     */
    public function create()
    {
        $this->getChannel()
            ->queue_declare(
                $this->attributes['name'],
                $this->attributes['passive'],
                $this->attributes['durable'],
                $this->attributes['exclusive'],
                $this->attributes['auto_delete'],
                $this->attributes['nowait'],
                // Optional declaration arguments; not part of DEFAULTS, hence the fallback
                $this->attributes['arguments'] ?? []
            );
    }
171
172 1 View Code Duplication
    /**
     * Bind the queue to every exchange listed under the "bind" attribute.
     *
     * Each bind item is expected to provide the "exchange" and "routing_key"
     * keys. Nothing happens when no "bind" attribute is set.
     */
    public function bind()
    {
        if (!isset($this->attributes['bind'])) {
            return;
        }

        foreach ($this->attributes['bind'] as $binding) {
            $channel = $this->getChannel();
            $channel->queue_bind(
                $this->attributes['name'],
                $binding['exchange'],
                $binding['routing_key']
            );
        }
    }
185
186
    /**
     * Delete the queue, identified by its "name" attribute, from the broker.
     */
    public function delete()
    {
        $channel = $this->getChannel();
        $channel->queue_delete($this->attributes['name']);
    }
193
194
195
    /**
     * Publish a message straight to this queue.
     *
     * The message goes to the exchange named '' with the queue name as the
     * routing key and the "mandatory" flag set to true.
     *
     * @param string $message    Raw message body
     * @param string $routingKey Not used by this implementation
     * @return void
     */
    public function publish(string $message, string $routingKey = '')
    {
        $channel = $this->getChannel();
        $payload = new AMQPMessage($message);

        $channel->basic_publish($payload, '', $this->attributes['name'], true);
    }
212
213
    /**
     * Consume messages until one of the given limits is reached.
     *
     * Every wait on the channel uses a timeout of 1; when it expires the
     * connection is re-established and the consumer is registered again.
     * Any other throwable stops the consumer and ends the loop.
     *
     * @param int $messages  Maximum number of messages to process
     * @param int $seconds   Maximum uptime, in seconds
     * @param int $maxMemory Maximum peak memory, in megabytes
     * @return int 0 when a limit was reached, 1 when stopped by an unexpected error
     */
    public function startConsuming(int $messages, int $seconds, int $maxMemory)
    {
        $this->setupConsumer($messages, $seconds, $maxMemory);

        while (!$this->shouldStopConsuming()) {
            try {
                $this->getChannel()->wait(null, false, 1);
            } catch (AMQPTimeoutException $timeout) {
                $this->logger->debug("Timeout exceeded, reconnecting!");
                usleep(1000);
                $this->getConnection()->reconnect();
                $this->setupChannelConsumer();
            } catch (\Throwable $failure) {
                // Anything unexpected: cancel the consumer first, then report why
                $this->stopConsuming();

                $reason = get_class($failure) . ' - ' . $failure->getMessage();
                $this->logger->notice(sprintf(
                    "Stopped consuming: %s in %s:%d",
                    $reason,
                    (string)$failure->getFile(),
                    (int)$failure->getLine()
                ));

                return 1;
            }
        }

        return 0;
    }
246
247
    /**
     * Tell whether any of the configured limits (uptime, peak memory,
     * processed message count) has been reached. The limit that triggered the
     * stop is logged at debug level.
     *
     * @return bool True when the consumer loop must end
     */
    protected function shouldStopConsuming(): bool
    {
        $uptimeExceeded = (microtime(true) - $this->startTime) > $this->limitSecondsUptime;
        if ($uptimeExceeded) {
            $this->logger->debug("Stopped consumer", [
                'limit' => 'time_limit',
                'value' => sprintf("%.2f", microtime(true) - $this->startTime),
            ]);
            return true;
        }

        // The memory limit is expressed in megabytes, the peak usage in bytes
        $memoryCeiling = $this->limitMemoryConsumption * 1048576;
        if (memory_get_peak_usage(true) >= $memoryCeiling) {
            $this->logger->debug("Stopped consumer", [
                'limit' => 'memory_limit',
                'value' => (int)round(memory_get_peak_usage(true) / 1048576, 2),
            ]);
            return true;
        }

        $processor = $this->getMessageProcessor();
        if ($processor->getProcessedMessages() >= $this->limitMessageCount) {
            $this->logger->debug("Stopped consumer", [
                'limit' => 'message_count',
                'value' => (int)$processor->getProcessedMessages(),
            ]);
            return true;
        }

        return false;
    }
282
283
    /**
     * Stop the consumer by cancelling its consumer tag on the channel.
     *
     * Best effort: a failure while cancelling is logged as a notice and not
     * propagated to the caller.
     */
    public function stopConsuming()
    {
        $this->logger->debug("Stopping consumer!");

        try {
            $channel = $this->getChannel();
            $channel->basic_cancel($this->getConsumerTag(), false, true);
        } catch (\Throwable $failure) {
            $this->logger->notice("Got " . $failure->getMessage() . " of type " . get_class($failure));
        }
    }
295
296
    /**
     * Prepare the consumer: remember the limits, record the start time,
     * register the channel consumer and install the shutdown/signal handlers.
     *
     * @param int $messages  Maximum number of messages to process
     * @param int $seconds   Maximum uptime, in seconds
     * @param int $maxMemory Maximum peak memory, in megabytes
     */
    protected function setupConsumer(int $messages, int $seconds, int $maxMemory)
    {
        $this->startTime = microtime(true);

        $this->limitMemoryConsumption = $maxMemory;
        $this->limitSecondsUptime     = $seconds;
        $this->limitMessageCount      = $messages;

        $this->setupChannelConsumer();

        $this->registerShutdownHandler();
        $this->handleKillSignals();
    }
316
317
    /**
     * Apply the prefetch settings on the channel and register this entity's
     * consume() method as the callback for the queue.
     */
    private function setupChannelConsumer()
    {
        $this->getChannel()->basic_qos(null, $this->prefetchCount, true);

        // Every message delivered for the queue is handed to $this->consume()
        $onMessage = [$this, 'consume'];

        $this->getChannel()->basic_consume(
            $this->attributes['name'],
            $this->getConsumerTag(),
            false,
            false,
            false,
            false,
            $onMessage
        );
    }
336
337
    /**
     * Make sure the consumer gets cancelled when the script shuts down -
     * usually in case of "Allowed memory size of x bytes exhausted".
     */
    private function registerShutdownHandler()
    {
        register_shutdown_function([$this, 'stopConsuming']);
    }
347
348
    /**
     * Register the termination signal handler.
     *
     * Only SIGTERM is registered: SIGKILL and SIGSTOP can never be caught, so
     * installing a handler for them fails with a warning and has no effect.
     *
     * NOTE(review): a registered handler only runs when signals are dispatched
     * (ticks, pcntl_async_signals() or pcntl_signal_dispatch()). This method
     * dispatches once, right after registration, so signals arriving later
     * depend on the host application enabling one of those mechanisms.
     */
    private function handleKillSignals()
    {
        if (!extension_loaded('pcntl')) {
            return;
        }

        // A closure keeps the private handler callable whatever scope is
        // executing when the signal gets dispatched
        $handler = function (int $signalNumber) {
            $this->catchKillSignal($signalNumber);
        };

        pcntl_signal(SIGTERM, $handler);

        if (function_exists('pcntl_signal_dispatch')) {
            // let the signal go forward
            pcntl_signal_dispatch();
        }
    }
364
365
    /**
     * Signal handler: stop the consumer, then log which signal was received.
     *
     * @param int $signalNumber Number of the signal that was caught
     */
    private function catchKillSignal(int $signalNumber)
    {
        $this->stopConsuming();

        $notice = sprintf("Caught signal %d", $signalNumber);
        $this->logger->debug($notice);
    }
374
375
    /**
     * Build the consumer tag - the value listed in the RabbitMQ UI as the
     * consumer "name". It is made of the alias, the host name and the
     * process id, joined by underscores.
     *
     * @return string
     */
    private function getConsumerTag(): string
    {
        $host = gethostname();
        $pid  = getmypid();

        return sprintf("%s_%s_%s", $this->aliasName, $host, $pid);
    }
384
385
    /**
     * Resolve (once) and return the message processor.
     *
     * The processor is configured as a string identifier and resolved through
     * app() on first use; the resolved instance is cached on the entity.
     * Processors extending AbstractMessageProcessor also receive this
     * entity's logger.
     *
     * @return MessageProcessorInterface
     * @throws \RuntimeException When no processor was configured, or when the
     *                           resolved value is not a MessageProcessorInterface
     */
    private function getMessageProcessor(): MessageProcessorInterface
    {
        if ($this->messageProcessor instanceof MessageProcessorInterface) {
            return $this->messageProcessor;
        }

        // app(null) would hand back the container itself, which then fails the
        // return type with an unrelated looking TypeError - fail clearly instead
        if (null === $this->messageProcessor) {
            throw new \RuntimeException(
                sprintf('No message processor was set for queue "%s"', $this->aliasName)
            );
        }

        $processor = app($this->messageProcessor);
        if (!($processor instanceof MessageProcessorInterface)) {
            throw new \RuntimeException(
                sprintf(
                    'The message processor of queue "%s" must implement %s',
                    $this->aliasName,
                    MessageProcessorInterface::class
                )
            );
        }

        // Cache only a valid processor, so a failed resolution leaves the
        // configured identifier untouched
        $this->messageProcessor = $processor;
        if ($processor instanceof AbstractMessageProcessor) {
            $processor->setLogger($this->logger);
        }

        return $processor;
    }
398
399
    /**
     * Channel callback: hand the received message over to the message
     * processor and log its body once it has been consumed.
     *
     * @param AMQPMessage $message Message delivered by the broker
     * @throws \Throwable Whatever the processor throws, after it was logged
     */
    public function consume(AMQPMessage $message)
    {
        try {
            $processor = $this->getMessageProcessor();
            $processor->consume($message);
            $this->logger->debug("Consumed message", [$message->getBody()]);
        } catch (\Throwable $failure) {
            $notice = sprintf(
                "Got %s from %s in %d",
                $failure->getMessage(),
                (string)$failure->getFile(),
                (int)$failure->getLine()
            );
            $this->logger->notice($notice);

            // The processor is expected to deal with its own exceptions; this
            // notice should never show up, so the exception is passed on as is
            throw $failure;
        }
    }
423
}
424