Completed
Pull Request — master (#16)
by Tilita
02:51
created

QueueEntity::getChannel()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 0
crap 1
1
<?php
2
namespace NeedleProject\LaravelRabbitMq\Entity;
3
4
use NeedleProject\LaravelRabbitMq\AMQPConnection;
5
use NeedleProject\LaravelRabbitMq\ConsumerInterface;
6
use NeedleProject\LaravelRabbitMq\Processor\MessageProcessorInterface;
7
use NeedleProject\LaravelRabbitMq\PublisherInterface;
8
use PhpAmqpLib\Channel\AMQPChannel;
9
use PhpAmqpLib\Exception\AMQPTimeoutException;
10
use PhpAmqpLib\Message\AMQPMessage;
11
use Psr\Log\LoggerAwareInterface;
12
use Psr\Log\LoggerAwareTrait;
13
14
/**
15
 * Class QueueEntity
16
 *
17
 * @package NeedleProject\LaravelRabbitMq\Entity
18
 * @author  Adrian Tilita <[email protected]>
19
 */
20
class QueueEntity implements PublisherInterface, ConsumerInterface, LoggerAwareInterface
21
{
22
    use LoggerAwareTrait;
23
24
    /**
25
     * @const array Default connections parameters
26
     */
27
    const DEFAULTS = [
28
        'passive'   => false,
29
        'durable'   => false,
30
        'exclusive' => false,
31
        'auto_delete' => false,
32
        'internal'  => false,
33
        'nowait'    => false,
34
    ];
35
36
    /**
37
     * @var AMQPConnection
38
     */
39
    protected $connection;
40
41
    /**
42
     * @var string
43
     */
44
    protected $aliasName;
45
46
    /**
47
     * @var array
48
     */
49
    protected $attributes;
50
51
    /**
52
     * @var int
53
     */
54
    protected $prefetchCount = 1;
55
56
    /**
57
     * @var null|string|MessageProcessorInterface
58
     */
59
    protected $messageProcessor = null;
60
61
    /**
62
     * @var int
63
     */
64
    protected $limitMessageCount;
65
66
    /**
67
     * @var int
68
     */
69
    protected $limitSecondsUptime;
70
71
    /**
72
     * @var int
73
     */
74
    protected $limitMemoryConsumption;
75
76
    /**
77
     * @var double
78
     */
79
    protected $startTime = 0;
80
81
    /**
82
     * @param AMQPConnection $connection
83
     * @param string $aliasName
84
     * @param array $exchangeDetails
85
     * @return QueueEntity
86
     */
87 11
    public static function createQueue(AMQPConnection $connection, string $aliasName, array $exchangeDetails)
88
    {
89 11
        return new static(
90
            $connection,
91
            $aliasName,
92 11
            array_merge(self::DEFAULTS, $exchangeDetails)
93
        );
94
    }
95
96
    /**
97
     * @return string
98
     */
99 3
    public function getAliasName(): string
100
    {
101 3
        return $this->aliasName;
102
    }
103
104
    /**
105
     * ExchangeEntity constructor.
106
     *
107
     * @param AMQPConnection $connection
108
     * @param string $aliasName
109
     * @param array $attributes
110
     */
111 11
    public function __construct(AMQPConnection $connection, string $aliasName, array $attributes = [])
112
    {
113 11
        $this->connection = $connection;
114 11
        $this->aliasName  = $aliasName;
115 11
        $this->attributes = $attributes;
116 11
    }
117
118
    /**
119
     * @param int $prefetchCount
120
     * @return ConsumerInterface
121
     */
122 2
    public function setPrefetchCount(int $prefetchCount): ConsumerInterface
123
    {
124 2
        $this->prefetchCount = $prefetchCount;
125 2
        return $this;
126
    }
127
128
    /**
129
     * @param string $messageProcessor
130
     * @return ConsumerInterface
131
     */
132 2
    public function setMessageProcessor(string $messageProcessor): ConsumerInterface
133
    {
134 2
        $this->messageProcessor = $messageProcessor;
135 2
        return $this;
136
    }
137
138
    /**
139
     * @return AMQPConnection
140
     */
141 4
    protected function getConnection(): AMQPConnection
142
    {
143 4
        return $this->connection;
144
    }
145
146
    /**
147
     * @return AMQPChannel
148
     */
149 4
    protected function getChannel(): AMQPChannel
150
    {
151 4
        return $this->getConnection()->getChannel();
152
    }
153
154
    /**
155
     * Create the Queue
156
     */
157 1
    public function create()
158
    {
159 1
        $this->getChannel()
160 1
            ->queue_declare(
161 1
                $this->attributes['name'],
162 1
                $this->attributes['passive'],
163 1
                $this->attributes['durable'],
164 1
                $this->attributes['exclusive'],
165 1
                $this->attributes['auto_delete'],
166 1
                $this->attributes['internal'],
167 1
                $this->attributes['nowait']
168
            );
169 1
    }
170
171 1 View Code Duplication
    public function bind()
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
172
    {
173 1
        if (isset($this->attributes['bind'])) {
174 1
            foreach ($this->attributes['bind'] as $bindItem) {
175 1
                $this->getChannel()
176 1
                    ->queue_bind(
177 1
                        $this->attributes['name'],
178 1
                        $bindItem['exchange'],
179 1
                        $bindItem['routing_key']
180
                    );
181
            }
182
        }
183 1
    }
184
185
    /**
186
     * Delete the queue
187
     */
188 1
    public function delete()
189
    {
190 1
        $this->getChannel()->queue_delete($this->attributes['name']);
191 1
    }
192
193
194
    /**
195
     * Publish a message
196
     *
197
     * @param string $message
198
     * @param string $routingKey
199
     * @return void
200
     */
201 1
    public function publish(string $message, string $routingKey = '')
202
    {
203 1
        $this->getChannel()
204 1
            ->basic_publish(
205 1
                new AMQPMessage($message),
206 1
                '',
207 1
                $this->attributes['name'],
208 1
                true
209
            );
210 1
    }
211
212
    /**
213
     * {@inheritdoc}
214
     *
215
     * @param int $messages
216
     * @param int $seconds
217
     * @param int $maxMemory
218
     * @return int
219
     */
220
    public function startConsuming(int $messages, int $seconds, int $maxMemory)
221
    {
222
        $this->setupConsumer($messages, $seconds, $maxMemory);
223
        while (false === $this->shouldStopConsuming()) {
224
            try {
225
                $this->getChannel()->wait(null, false, 1);
226
            } catch (AMQPTimeoutException $e) {
227
228
                $this->logger->debug("Timeout exceeded, reconnecting!");
229
                usleep(1000);
230
                $this->getConnection()->reconnect();
231
232
                $this->setupChannelConsumer();
233
            } catch (\Throwable $e) {
234
                // stop the consumer
235
                $this->stopConsuming();
236
                $this->logger->critical(sprintf(
237
                    "Stopped consuming: %s in %s:%d",
238
                    $e->getMessage(),
239
                    (string)$e->getFile(),
240
                    (int)$e->getLine()
241
                ));
242
                return 1;
243
            }
244
        }
245
        return 0;
246
    }
247
248
    /**
249
     * @return bool
250
     */
251
    protected function shouldStopConsuming(): bool
252
    {
253
        if ((microtime(true) - $this->startTime) > $this->limitSecondsUptime) {
254
            $this->logger->debug(
255
                "Stopped consumer",
256
                [
257
                    'limit' => 'time_limit',
258
                    'value' => sprintf("%.2f", microtime(true) - $this->startTime)
259
                ]
260
            );
261
            return true;
262
        }
263
        if (memory_get_peak_usage(true) >= ($this->limitMemoryConsumption * 1048576)) {
264
            $this->logger->debug(
265
                "Stopped consumer",
266
                [
267
                    'limit' => 'memory_limit',
268
                    'value' => (int)round(memory_get_peak_usage(true) / 1048576, 2)
269
                ]
270
            );
271
            return true;
272
        }
273
274
        if ($this->getMessageProcessor()->getProcessedMessages() >= $this->limitMessageCount) {
275
            $this->logger->debug(
276
                "Stopped consumer",
277
                ['limit' => 'message_count', 'value' => (int)$this->getMessageProcessor()->getProcessedMessages()]
278
            );
279
            return true;
280
        }
281
        return false;
282
    }
283
284
    /**
285
     * Stop the consumer
286
     */
287
    public function stopConsuming()
288
    {
289
        $this->logger->debug("Stopping consumer!");
290
        $this->getChannel()->basic_cancel($this->getConsumerTag(), false, true);
291
    }
292
293
    /**
294
     * Setup the consumer
295
     *
296
     * @param int $messages
297
     * @param int $seconds
298
     * @param int $maxMemory
299
     */
300
    protected function setupConsumer(int $messages, int $seconds, int $maxMemory)
301
    {
302
        $this->limitMessageCount = $messages;
303
        $this->limitSecondsUptime = $seconds;
304
        $this->limitMemoryConsumption = $maxMemory;
305
306
        $this->startTime = microtime(true);
307
308
        $this->setupChannelConsumer();
309
310
        $this->registerShutdownHandler();
311
        $this->handleKillSignals();
312
    }
313
314
    private function setupChannelConsumer()
315
    {
316
        $this->getChannel()
317
            ->basic_qos(null, $this->prefetchCount, true);
318
319
        $this->getChannel()
320
            ->basic_consume(
321
                $this->attributes['name'],
322
                $this->getConsumerTag(),
323
                false,
324
                false,
325
                false,
326
                false,
327
                [
328
                    $this,
329
                    'consume'
330
                ]
331
            );
332
    }
333
334
    /**
335
     * Handle shutdown - Usually in case "Allowed memory size of x bytes exhausted"
336
     */
337
    private function registerShutdownHandler()
338
    {
339
        $consumer = $this;
340
        register_shutdown_function(function () use ($consumer) {
341
            $consumer->stopConsuming();
342
        });
343
    }
344
345
    /**
346
     * Register signals
347
     */
348
    private function handleKillSignals()
349
    {
350
        if (extension_loaded('pcntl')) {
351
            pcntl_signal(SIGTERM, [$this, 'catchKillSignal']);
352
            pcntl_signal(SIGKILL, [$this, 'catchKillSignal']);
353
            pcntl_signal(SIGSTOP, [$this, 'catchKillSignal']);
354
355
            if (function_exists('pcntl_signal_dispatch')) {
356
                // let the signal go forward
357
                pcntl_signal_dispatch();
358
            }
359
        }
360
    }
361
362
    /**
363
     * Handle Kill Signals
364
     * @param int $signalNumber
365
     */
366
    private function catchKillSignal(int $signalNumber)
367
    {
368
        $this->stopConsuming();
369
        $this->logger->debug(sprintf("Caught signal %d", $signalNumber));
370
    }
371
372
    /**
373
     * It is the tag that is listed in RabbitMQ UI as the consumer "name"
374
     *
375
     * @return string
376
     */
377
    private function getConsumerTag(): string
378
    {
379
        return sprintf("%s_%s_%s", $this->aliasName, gethostname(), getmypid());
380
    }
381
382
    /**
383
     * @return MessageProcessorInterface
384
     */
385
    private function getMessageProcessor(): MessageProcessorInterface
386
    {
387
        if (!($this->messageProcessor instanceof MessageProcessorInterface)) {
388
            $this->messageProcessor = app($this->messageProcessor);
389
        }
390
        return $this->messageProcessor;
391
    }
392
393
    /**
394
     * @param AMQPMessage $message
395
     * @throws \Throwable
396
     */
397
    public function consume(AMQPMessage $message)
398
    {
399
        $this->logger->debug("Consumed message", [$message->getBody()]);
400
        try {
401
            $this->getMessageProcessor()->consume($message);
402
        } catch (\Throwable $e) {
403
            $this->logger->notice(
404
                sprintf(
405
                    "Got %s from %s in %d",
406
                    $e->getMessage(),
407
                    (string)$e->getFile(),
408
                    (int)$e->getLine()
409
                )
410
            );
411
            // let the exception slide, the processor should handle
412
            // exception, this is just a notice that should not
413
            // ever appear
414
            throw $e;
415
        }
416
    }
417
}
418