Passed
Push — master ( f63365...217445 )
by Tilita
01:10
created

QueueEntity::setupChannelConsumer()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 19
ccs 0
cts 14
cp 0
rs 9.6333
c 0
b 0
f 0
cc 1
nc 1
nop 0
crap 2
1
<?php
2
namespace NeedleProject\LaravelRabbitMq\Entity;
3
4
use NeedleProject\LaravelRabbitMq\AMQPConnection;
5
use NeedleProject\LaravelRabbitMq\ConsumerInterface;
6
use NeedleProject\LaravelRabbitMq\Processor\MessageProcessorInterface;
7
use NeedleProject\LaravelRabbitMq\PublisherInterface;
8
use PhpAmqpLib\Channel\AMQPChannel;
9
use PhpAmqpLib\Exception\AMQPTimeoutException;
10
use PhpAmqpLib\Message\AMQPMessage;
11
use Psr\Log\LoggerAwareInterface;
12
use Psr\Log\LoggerAwareTrait;
13
14
/**
15
 * Class QueueEntity
16
 *
17
 * @package NeedleProject\LaravelRabbitMq\Entity
18
 * @author  Adrian Tilita <[email protected]>
19
 */
20
class QueueEntity implements PublisherInterface, ConsumerInterface, LoggerAwareInterface
21
{
22
    use LoggerAwareTrait;
23
24
    /**
     * Default queue attributes.
     *
     * createQueue() merges the caller-supplied attributes over these values,
     * so any key given by the caller replaces the default listed here.
     *
     * @const array
     */
    const DEFAULTS = [
        'passive'     => false,
        'durable'     => false,
        'exclusive'   => false,
        'auto_delete' => false,
        'internal'    => false,
        'nowait'      => false,
    ];
35
36
    /**
     * Connection from which the AMQP channel is obtained.
     *
     * @var AMQPConnection
     */
    protected $connection;

    /**
     * Alias of this queue; also the first segment of the consumer tag.
     *
     * @var string
     */
    protected $aliasName;

    /**
     * Queue attributes (name, declaration flags, optional "bind" list).
     *
     * @var array
     */
    protected $attributes;

    /**
     * Prefetch count handed to basic_qos() when the consumer is set up.
     *
     * @var int
     */
    protected $prefetchCount = 1;

    /**
     * Processor name until first use, then the instance resolved through app().
     *
     * @var null|string|MessageProcessorInterface
     */
    protected $messageProcessor = null;

    /**
     * Consuming stops once the processor reports at least this many processed messages.
     *
     * @var int
     */
    protected $limitMessageCount;

    /**
     * Consuming stops once the uptime, in seconds, exceeds this value.
     *
     * @var int
     */
    protected $limitSecondsUptime;

    /**
     * Consuming stops once peak memory usage reaches this many megabytes.
     *
     * @var int
     */
    protected $limitMemoryConsumption;

    /**
     * microtime(true) value captured when the consumer was set up.
     *
     * @var double
     */
    protected $startTime = 0;
80
81
    /**
     * Build a queue entity whose attributes are the given values layered over self::DEFAULTS.
     *
     * @param AMQPConnection $connection      Connection handed to the new entity
     * @param string         $aliasName       Alias stored on the entity (see getAliasName())
     * @param array          $exchangeDetails Queue attributes; keys present here override the defaults
     * @return QueueEntity
     */
    public static function createQueue(AMQPConnection $connection, string $aliasName, array $exchangeDetails)
    {
        $attributes = array_merge(self::DEFAULTS, $exchangeDetails);

        return new static($connection, $aliasName, $attributes);
    }
95
96
    /**
     * Alias given to this queue entity at construction time.
     *
     * @return string
     */
    public function getAliasName(): string
    {
        return $this->aliasName;
    }
103
104
    /**
     * QueueEntity constructor.
     *
     * Stores the collaborators as given; no connection or channel work happens here.
     *
     * @param AMQPConnection $connection Connection used later to obtain the channel
     * @param string         $aliasName  Alias of the queue
     * @param array          $attributes Queue attributes, stored without merging any defaults
     */
    public function __construct(AMQPConnection $connection, string $aliasName, array $attributes = [])
    {
        $this->attributes = $attributes;
        $this->aliasName  = $aliasName;
        $this->connection = $connection;
    }
117
118
    /**
     * Set the prefetch count applied through basic_qos() when the consumer is set up.
     *
     * @param int $prefetchCount
     * @return ConsumerInterface This instance, for chaining
     */
    public function setPrefetchCount(int $prefetchCount): ConsumerInterface
    {
        $this->prefetchCount = $prefetchCount;

        return $this;
    }
127
128
    /**
     * Set the name of the message processor; it is resolved through app() on first use.
     *
     * @param string $messageProcessor
     * @return ConsumerInterface This instance, for chaining
     */
    public function setMessageProcessor(string $messageProcessor): ConsumerInterface
    {
        $this->messageProcessor = $messageProcessor;

        return $this;
    }
137
138
    /**
     * Connection this entity was constructed with.
     *
     * @return AMQPConnection
     */
    protected function getConnection(): AMQPConnection
    {
        return $this->connection;
    }
145
146
    /**
     * Channel provided by the underlying connection.
     *
     * @return AMQPChannel
     */
    protected function getChannel(): AMQPChannel
    {
        $connection = $this->getConnection();

        return $connection->getChannel();
    }
153
154
    /**
     * Declare the queue on the broker using the configured attributes.
     *
     * The "internal" attribute is not forwarded: queue_declare() has no such
     * parameter (it belongs to exchanges), and passing it shifted "nowait"
     * into the arguments position. An optional "arguments" attribute is
     * forwarded as the queue arguments table and defaults to an empty array.
     *
     * @return void
     */
    public function create()
    {
        $this->getChannel()
            ->queue_declare(
                $this->attributes['name'],
                $this->attributes['passive'],
                $this->attributes['durable'],
                $this->attributes['exclusive'],
                $this->attributes['auto_delete'],
                $this->attributes['nowait'],
                $this->attributes['arguments'] ?? []
            );
    }
170
171 1 View Code Duplication
    /**
     * Bind the queue to every exchange listed under the "bind" attribute.
     *
     * Each entry is read for its "exchange" and "routing_key" keys. Nothing
     * happens when the "bind" attribute is not set.
     */
    public function bind()
    {
        if (!isset($this->attributes['bind'])) {
            return;
        }

        foreach ($this->attributes['bind'] as $binding) {
            $channel = $this->getChannel();
            $channel->queue_bind(
                $this->attributes['name'],
                $binding['exchange'],
                $binding['routing_key']
            );
        }
    }
184
185
    /**
     * Remove the queue, identified by its "name" attribute, from the broker.
     */
    public function delete()
    {
        $channel = $this->getChannel();
        $channel->queue_delete($this->attributes['name']);
    }
192
193
194
    /**
     * Publish a message straight to this queue.
     *
     * The message is sent with an empty exchange name and the queue's own
     * name in the routing-key position; the last argument passed to
     * basic_publish() is `true` (presumably the AMQP "mandatory" flag).
     *
     * @param string $message    Message body
     * @param string $routingKey Accepted for interface compatibility; not used by this method
     * @return void
     */
    public function publish(string $message, string $routingKey = '')
    {
        $channel = $this->getChannel();
        $channel->basic_publish(
            new AMQPMessage($message),
            '',
            $this->attributes['name'],
            true
        );
    }
211
212
    /**
     * {@inheritdoc}
     *
     * Sets up the consumer and waits on the channel until one of the limits
     * checked by shouldStopConsuming() is reached. A wait timeout triggers a
     * reconnect and a fresh consumer registration. Any other error ends the
     * loop: the error is logged first, then the consumer is cancelled on a
     * best-effort basis, so a failing cancellation can neither hide the
     * original error nor bypass the failure return code.
     *
     * @param int $messages  Maximum number of processed messages
     * @param int $seconds   Maximum uptime in seconds
     * @param int $maxMemory Maximum peak memory usage in megabytes
     * @return int 0 when a limit was reached, 1 when consuming stopped because of an error
     */
    public function startConsuming(int $messages, int $seconds, int $maxMemory)
    {
        $this->setupConsumer($messages, $seconds, $maxMemory);
        while (false === $this->shouldStopConsuming()) {
            try {
                $this->getChannel()->wait(null, false, 1);
            } catch (AMQPTimeoutException $e) {
                $this->logger->debug("Timeout exceeded, reconnecting!");
                usleep(1000);
                $this->getConnection()->reconnect();
                $this->setupChannelConsumer();
            } catch (\Throwable $e) {
                // Record the root cause before touching the channel again:
                // cancelling on a broken channel may itself throw.
                $this->logger->critical(sprintf(
                    "Stopped consuming: %s in %s:%d",
                    $e->getMessage(),
                    (string)$e->getFile(),
                    (int)$e->getLine()
                ));
                try {
                    // stop the consumer
                    $this->stopConsuming();
                } catch (\Throwable $stopError) {
                    $this->logger->error(sprintf(
                        "Could not cancel the consumer: %s",
                        $stopError->getMessage()
                    ));
                }
                return 1;
            }
        }
        return 0;
    }
245
246
    /**
     * Check the uptime, memory and message-count limits, in that order.
     *
     * The first limit found to be reached is logged at debug level and ends the check.
     *
     * @return bool True when a limit has been reached and consuming must stop
     */
    protected function shouldStopConsuming(): bool
    {
        $bytesPerMegabyte = 1048576;

        $uptimeExceeded = (microtime(true) - $this->startTime) > $this->limitSecondsUptime;
        if ($uptimeExceeded) {
            $context = [
                'limit' => 'time_limit',
                'value' => sprintf("%.2f", microtime(true) - $this->startTime),
            ];
            $this->logger->debug("Stopped consumer", $context);

            return true;
        }

        $memoryExceeded = memory_get_peak_usage(true) >= ($this->limitMemoryConsumption * $bytesPerMegabyte);
        if ($memoryExceeded) {
            $context = [
                'limit' => 'memory_limit',
                'value' => (int)round(memory_get_peak_usage(true) / $bytesPerMegabyte, 2),
            ];
            $this->logger->debug("Stopped consumer", $context);

            return true;
        }

        $processor = $this->getMessageProcessor();
        if ($processor->getProcessedMessages() >= $this->limitMessageCount) {
            $context = [
                'limit' => 'message_count',
                'value' => (int)$processor->getProcessedMessages(),
            ];
            $this->logger->debug("Stopped consumer", $context);

            return true;
        }

        return false;
    }
281
282
    /**
     * Cancel this consumer on the channel, identified by its consumer tag.
     */
    public function stopConsuming()
    {
        $this->logger->debug("Stopping consumer!");

        $channel = $this->getChannel();
        $channel->basic_cancel($this->getConsumerTag(), false, true);
    }
290
291
    /**
     * Prepare a consuming session.
     *
     * Records the three limits and the start time, registers the consumer on
     * the channel, then installs the shutdown and signal handlers.
     *
     * @param int $messages  Maximum number of processed messages
     * @param int $seconds   Maximum uptime in seconds
     * @param int $maxMemory Maximum peak memory usage in megabytes
     */
    protected function setupConsumer(int $messages, int $seconds, int $maxMemory)
    {
        $this->limitMemoryConsumption = $maxMemory;
        $this->limitSecondsUptime = $seconds;
        $this->limitMessageCount = $messages;

        $this->startTime = microtime(true);

        $this->setupChannelConsumer();

        $this->registerShutdownHandler();
        $this->handleKillSignals();
    }
311
312
    /**
     * Apply the prefetch setting and register consume() as the callback for this queue.
     *
     * All four boolean flags of basic_consume() are passed as false.
     */
    private function setupChannelConsumer()
    {
        $onMessage = [$this, 'consume'];

        $qosChannel = $this->getChannel();
        $qosChannel->basic_qos(null, $this->prefetchCount, true);

        $consumeChannel = $this->getChannel();
        $consumeChannel->basic_consume(
            $this->attributes['name'],
            $this->getConsumerTag(),
            false,
            false,
            false,
            false,
            $onMessage
        );
    }
331
332
    /**
     * Cancel the consumer when PHP shuts down, e.g. after
     * "Allowed memory size of x bytes exhausted".
     */
    private function registerShutdownHandler()
    {
        register_shutdown_function(function () {
            $this->stopConsuming();
        });
    }
342
343
    /**
     * Register the termination signal handler.
     *
     * Only SIGTERM is registered: SIGKILL and SIGSTOP cannot be caught, so
     * asking pcntl_signal() to handle them can only fail. Does nothing when
     * the pcntl extension is not loaded.
     */
    private function handleKillSignals()
    {
        if (!extension_loaded('pcntl')) {
            return;
        }

        // A closure keeps the private handler callable no matter which scope
        // is executing when the signal is eventually dispatched.
        $handler = function (int $signalNumber) {
            $this->catchKillSignal($signalNumber);
        };
        pcntl_signal(SIGTERM, $handler);

        if (function_exists('pcntl_signal_dispatch')) {
            // let the signal go forward
            pcntl_signal_dispatch();
        }
    }
359
360
    /**
     * Signal callback: cancel the consumer, then log which signal arrived.
     *
     * @param int $signalNumber Number of the received signal
     */
    private function catchKillSignal(int $signalNumber)
    {
        $this->stopConsuming();

        $note = sprintf("Caught signal %d", $signalNumber);
        $this->logger->debug($note);
    }
369
370
    /**
     * Tag listed in the RabbitMQ UI as the consumer "name".
     *
     * Made of the queue alias, the host name and the process id, joined with underscores.
     *
     * @return string
     */
    private function getConsumerTag(): string
    {
        $segments = [$this->aliasName, gethostname(), getmypid()];

        return implode('_', $segments);
    }
379
380
    /**
     * Return the message processor, resolving it through app() on first use.
     *
     * The resolved instance is cached on the entity. Configuration problems
     * are reported explicitly: calling app() without a name returns the
     * container itself, which previously replaced the property and then
     * failed the return type check with an unhelpful TypeError.
     *
     * @return MessageProcessorInterface
     * @throws \LogicException When no processor name is set or the resolved
     *                         object is not a MessageProcessorInterface
     */
    private function getMessageProcessor(): MessageProcessorInterface
    {
        if ($this->messageProcessor instanceof MessageProcessorInterface) {
            return $this->messageProcessor;
        }

        if (!is_string($this->messageProcessor) || '' === $this->messageProcessor) {
            throw new \LogicException(sprintf(
                'No message processor configured for queue "%s"; call setMessageProcessor() first.',
                $this->aliasName
            ));
        }

        $processor = app($this->messageProcessor);
        if (!($processor instanceof MessageProcessorInterface)) {
            // Keep the configured name in place so the failure is reproducible.
            throw new \LogicException(sprintf(
                'Message processor "%s" does not implement %s.',
                $this->messageProcessor,
                MessageProcessorInterface::class
            ));
        }

        $this->messageProcessor = $processor;

        return $this->messageProcessor;
    }
390
391
    /**
     * Channel callback: hand a received message to the message processor.
     *
     * The processor is expected to deal with its own failures. Anything that
     * still escapes is logged as a notice and rethrown unchanged.
     *
     * @param AMQPMessage $message
     * @throws \Throwable
     */
    public function consume(AMQPMessage $message)
    {
        $this->logger->debug("Consumed message", [$message->getBody()]);

        try {
            $processor = $this->getMessageProcessor();
            $processor->consume($message);
        } catch (\Throwable $failure) {
            $notice = sprintf(
                "Got %s from %s in %d",
                $failure->getMessage(),
                (string)$failure->getFile(),
                (int)$failure->getLine()
            );
            $this->logger->notice($notice);

            // Rethrow: reaching this point is itself unexpected, the notice
            // only records that it happened.
            throw $failure;
        }
    }
415
}
416