Issues (10)

src/Entity/QueueEntity.php (2 issues)

1
<?php
2
/**
3
 * Author: Joker
4
 * Date: 2020-05-08 13:57
5
 */
6
7
namespace JokerProject\LaravelAliyunAmqp\Entity;
8
9
use JokerProject\LaravelAliyunAmqp\AMQPConnection;
10
use JokerProject\LaravelAliyunAmqp\ConsumerInterface;
11
use JokerProject\LaravelAliyunAmqp\Processor\AbstractMessageProcessor;
12
use JokerProject\LaravelAliyunAmqp\Processor\MessageProcessorInterface;
13
use JokerProject\LaravelAliyunAmqp\PublisherInterface;
14
use PhpAmqpLib\Channel\AMQPChannel;
15
use PhpAmqpLib\Exception\AMQPProtocolChannelException;
16
use PhpAmqpLib\Exception\AMQPTimeoutException;
17
use PhpAmqpLib\Message\AMQPMessage;
18
use Psr\Log\LoggerAwareInterface;
19
use Psr\Log\LoggerAwareTrait;
20
use PhpAmqpLib\Exception\AMQPChannelClosedException;
21
22
/**
23
 * Class QueueEntity
24
 *
25
 * @package JokerProject\LaravelAliyunAmqp\Entity
26
 */
