AmqpConsumer   A
last analyzed

Complexity

Total Complexity 13

Size/Duplication

Total Lines 112
Duplicated Lines 0 %

Test Coverage

Coverage 69.57%

Importance

Changes 0
Metric Value
wmc 13
dl 0
loc 112
ccs 32
cts 46
cp 0.6957
rs 10
c 0
b 0
f 0

3 Methods

Rating   Name   Duplication   Size   Complexity  
B signalHandler() 0 24 6
B startConsuming() 0 30 4
A __construct() 0 23 3
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Werkspot\MessageQueue\DeliveryQueue\Amqp;
6
7
use PhpAmqpLib\Channel\AMQPChannel;
8
use PhpAmqpLib\Connection\AMQPLazyConnection;
9
use PhpAmqpLib\Wire\AMQPTable;
10
use Psr\Log\LoggerInterface;
11
use Psr\Log\NullLogger;
12
use RuntimeException;
13
use Werkspot\MessageQueue\DeliveryQueue\ConsumerInterface;
14
15
final class AmqpConsumer implements ConsumerInterface
16
{
17
    /**
18
     * @var AMQPLazyConnection
19
     */
20
    private $connection;
21
22
    /**
23
     * @var AMQPChannel
24
     */
25
    private $channel;
26
27
    /**
28
     * @var AmqpMessageHandlerInterface
29
     */
30
    private $handler;
31
32
    /**
33
     * @var bool
34
     */
35
    private $exitSignalReceived = false;
36
37
    /**
38
     * @var LoggerInterface
39
     */
40
    private $logger;
41
42
    /**
43
     * Setup signals and connection
44
     */
45 3
    public function __construct(
46
        AMQPLazyConnection $connection,
47
        AmqpMessageHandlerInterface $handler,
48
        LoggerInterface $logger = null
49
    ) {
50 3
        if (extension_loaded('pcntl')) {
51 3
            if (!defined('AMQP_WITHOUT_SIGNALS')) {
52 1
                define('AMQP_WITHOUT_SIGNALS', false);
53
            }
54
55 3
            pcntl_signal(SIGTERM, [$this, 'signalHandler']);
56 3
            pcntl_signal(SIGHUP, [$this, 'signalHandler']);
57 3
            pcntl_signal(SIGINT, [$this, 'signalHandler']);
58 3
            pcntl_signal(SIGQUIT, [$this, 'signalHandler']);
59 3
            pcntl_signal(SIGUSR1, [$this, 'signalHandler']);
60 3
            pcntl_signal(SIGUSR2, [$this, 'signalHandler']);
61
        } else {
62
            throw new RuntimeException('Unable to process signals');
63
        }
64
65 3
        $this->connection = $connection;
66 3
        $this->handler = $handler;
67 3
        $this->logger = $logger ?? new NullLogger();
68 3
    }
69
70
    public function signalHandler(int $signalNumber): void
71
    {
72
        switch ($signalNumber) {
73
            case SIGTERM:  // 15 : supervisor default stop
74
            case SIGQUIT:  // 3  : kill -s QUIT
75
            case SIGINT:   // 2  : ctrl+c
76
            case SIGHUP:   // 1  : kill -s HUP
77
                // We COULD always do a $this->channel->close() here, but that would result in the channel being closed,
78
                // rabbit not receiving the ack and so it will try it again. But the problem with this is that there
79
                // could have been a command halfway processing, or even worse, just done processing, in that case,
80
                // we'do do the command again, basically executing it twice. So it's better to just wait, and finish
81
                // the current/next command, and stop after that
82
                $this->exitSignalReceived = true;
83
84
                // But if we're just waiting and not doing anything of/c we can close now
85
                if (!$this->handler->isHandlingMessage()) {
86
                    $this->connection->close();
87
                }
88
                break;
89
            default:
90
                // Some unknown event... idk what to do? probably not exit... but yeah what then? ... let's just pretend
91
                // it didn't happen ><
92
                $this->logger->warning('Got unhandled signal: ' . $signalNumber);
93
                break;
94
        }
95
    }
96
97 3
    public function startConsuming(string $queueName, int $maxSeconds): void
98
    {
99 3
        $start = time();
100
101 3
        $this->channel = $this->connection->channel();
102
103
        // Make sure  we get messages 1-by-1 so rabbitmq-server can distribute the work properly and implement
104
        // the priority properly
105 1
        $this->channel->basic_qos(0, 1, true);
106
107 1
        $this->channel->basic_consume(
108 1
            $queueName,
109 1
            '',
110 1
            false,
111 1
            false,
112 1
            false,
113 1
            false,
114 1
            [$this->handler, 'handle'],
115 1
            null,
116 1
            new AMQPTable(array(
117 1
                "x-max-priority" => 10
118
            ))
119
        );
120
121 1
        while (count($this->channel->callbacks) && !$this->exitSignalReceived) {
122
            // Wait for the next message to arrive and process it
123 1
            $this->channel->wait();
124
125 1
            if (time() > ($start + $maxSeconds)) {
126
                break;
127
            }
128
        }
129 1
    }
130
}
131