Completed
Push — master ( 9186fa...c4ad81 )
by Tilita
16s queued 14s
created

QueueEntity::setPrefetchCount()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
eloc 2
nc 1
nop 1
dl 0
loc 4
ccs 0
cts 0
cp 0
crap 2
rs 10
c 0
b 0
f 0
1
<?php
2
3
namespace NeedleProject\LaravelRabbitMq\Entity;
4
5
use NeedleProject\LaravelRabbitMq\AMQPConnection;
6
use NeedleProject\LaravelRabbitMq\ConsumerInterface;
7
use NeedleProject\LaravelRabbitMq\Processor\AbstractMessageProcessor;
8
use NeedleProject\LaravelRabbitMq\Processor\MessageProcessorInterface;
9
use NeedleProject\LaravelRabbitMq\PublisherInterface;
10
use PhpAmqpLib\Channel\AMQPChannel;
11
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
12
use PhpAmqpLib\Exception\AMQPTimeoutException;
13
use PhpAmqpLib\Message\AMQPMessage;
14
use Psr\Log\LoggerAwareInterface;
15
use Psr\Log\LoggerAwareTrait;
16
use PhpAmqpLib\Exception\AMQPChannelClosedException;
17
18
/**
19
 * Class QueueEntity
20
 *
21
 * @package NeedleProject\LaravelRabbitMq\Entity
22
 * @author  Adrian Tilita <[email protected]>
23
 */