27
class QueueEntity implements PublisherInterface, ConsumerInterface, AMQPEntityInterface, LoggerAwareInterface
28
{
29
    use LoggerAwareTrait;
30
31
    /**
32
     * @const int   Retry count when a Channel Closed exception is thrown
33
     */
34
    const MAX_RETRIES = 3;
35
36
    /**
37
     * @const array Default connections parameters
38
     */
39
    const DEFAULTS = [
40
        // Whether to check if it exists or to verify existence using argument types (Throws PRECONDITION_FAILED)
41
        'passive'                      => false,
42
        // Entities with durable will be re-created upon server restart
43
        'durable'                      => false,
44
        // whether to use it by only one channel, then it gets deleted
45
        'exclusive'                    => false,
46
        // Whether to delete it when the queue has no event on it
47
        'auto_delete'                  => false,
48
        // Whether the exchange can be used by a publisher or block it (declared just for internal "wiring")
49
        'internal'                     => false,
50
        // Whether to receive a Declare confirmation
51
        'nowait'                       => false,
52
        // Whether to auto create the entity before publishing/consuming it
53
        'auto_create'                  => false,
54
        // whether to "hide" the exception on re-declare.
55
        // if the `passive` filter is set true, this is redundant
56
        'throw_exception_on_redeclare' => true,
57
        // whether to throw an exception when trying to
58
        // bind to a non-existent queue/exchange
59
        'throw_exception_on_bind_fail' => true,
60
    ];
61
62
    /**
63
     * @var AMQPConnection
64
     */
65
    protected $connection;
66
67
    /**
68
     * @var string
69
     */
70
    protected $aliasName;
71
72
    /**
73
     * @var array
74
     */
75
    protected $attributes;
76
77
    /**
78
     * @var int
79
     */
80
    protected $prefetchCount = 1;
81
82
    /**
83
     * @var null|string|MessageProcessorInterface
84
     */
85
    protected $messageProcessor = null;
86
87
    /**
88
     * @var int
89
     */
90
    protected $limitMessageCount;
91
92
    /**
93
     * @var int
94
     */
95
    protected $limitSecondsUptime;
96
97
    /**
98
     * @var int
99
     */
100
    protected $limitMemoryConsumption;
101
102
    /**
103
     * @var double
104
     */
105
    protected $startTime = 0;
106
107
    /**
108
     * @var int
109
     */
110
    protected $retryCount = 0;
111
112
    /**
     * Named constructor: builds a queue entity whose attributes are the
     * class defaults overlaid with the caller-supplied details.
     *
     * @param AMQPConnection $connection      Connection the queue will operate on
     * @param string         $aliasName       Alias given to the new entity
     * @param array          $exchangeDetails Attribute overrides merged on top of self::DEFAULTS
     * @return QueueEntity
     */
    public static function createQueue(AMQPConnection $connection, string $aliasName, array $exchangeDetails)
    {
        // Caller-supplied keys win over the defaults
        $attributes = array_merge(self::DEFAULTS, $exchangeDetails);

        return new static($connection, $aliasName, $attributes);
    }
126
127
    /**
     * Get the alias name this entity was constructed with.
     *
     * @return string
     */
    public function getAliasName(): string
    {
        return $this->aliasName;
    }
134
135
    /**
     * QueueEntity constructor.
     *
     * Stores the collaborators as given; nothing is declared on the broker here.
     *
     * @param AMQPConnection $connection Connection used for every channel operation
     * @param string         $aliasName  Alias of this queue
     * @param array          $attributes Queue attributes (see self::DEFAULTS for the known flags)
     */
    public function __construct(AMQPConnection $connection, string $aliasName, array $attributes = [])
    {
        $this->attributes = $attributes;
        $this->aliasName  = $aliasName;
        $this->connection = $connection;
    }
148
149
    /**
     * Set the prefetch count later handed to basic_qos when the consumer is set up.
     *
     * @param int $prefetchCount
     * @return ConsumerInterface This instance, for chaining
     */
    public function setPrefetchCount(int $prefetchCount): ConsumerInterface
    {
        $this->prefetchCount = $prefetchCount;

        return $this;
    }
158
159
    /**
     * Register the message processor identifier; it is resolved into an
     * instance lazily, the first time the processor is needed.
     *
     * @param string $messageProcessor Identifier passed to app() on first use
     * @return ConsumerInterface This instance, for chaining
     */
    public function setMessageProcessor(string $messageProcessor): ConsumerInterface
    {
        $this->messageProcessor = $messageProcessor;

        return $this;
    }
168
169
    /**
     * Get the connection wrapper this queue was constructed with.
     *
     * @return AMQPConnection
     */
    protected function getConnection(): AMQPConnection
    {
        return $this->connection;
    }
176
177
    /**
     * Get the channel exposed by the underlying connection.
     *
     * @return AMQPChannel
     */
    protected function getChannel(): AMQPChannel
    {
        $connection = $this->getConnection();

        return $connection->getChannel();
    }
184
185
    /**
     * Declare the queue on the broker using the configured attributes.
     *
     * A 406 (precondition failed) reply is swallowed only when
     * `throw_exception_on_redeclare` is not true; in that case the connection
     * is re-established, because the failure closes the channel. Any other
     * protocol error is rethrown.
     *
     * The optional `arguments` attribute (queue arguments such as x-* options)
     * is forwarded to the broker when present.
     *
     * @throws AMQPProtocolChannelException
     */
    public function create()
    {
        try {
            // queue_declare(queue, passive, durable, exclusive, auto_delete, nowait, arguments):
            // `internal` is an exchange-only flag and must not be passed here,
            // otherwise it lands in the `nowait` slot and `nowait` in `arguments`.
            $this->getChannel()
                ->queue_declare(
                    $this->attributes['name'],
                    $this->attributes['passive'],
                    $this->attributes['durable'],
                    $this->attributes['exclusive'],
                    $this->attributes['auto_delete'],
                    $this->attributes['nowait'],
                    $this->attributes['arguments'] ?? []
                );
        } catch (AMQPProtocolChannelException $e) {
            // 406 is a soft error triggered for precondition failure (when redeclaring with different parameters).
            // Cast before comparing so the check holds whether the code is exposed as int or numeric string.
            $isPreconditionFailure = 406 === (int)$e->amqp_reply_code;
            if (true === $this->attributes['throw_exception_on_redeclare'] || !$isPreconditionFailure) {
                throw $e;
            }
            // a failure triggers the channel closing process, so re-open the connection
            $this->reconnect();
        }
    }
210
211
    /**
     * Bind the queue to every exchange listed under the `bind` attribute.
     *
     * Each bind item must provide an `exchange` and a `routing_key`. A 404
     * reply (binding to a non-existing entity) is swallowed only when
     * `throw_exception_on_bind_fail` is not true; the connection is then
     * re-established before moving on to the next item. Any other protocol
     * error is rethrown.
     *
     * @throws AMQPProtocolChannelException
     */
    public function bind()
    {
        // Nothing to do when no bindings are configured
        if (empty($this->attributes['bind'])) {
            return;
        }

        foreach ($this->attributes['bind'] as $bindItem) {
            try {
                $this->getChannel()
                    ->queue_bind(
                        $this->attributes['name'],
                        $bindItem['exchange'],
                        $bindItem['routing_key']
                    );
            } catch (AMQPProtocolChannelException $e) {
                // 404 is the code for trying to bind to a non-existing entity.
                // Cast before comparing so the check holds whether the code is exposed as int or numeric string.
                $isNotFound = 404 === (int)$e->amqp_reply_code;
                if (true === $this->attributes['throw_exception_on_bind_fail'] || !$isNotFound) {
                    throw $e;
                }
                $this->reconnect();
            }
        }
    }
233
234
    /**
     * Remove the queue, identified by its `name` attribute, from the broker.
     */
    public function delete()
    {
        $channel = $this->getChannel();
        $channel->queue_delete($this->attributes['name']);
    }
241
242
    /**
     * {@inheritdoc}
     *
     * Delegates to the underlying connection's reconnect().
     */
    public function reconnect()
    {
        $connection = $this->getConnection();
        $connection->reconnect();
    }
249
250
    /**
     * Publish a message directly to this queue.
     *
     * The message is sent through the default ('') exchange using the queue
     * name as routing key, with the 4th basic_publish argument (mandatory in
     * php-amqplib) set to true. The connection is closed after a successful
     * publish. When the channel turns out to be closed, the publish is retried
     * after a reconnect until self::MAX_RETRIES attempts have been made.
     *
     * @param string $message    Message body
     * @param string $routingKey Not used for routing here; only forwarded to the retry call
     * @return mixed|void
     * @throws AMQPProtocolChannelException When auto-creating or binding the queue fails
     * @throws AMQPChannelClosedException   When the channel is still closed after all retries
     */
    public function publish(string $message, string $routingKey = '')
    {
        if ($this->attributes['auto_create'] === true) {
            $this->create();
            $this->bind();
        }

        try {
            if (!$this->getChannel()->is_open()) {
                $this->getConnection()->reconnect();
            }
            $channel = $this->getChannel();
            $channel->basic_publish(
                new AMQPMessage($message),
                '',
                $this->attributes['name'],
                true
            );
            $this->retryCount = 0;
            $this->connection->close();
        } catch (AMQPChannelClosedException $exception) {
            $this->retryCount++;
            // Retry publishing with re-connect
            if ($this->retryCount < self::MAX_RETRIES) {
                $this->getConnection()->reconnect();
                // The nested call closes the connection itself once it succeeds,
                // so it must not be closed a second time here.
                $this->publish($message, $routingKey);
                return;
            }
            // Reset the counter so a later publish() starts with a full retry
            // budget instead of failing on its first closed-channel error.
            $this->retryCount = 0;
            throw $exception;
        }
    }
290
291
    /**
     * {@inheritdoc}
     *
     * Sets the consumer up, then waits on the channel until one of the limits
     * is reached. A wait timeout triggers a reconnect and a fresh consumer
     * registration; any other throwable stops the consumer.
     *
     * @param int $messages  Maximum number of processed messages
     * @param int $seconds   Maximum uptime in seconds
     * @param int $maxMemory Maximum peak memory, multiplied by 1048576 when checked
     * @return int 0 when a limit was reached, 1 when consuming stopped on an error
     */
    public function startConsuming(int $messages, int $seconds, int $maxMemory)
    {
        $this->setupConsumer($messages, $seconds, $maxMemory);

        while (!$this->shouldStopConsuming()) {
            try {
                $this->getChannel()->wait();
            } catch (AMQPTimeoutException $timeout) {
                $this->logger->debug('startConsuming Exception--->'.$timeout->getMessage());
                // brief pause before re-establishing the connection and consumer
                usleep(1000);
                $this->getConnection()->reconnect();
                $this->setupChannelConsumer();
            } catch (\Throwable $failure) {
                // stop the consumer, then report what made it stop
                $this->stopConsuming();
                $description = get_class($failure) . ' - ' . $failure->getMessage();
                $this->logger->notice(
                    sprintf(
                        "Stopped consuming: %s in %s:%d",
                        $description,
                        (string)$failure->getFile(),
                        (int)$failure->getLine()
                    )
                );

                return 1;
            }
        }

        return 0;
    }
324
325
    /**
     * Decide whether the consume loop must end.
     *
     * Checks, in order: elapsed time against the uptime limit, peak memory
     * against the memory limit, and processed messages against the message
     * limit. The first limit hit is logged at debug level.
     *
     * @return bool True when any limit has been reached
     */
    protected function shouldStopConsuming(): bool
    {
        $notice = "Stopped consumer, Please make sure you are using supervisor to auto restart consumer";

        if ((microtime(true) - $this->startTime) > $this->limitSecondsUptime) {
            $context = [
                'limit' => 'time_limit',
                'value' => sprintf("%.2f", microtime(true) - $this->startTime)
            ];
            $this->logger->debug($notice, $context);

            return true;
        }

        // the memory limit is multiplied by 1048576 (bytes per MiB) before comparing
        if (memory_get_peak_usage(true) >= ($this->limitMemoryConsumption * 1048576)) {
            $context = [
                'limit' => 'memory_limit',
                'value' => (int)round(memory_get_peak_usage(true) / 1048576, 2)
            ];
            $this->logger->debug($notice, $context);

            return true;
        }

        if ($this->getMessageProcessor()->getProcessedMessages() >= $this->limitMessageCount) {
            $context = [
                'limit' => 'message_count',
                'value' => (int)$this->getMessageProcessor()->getProcessedMessages()
            ];
            $this->logger->debug($notice, $context);

            return true;
        }

        return false;
    }
360
361
    /**
     * Cancel this consumer on the channel.
     *
     * Best effort: any failure while cancelling is logged as a notice and
     * not propagated.
     */
    public function stopConsuming()
    {
        try {
            $channel = $this->getChannel();
            $channel->basic_cancel($this->getConsumerTag(), false, true);
        } catch (\Throwable $error) {
            $this->logger->notice(
                sprintf("Got %s of type %s", $error->getMessage(), get_class($error))
            );
        }
    }
372
373
    /**
     * Prepare a consuming session: record the limits and the start time,
     * register the consumer on the channel, then install the shutdown and
     * kill-signal handlers.
     *
     * @param int $messages  Limit on processed messages
     * @param int $seconds   Limit on uptime, in seconds
     * @param int $maxMemory Limit on peak memory consumption
     */
    protected function setupConsumer(int $messages, int $seconds, int $maxMemory)
    {
        // the clock starts before the channel is touched
        $this->startTime = microtime(true);

        $this->limitMemoryConsumption = $maxMemory;
        $this->limitSecondsUptime     = $seconds;
        $this->limitMessageCount      = $messages;

        $this->setupChannelConsumer();

        $this->registerShutdownHandler();
        $this->handleKillSignals();
    }
393
394
    /**
     * Register this object as a consumer on the channel.
     *
     * Optionally declares and binds the queue first (when `auto_create` is
     * true), applies the prefetch count, then subscribes with
     * {@see QueueEntity::consume()} as the delivery callback.
     */
    private function setupChannelConsumer()
    {
        if (true === $this->attributes['auto_create']) {
            $this->create();
            $this->bind();
        }

        // basic_qos(prefetch_size, prefetch_count, a_global)
        $this->getChannel()->basic_qos(null, $this->prefetchCount, true);

        // The four boolean flags are all off; deliveries are handed to consume()
        $onDelivery = [$this, 'consume'];
        $this->getChannel()->basic_consume(
            $this->attributes['name'],
            $this->getConsumerTag(),
            false,
            false,
            false,
            false,
            $onDelivery
        );
    }
418
419
    /**
     * Make sure the consumer is cancelled when the script shuts down -
     * usually in case of "Allowed memory size of x bytes exhausted".
     */
    private function registerShutdownHandler()
    {
        register_shutdown_function(function () {
            $this->stopConsuming();
        });
    }
429
430
    /**
     * Install SIGTERM / SIGINT handlers when the pcntl extension is available.
     *
     * NOTE(review): pending signals are dispatched only once, right here.
     * Signals arriving later presumably rely on something else dispatching
     * them - confirm.
     */
    protected function handleKillSignals()
    {
        if (!extension_loaded('pcntl')) {
            return;
        }

        foreach ([SIGTERM, SIGINT] as $signal) {
            pcntl_signal($signal, [$this, 'catchKillSignal']);
        }

        if (function_exists('pcntl_signal_dispatch')) {
            // let the signal go forward
            pcntl_signal_dispatch();
        }
    }
445
446
    /**
     * Signal handler: cancel the consumer, then log which signal was caught.
     *
     * @param int $signalNumber Number of the received signal
     */
    public function catchKillSignal(int $signalNumber)
    {
        $this->stopConsuming();

        $logLine = sprintf("Caught signal %d", $signalNumber);
        $this->logger->debug($logLine);
    }
455
456
    /**
     * Build the consumer tag: alias name, host name and process id joined
     * with underscores. It is the tag that is listed in RabbitMQ UI as the
     * consumer "name".
     *
     * @return string
     */
    private function getConsumerTag(): string
    {
        $parts = [$this->aliasName, gethostname(), getmypid()];

        return implode('_', $parts);
    }
465
466
    /**
     * Return the message processor, resolving it through app() on first use.
     *
     * Once resolved, the instance is cached on the entity. Processors that
     * extend AbstractMessageProcessor also receive this entity's logger.
     *
     * @return MessageProcessorInterface
     * @throws \UnexpectedValueException When the resolved value is not a message processor
     */
    private function getMessageProcessor(): MessageProcessorInterface
    {
        if ($this->messageProcessor instanceof MessageProcessorInterface) {
            return $this->messageProcessor;
        }

        $processor = app($this->messageProcessor);

        // app() can hand back something that is not a processor (for example the
        // application container itself when no processor id was configured), so
        // validate before caching it on a property documented as a processor.
        if (!($processor instanceof MessageProcessorInterface)) {
            throw new \UnexpectedValueException(sprintf(
                'Message processor for queue "%s" must implement %s, got %s',
                $this->aliasName,
                MessageProcessorInterface::class,
                is_object($processor) ? get_class($processor) : gettype($processor)
            ));
        }

        $this->messageProcessor = $processor;
        if ($processor instanceof AbstractMessageProcessor) {
            $processor->setLogger($this->logger);
        }

        return $processor;
    }
479
480
    /**
     * Delivery callback: hand the message to the message processor.
     *
     * Any throwable raised while processing is logged as a notice and then
     * rethrown unchanged.
     *
     * @param AMQPMessage $message Delivered message
     * @throws \Throwable
     */
    public function consume(AMQPMessage $message)
    {
        try {
            $processor = $this->getMessageProcessor();
            $processor->consume($message);
            $this->logger->debug("Consumed message", [$message->getBody()]);
        } catch (\Throwable $failure) {
            $summary = sprintf(
                "Got %s from %s in %d",
                $failure->getMessage(),
                (string)$failure->getFile(),
                (int)$failure->getLine()
            );
            $this->logger->notice($summary);

            // let the exception slide, the processor should handle
            // exception, this is just a notice that should not
            // ever appear
            throw $failure;
        }
    }
504
}
505