24
class QueueEntity implements PublisherInterface, ConsumerInterface, AMQPEntityInterface, LoggerAwareInterface
25
{
26
    use LoggerAwareTrait;
27
28
    /**
29
     * @const int   Retry count when a Channel Closed exception is thrown
30
     */
31
    const MAX_RETRIES = 3;
32
33
    /**
34
     * @const array Default connections parameters
35
     */
36
    const DEFAULTS = [
37
        // Whether to check if it exists or to verify existence using argument types (Throws PRECONDITION_FAILED)
38
        'passive'                      => false,
39
        // Entities with durable will be re-created upon server restart
40
        'durable'                      => false,
41
        // whether to use it by only one channel, then it gets deleted
42
        'exclusive'                    => false,
43
        // Whether to delete it when the queue has no event on it
44
        'auto_delete'                  => false,
45
        // Whether to receive a Declare confirmation
46
        'nowait'                       => false,
47
        // Additional arguments for queue creation
48
        'arguments'                    => [],
49
        // Whether to auto create the entity before publishing/consuming it
50
        'auto_create'                  => false,
51
        // whether to "hide" the exception on re-declare.
52
        // if the `passive` filter is set true, this is redundant
53
        'throw_exception_on_redeclare' => true,
54
        // whether to throw an exception when trying to
55
        // bind to an in-existent queue/exchange
56
        'throw_exception_on_bind_fail' => true,
57
    ];
58
59
    /**
60
     * @var AMQPConnection
61
     */
62
    protected $connection;
63
64
    /**
65
     * @var string
66
     */
67
    protected $aliasName;
68
69
    /**
70
     * @var array
71
     */
72
    protected $attributes;
73
74
    /**
75
     * @var int
76
     */
77
    protected $prefetchCount = 1;
78
79
    /**
80
     * @var null|string|MessageProcessorInterface
81
     */
82
    protected $messageProcessor = null;
83
84
    /**
85
     * @var int
86
     */
87
    protected $limitMessageCount;
88
89
    /**
90
     * @var int
91
     */
92
    protected $limitSecondsUptime;
93
94
    /**
95
     * @var int
96
     */
97
    protected $limitMemoryConsumption;
98
99
    /**
100
     * @var double
101
     */
102
    protected $startTime = 0;
103
104
    /**
105
     * @var int
106
     */
107
    protected $retryCount = 0;
108
    /**
109
     * @var bool
110
     */
111
    protected $globalPrefetch = true;
112
113
    /**
     * Named constructor for a queue entity.
     *
     * The supplied details are layered on top of the class DEFAULTS, so any
     * key missing from $queueDetails falls back to its default value.
     *
     * @param AMQPConnection $connection
     * @param string $aliasName
     * @param array $queueDetails
     * @return QueueEntity
     */
    public static function createQueue(AMQPConnection $connection, string $aliasName, array $queueDetails)
    {
        $attributes = array_merge(self::DEFAULTS, $queueDetails);

        return new static($connection, $aliasName, $attributes);
    }
127
128 3
    /**
     * Alias under which this queue was registered.
     *
     * @return string
     */
    public function getAliasName(): string
    {
        return $this->aliasName;
    }
135
136
    /**
     * QueueEntity constructor.
     *
     * Stores the collaborators as-is; no defaults are merged here
     * (see createQueue() for the variant that applies DEFAULTS).
     *
     * @param AMQPConnection $connection
     * @param string $aliasName
     * @param array $attributes
     */
    public function __construct(AMQPConnection $connection, string $aliasName, array $attributes = [])
    {
        $this->attributes = $attributes;
        $this->aliasName  = $aliasName;
        $this->connection = $connection;
    }
149 2
150
    /**
     * Set the prefetch count later passed to basic_qos when the consumer is set up.
     *
     * @param int $prefetchCount
     * @return ConsumerInterface  The same instance, for chaining
     */
    public function setPrefetchCount(int $prefetchCount): ConsumerInterface
    {
        $this->prefetchCount = $prefetchCount;

        return $this;
    }
159 2
160
    /**
     * Register the message processor by name.
     *
     * The string is resolved into an instance lazily, the first time the
     * processor is actually needed.
     *
     * @param string $messageProcessor
     * @return ConsumerInterface  The same instance, for chaining
     */
    public function setMessageProcessor(string $messageProcessor): ConsumerInterface
    {
        $this->messageProcessor = $messageProcessor;

        return $this;
    }
169
170 10
    /**
     * Set the "global" flag later passed to basic_qos when the consumer is set up.
     *
     * @param bool $globalPrefetch
     * @return ConsumerInterface  The same instance, for chaining
     */
    public function setGlobalPrefetch(bool $globalPrefetch): ConsumerInterface
    {
        $this->globalPrefetch = $globalPrefetch;

        return $this;
    }
180
181
    /**
     * Connection this queue operates on.
     *
     * @return AMQPConnection
     */
    protected function getConnection(): AMQPConnection
    {
        return $this->connection;
    }
188 4
189 4
    /**
     * Channel obtained from the underlying connection.
     *
     * @return AMQPChannel
     */
    protected function getChannel(): AMQPChannel
    {
        $connection = $this->getConnection();

        return $connection->getChannel();
    }
196
197 2
    /**
     * Declare the queue on the broker using the configured attributes.
     *
     * A 406 (precondition failed) reply is tolerated only when
     * `throw_exception_on_redeclare` is not true; in that case the connection
     * is re-established. Every other protocol error is re-thrown.
     *
     * @throws AMQPProtocolChannelException
     */
    public function create()
    {
        try {
            $channel = $this->getChannel();
            $channel->queue_declare(
                $this->attributes['name'],
                $this->attributes['passive'],
                $this->attributes['durable'],
                $this->attributes['exclusive'],
                $this->attributes['auto_delete'],
                $this->attributes['nowait'],
                $this->attributes['arguments']
            );
        } catch (AMQPProtocolChannelException $e) {
            // 406 is a soft error triggered for precondition failure
            // (when redeclaring with different parameters)
            $isRedeclareConflict = 406 === $e->amqp_reply_code;
            $mustThrow = true === $this->attributes['throw_exception_on_redeclare'];

            if ($mustThrow || !$isRedeclareConflict) {
                throw $e;
            }

            // a failure triggers the channel closing process, so reconnect
            $this->reconnect();
        }
    }
222 1
223 1
    /**
     * Bind the queue to every entry listed under the `bind` attribute.
     *
     * Each entry must provide `exchange` and `routing_key`. Nothing happens
     * when no bindings are configured. A 404 reply is tolerated only when
     * `throw_exception_on_bind_fail` is not true; in that case the connection
     * is re-established and the remaining bindings are still processed.
     *
     * @throws AMQPProtocolChannelException
     */
    public function bind()
    {
        // empty() is also true for a missing key, so it covers the "not set" case
        if (empty($this->attributes['bind'])) {
            return;
        }

        foreach ($this->attributes['bind'] as $binding) {
            try {
                $channel = $this->getChannel();
                $channel->queue_bind(
                    $this->attributes['name'],
                    $binding['exchange'],
                    $binding['routing_key']
                );
            } catch (AMQPProtocolChannelException $e) {
                // 404 is the code for trying to bind to a non-existing entity
                $targetMissing = 404 === $e->amqp_reply_code;
                $mustThrow = true === $this->attributes['throw_exception_on_bind_fail'];

                if ($mustThrow || !$targetMissing) {
                    throw $e;
                }

                $this->reconnect();
            }
        }
    }
245
246
    /**
     * Remove the queue (identified by its `name` attribute) from the broker.
     */
    public function delete()
    {
        $queueName = $this->attributes['name'];

        $this->getChannel()->queue_delete($queueName);
    }
253
254 4
    /**
     * {@inheritdoc}
     *
     * Delegates to the underlying connection's reconnect().
     */
    public function reconnect()
    {
        $connection = $this->getConnection();
        $connection->reconnect();
    }
261
262 4
    /**
     * Publish a message straight to this queue.
     *
     * The message goes through the default ('') exchange with the queue name
     * as routing key and the mandatory flag set. When `auto_create` is true
     * the queue is declared and bound before every attempt.
     *
     * If the channel turns out to be closed, the connection is re-established
     * and the publish is attempted again, for at most MAX_RETRIES attempts in
     * total. The retry counter is reset both on success and once the attempts
     * are exhausted, so each independent call gets the full retry budget.
     *
     * @param string $message     Message body
     * @param string $routingKey  Only forwarded to the retry call; the publish
     *                            itself always uses the queue name
     * @return mixed|void
     * @throws AMQPProtocolChannelException
     * @throws AMQPChannelClosedException  When the channel is still closed after the last attempt
     */
    public function publish(string $message, string $routingKey = '')
    {
        if ($this->attributes['auto_create'] === true) {
            $this->create();
            $this->bind();
        }

        try {
            $this->getChannel()
                ->basic_publish(
                    new AMQPMessage($message),
                    '',
                    $this->attributes['name'],
                    true
                );
            $this->retryCount = 0;
        } catch (AMQPChannelClosedException $exception) {
            $this->retryCount++;

            if ($this->retryCount >= self::MAX_RETRIES) {
                // Out of attempts: reset the counter before giving up, otherwise
                // every later failing publish would throw without any retry.
                $this->retryCount = 0;

                throw $exception;
            }

            // Retry publishing with re-connect
            $this->getConnection()->reconnect();
            $this->publish($message, $routingKey);
        }
    }
298
299
    /**
300
     * {@inheritdoc}
301
     *
302
     * @param int $messages
303
     * @param int $seconds
304
     * @param int $maxMemory
305
     * @return int
306
     */
307
    public function startConsuming(int $messages, int $seconds, int $maxMemory)
308
    {
309
        $this->setupConsumer($messages, $seconds, $maxMemory);
310
        while (false === $this->shouldStopConsuming()) {
311
            try {
312
                $this->getChannel()->wait(null, false, 1);
313
            } catch (AMQPTimeoutException $e) {
314
                usleep(1000);
315
                $this->getConnection()->reconnect();
316
                $this->setupChannelConsumer();
317
            } catch (\Throwable $e) {
318
                // stop the consumer
319
                $this->stopConsuming();
320
                $this->logger->notice(sprintf(
0 ignored issues
show
Bug introduced by
The method notice() does not exist on null. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

320
                $this->logger->/** @scrutinizer ignore-call */ 
321
                               notice(sprintf(

This check looks for calls to methods that do not seem to exist on a given type. It looks for the method on the type itself as well as in inherited classes or implemented interfaces.

This is most likely a typographical error or the method has been renamed.

Loading history...
321
                    "Stopped consuming: %s in %s:%d",
322
                    get_class($e) . ' - ' . $e->getMessage(),
323
                    (string)$e->getFile(),
324
                    (int)$e->getLine()
325
                ));
326
                return 1;
327
            }
328
        }
329
        return 0;
330
    }
331
332
    /**
     * Decide whether the consume loop has reached one of its limits.
     *
     * The limits are checked in this order: uptime in seconds, peak memory
     * (limit expressed in MiB) and number of processed messages. The first
     * limit that is hit is reported through a debug log entry when a logger
     * has been injected.
     *
     * @return bool  True when consuming should stop
     */
    protected function shouldStopConsuming(): bool
    {
        $stopReason = null;
        $uptime = microtime(true) - $this->startTime;

        if ($uptime > $this->limitSecondsUptime) {
            $stopReason = [
                'limit' => 'time_limit',
                'value' => sprintf("%.2f", $uptime),
            ];
        } elseif (memory_get_peak_usage(true) >= ($this->limitMemoryConsumption * 1048576)) {
            // 1048576 bytes = 1 MiB
            $stopReason = [
                'limit' => 'memory_limit',
                'value' => (int)round(memory_get_peak_usage(true) / 1048576, 2),
            ];
        } else {
            $processedMessages = $this->getMessageProcessor()->getProcessedMessages();
            if ($processedMessages >= $this->limitMessageCount) {
                $stopReason = [
                    'limit' => 'message_count',
                    'value' => (int)$processedMessages,
                ];
            }
        }

        if (null === $stopReason) {
            return false;
        }

        // The logger is optional (LoggerAwareTrait leaves it null until setLogger() is called)
        if (null !== $this->logger) {
            $this->logger->debug("Stopped consumer", $stopReason);
        }

        return true;
    }
367
368
    /**
     * Stop the consumer by cancelling its consumer tag on the channel.
     *
     * This is a best-effort operation: any failure while cancelling is
     * swallowed and, when a logger has been injected, reported as a notice.
     */
    public function stopConsuming()
    {
        try {
            $this->getChannel()->basic_cancel($this->getConsumerTag(), false, true);
        } catch (\Throwable $e) {
            // Without this guard a missing logger would turn the swallowed
            // failure into a fatal "call to a member function on null".
            if (null !== $this->logger) {
                $this->logger->notice("Got " . $e->getMessage() . " of type " . get_class($e));
            }
        }
    }
379
380
    /**
381
     * Setup the consumer
382
     *
383
     * @param int $messages
384
     * @param int $seconds
385
     * @param int $maxMemory
386
     */
387
    protected function setupConsumer(int $messages, int $seconds, int $maxMemory)
388
    {
389
        $this->limitMessageCount = $messages;
390
        $this->limitSecondsUptime = $seconds;
391
        $this->limitMemoryConsumption = $maxMemory;
392
393
        $this->startTime = microtime(true);
0 ignored issues
show
Documentation Bug introduced by
It seems like microtime(true) can also be of type string. However, the property $startTime is declared as type double. Maybe add an additional type check?

Our type inference engine has found a suspicious assignment of a value to a property. This check raises an issue when a value that can be of a mixed type is assigned to a property that is type hinted more strictly.

For example, imagine you have a variable $accountId that can either hold an Id object or false (if there is no account id yet). Your code now assigns that value to the id property of an instance of the Account class. This class holds a proper account, so the id value must no longer be false.

Either this assignment is in error or a type check should be added for that assignment.

class Id
{
    public $id;

    public function __construct($id)
    {
        $this->id = $id;
    }

}

class Account
{
    /** @var  Id $id */
    public $id;
}

$account_id = false;

if (starsAreRight()) {
    $account_id = new Id(42);
}

$account = new Account();
if ($account instanceof Id)
{
    $account->id = $account_id;
}
Loading history...
394
395
        $this->setupChannelConsumer();
396
397
        $this->registerShutdownHandler();
398
        $this->handleKillSignals();
399
    }
400
401
    /**
     * Prepare the channel for consuming.
     *
     * Optionally declares and binds the queue (when `auto_create` is true),
     * applies the prefetch settings and registers consume() as the delivery
     * callback under this consumer's tag.
     */
    private function setupChannelConsumer()
    {
        $autoCreate = $this->attributes['auto_create'] === true;
        if ($autoCreate) {
            $this->create();
            $this->bind();
        }

        // prefetch size is left unset; only count and the global flag are configured
        $this->getChannel()->basic_qos(null, $this->prefetchCount, $this->globalPrefetch);

        $onDelivery = [$this, 'consume'];

        $this->getChannel()->basic_consume(
            $this->attributes['name'],
            $this->getConsumerTag(),
            false, // no_local
            false, // no_ack
            false, // exclusive
            false, // nowait
            $onDelivery
        );
    }
425
426
    /**
     * Make sure the consumer is stopped when the PHP process shuts down -
     * usually in case of "Allowed memory size of x bytes exhausted".
     */
    private function registerShutdownHandler()
    {
        // closures declared inside a method are bound to $this automatically
        register_shutdown_function(function () {
            $this->stopConsuming();
        });
    }
436
437
    /**
     * Install SIGTERM/SIGINT handlers when the pcntl extension is available.
     *
     * Both signals are routed to catchKillSignal(). Pending signals are
     * dispatched right away when pcntl_signal_dispatch() exists.
     */
    protected function handleKillSignals()
    {
        if (!extension_loaded('pcntl')) {
            return;
        }

        $handler = [$this, 'catchKillSignal'];
        pcntl_signal(SIGTERM, $handler);
        pcntl_signal(SIGINT, $handler);

        if (function_exists('pcntl_signal_dispatch')) {
            // let the signal go forward
            pcntl_signal_dispatch();
        }
    }
452
453
    /**
     * Signal handler: stop consuming and, when a logger has been injected,
     * record which signal was received.
     *
     * @param int $signalNumber
     */
    public function catchKillSignal(int $signalNumber)
    {
        $this->stopConsuming();

        // The logger is optional; never let a missing logger crash the signal handler
        if (null !== $this->logger) {
            $this->logger->debug(sprintf("Caught signal %d", $signalNumber));
        }
    }
462
463
    /**
     * Build the consumer tag - the value listed in the RabbitMQ UI as the
     * consumer "name" - from the alias, the host name and the process id.
     *
     * @return string
     */
    private function getConsumerTag(): string
    {
        $parts = [$this->aliasName, gethostname(), getmypid()];

        return implode('_', $parts);
    }
472
473
    /**
474 1
     * @return MessageProcessorInterface
475
     */
476
    private function getMessageProcessor(): MessageProcessorInterface
477 1
    {
478 1
        if (!($this->messageProcessor instanceof MessageProcessorInterface)) {
479
            $this->messageProcessor = app($this->messageProcessor);
0 ignored issues
show
Documentation Bug introduced by
It seems like app($this->messageProcessor) can also be of type Illuminate\Contracts\Foundation\Application. However, the property $messageProcessor is declared as type NeedleProject\LaravelRab...orInterface|null|string. Maybe add an additional type check?

Our type inference engine has found a suspicious assignment of a value to a property. This check raises an issue when a value that can be of a mixed type is assigned to a property that is type hinted more strictly.

For example, imagine you have a variable $accountId that can either hold an Id object or false (if there is no account id yet). Your code now assigns that value to the id property of an instance of the Account class. This class holds a proper account, so the id value must no longer be false.

Either this assignment is in error or a type check should be added for that assignment.

class Id
{
    public $id;

    public function __construct($id)
    {
        $this->id = $id;
    }

}

class Account
{
    /** @var  Id $id */
    public $id;
}

$account_id = false;

if (starsAreRight()) {
    $account_id = new Id(42);
}

$account = new Account();
if ($account instanceof Id)
{
    $account->id = $account_id;
}
Loading history...
480
            if ($this->messageProcessor instanceof AbstractMessageProcessor) {
481
                $this->messageProcessor->setLogger($this->logger);
0 ignored issues
show
Bug introduced by
It seems like $this->logger can also be of type null; however, parameter $logger of NeedleProject\LaravelRab...eProcessor::setLogger() does only seem to accept Psr\Log\LoggerInterface, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

481
                $this->messageProcessor->setLogger(/** @scrutinizer ignore-type */ $this->logger);
Loading history...
482
            }
483
        }
484
        return $this->messageProcessor;
485
    }
486
487
    /**
     * Delivery callback: hand the message over to the message processor.
     *
     * A successful hand-over is logged at debug level. Anything thrown while
     * processing is logged as a notice and then re-thrown unchanged. Logging
     * is skipped when no logger has been injected, so a missing logger can
     * neither break a successful delivery nor mask the original exception.
     *
     * @param AMQPMessage $message
     * @throws \Throwable
     */
    public function consume(AMQPMessage $message)
    {
        try {
            $this->getMessageProcessor()->consume($message);

            if (null !== $this->logger) {
                $this->logger->debug("Consumed message", ['message' => $message->getBody()]);
            }
        } catch (\Throwable $e) {
            if (null !== $this->logger) {
                $this->logger->notice(
                    sprintf(
                        "Got %s from %s in %d",
                        $e->getMessage(),
                        (string)$e->getFile(),
                        (int)$e->getLine()
                    )
                );
            }

            // let the exception slide, the processor should handle
            // exception, this is just a notice that should not
            // ever appear
            throw $e;
        }
    }
511
}
512