AmqpMessageHandler::handle()   B
last analyzed

Complexity

Conditions 5
Paths 35

Size

Total Lines 45
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 23
CRAP Score 5

Importance

Changes 0
Metric Value
dl 0
loc 45
ccs 23
cts 23
cp 1
rs 8.439
c 0
b 0
f 0
cc 5
eloc 24
nc 35
nop 1
crap 5
1
<?php
2
3
namespace Werkspot\MessageQueue\DeliveryQueue\Amqp;
4
5
use DateTime;
6
use PhpAmqpLib\Message\AMQPMessage;
7
use Psr\Log\LoggerInterface;
8
use Psr\Log\NullLogger;
9
use Psr\SimpleCache\CacheInterface;
10
use Werkspot\MessageQueue\DeliveryQueue\Exception\AlreadyClaimedException;
11
use Werkspot\MessageQueue\DeliveryQueue\Exception\CanNotClaimException;
12
use Werkspot\MessageQueue\DeliveryQueue\Exception\UnrecoverablePersistenceException;
13
use Werkspot\MessageQueue\DeliveryQueue\MessageHandlerInterface;
14
use Werkspot\MessageQueue\DeliveryQueue\PersistenceClientInterface;
15
use Werkspot\MessageQueue\Message\MessageInterface;
16
17
final class AmqpMessageHandler implements AmqpMessageHandlerInterface
18
{
19
    const CLAIM_TTL = 3600;
20
    const UNPACKING_EXCEPTION_MSG_UNSERIALIZABLE_CONTENT = 'It was not possible to unserialize the AmqpMessage contents.';
21
    const UNPACKING_EXCEPTION_MSG_UNEXPECTED_TYPE = 'The AmqpMessage content is not a MessageQueue MessageInterface.';
22
23
    /**
24
     * @var MessageHandlerInterface
25
     */
26
    private $handler;
27
28
    /**
29
     * @var CacheInterface
30
     */
31
    private $cache;
32
33
    /**
34
     * @var LoggerInterface
35
     */
36
    private $logger;
37
38
    /**
39
     * @var PersistenceClientInterface
40
     */
41
    private $persistenceClient;
42
43
    /**
44
     * @var bool
45
     */
46
    private $hasRegisteredShutdownHandler = false;
47
48
    /**
49
     * @var AMQPMessage
50
     */
51
    private $amqpMessage;
52
53 7
    public function __construct(
54
        MessageHandlerInterface $handler,
55
        CacheInterface $cache,
56
        PersistenceClientInterface $persistenceClient,
57
        LoggerInterface $logger = null
58
    ) {
59 7
        $this->handler = $handler;
60 7
        $this->cache = $cache;
61 7
        $this->persistenceClient = $persistenceClient;
62 7
        $this->logger = $logger ?? new NullLogger();
63 7
    }
64
65 7
    public function handle(AMQPMessage $amqpMessage): void
66
    {
67 7
        $this->initiateHandling($amqpMessage);
68 7
        $this->registerShutdownHandler();
69
70
        // RabbitMQ can and WILL send duplicate messages to clients. You are recommended to handle message in an
71
        // idempotent way, but for us this is not really possible, because sending out emails is never idempotent.
72
        // So we are doing the next best thing, deduplicating them.
73
        //
74
        // When deduplicating them we should do it as early as possible and in the most performing way as possible to
75
        // prevent race conditions where 2 consumers get the message at near the same time. So make sure we deduplicate
76
        // as soon as possible to keep the duplicate executing at a minimum.
77
        //
78
        // See: https://www.rabbitmq.com/reliability.html#consumer
79
        //  > [..] messages can be duplicated, and consumers must be prepared to handle them. If possible, ensure that
80
        //  > your consumers handle messages in an idempotent way rather than explicitly deal with deduplication.
81
82
        try {
83 7
            $this->claim($amqpMessage);
84 2
        } catch (AlreadyClaimedException $e) {
85 1
            $this->logAlreadyClaimedMessage($amqpMessage);
86
            // If we already processed it, we should still ack it here, otherwise rabbit will try to do it again, which
87
            // is exactly what we don't want.
88 1
            $this->acknowledgeMessage($amqpMessage);
89 1
            $this->terminateHandling();
90 1
            return;
91 1
        } catch (CanNotClaimException $e) {
92 1
            $this->logger->notice($e->getMessage());
93
            // if we don't have a message ID we can not claim it nor deduplicate it so
94
            // we should always process it, just to be sure we don't lose a message
95
        }
96
97
        try {
98 6
            $this->handler->handle($this->unpack($amqpMessage));
99 1
            $this->acknowledgeMessage($amqpMessage);
100 5
        } catch (AmqpMessageUnpackingException $e) {
101 3
            $this->logger->warning($e->getMessage());
102 3
            $this->persistenceClient->persistUndeliverableMessage($amqpMessage->body, $e->getMessage());
103 3
            $this->acknowledgeMessage($amqpMessage);
104 2
        } catch (UnrecoverablePersistenceException $e) {
105 1
            $this->logger->warning($e->getMessage());
106 1
            $this->acknowledgeMessage($amqpMessage);
107 1
            throw $e;
108 4
        } finally {
109 6
            $this->terminateHandling();
110
        }
111
        // We do not acknowledge the amqp msg when there was any other exception because if we could not put the msg
112
        // back in the scheduled queue, we would lose the message. This way it stays in rabbitMq and is tried again.
113 4
    }
114
115
    public function shutdownHandler(): void
116
    {
117
        if (!$this->isHandlingMessage()) {
118
            return;
119
        }
120
121
        $this->handler->shutdown();
122
123
        $this->acknowledgeMessage($this->amqpMessage);
124
    }
125
126 7
    private function registerShutdownHandler(): void
127
    {
128
        // The only way to catch (all) fatal errors is to have a shutdownHandler
129
        // This piece of code for example will not throw a Throwable, but just kill the php script:
130
        // ```
131
        //   $client = new SoapClient(null, ['location' => "http://localhost/soap.php", 'uri' => "http://test-uri/"]);
132
        //   $client->__setSoapHeaders([0 => null]);
133
        // ```
134
        //
135
        // But this is a big problem, because in that case we would not send an 'ACK' back to rabbit, as the fatal
136
        // error would just kill the php script, so rabbit would try again with the next available worker (as the
137
        // connection was hard interrupted).
138
        //
139
        // So if it fails again in the next worker with a fatal error again we get an endless retry loop in rabbit
140
        // which will basically break the queue, as nothing else is processed except for endlessly retrying the fatal.
141
        //
142
        // To remedy this we register the shutdown handler, we add the fatal error message to the command, and store
143
        // it in the queue to try again after the normal retry period, finally we send the ACK to rabbit, as we've
144
        // dealt with it properly (we will retry it in the future, and have logged the error nicely in the queue).
145 7
        if (!$this->hasRegisteredShutdownHandler) {
146
            register_shutdown_function([$this, 'shutdownHandler']);
147
            $this->hasRegisteredShutdownHandler = true;
148
        }
149 7
    }
150
151
    /**
152
     * @throws AlreadyClaimedException
153
     * @throws CanNotClaimException
154
     */
155 7
    private function claim(AMQPMessage $amqpMessage): void
156
    {
157 7
        if (!$amqpMessage->has('message_id')) {
158 1
            throw new CanNotClaimException(
159
                'Could not claim AMQP message because it does not have a message ID. Message body: '
160 1
                . $amqpMessage->body
161
            );
162
        }
163
164 6
        $cacheKey = 'msg_handled.' . $amqpMessage->get('message_id');
0 ignored issues
show
Bug introduced by
Are you sure $amqpMessage->get('message_id') of type PhpAmqpLib\Channel\AMQPChannel|mixed can be used in concatenation? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

164
        $cacheKey = 'msg_handled.' . /** @scrutinizer ignore-type */ $amqpMessage->get('message_id');
Loading history...
165
166 6
        if ($this->cache->has($cacheKey)) {
167 1
            throw new AlreadyClaimedException('AMQP message has already been claimed, with cache key ' . $cacheKey);
168
        }
169
170 5
        $this->cache->set($cacheKey, true, self::CLAIM_TTL);
171 5
    }
172
173 6
    private function acknowledgeMessage(AMQPMessage $amqpMessage): void
174
    {
175 6
        $channel = $amqpMessage->delivery_info['channel'];
176 6
        $channel->basic_ack($amqpMessage->delivery_info['delivery_tag']);
0 ignored issues
show
Bug introduced by
$amqpMessage->delivery_info['delivery_tag'] of type PhpAmqpLib\Channel\AMQPChannel is incompatible with the type string expected by parameter $delivery_tag of PhpAmqpLib\Channel\AMQPChannel::basic_ack(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

176
        $channel->basic_ack(/** @scrutinizer ignore-type */ $amqpMessage->delivery_info['delivery_tag']);
Loading history...
177 6
    }
178
179 7
    private function unpack(AMQPMessage $amqpMessage): MessageInterface
180
    {
181 7
        $result = @unserialize($amqpMessage->body);
182
183 7
        if ($result === false) {
184 2
            throw new AmqpMessageUnpackingException(self::UNPACKING_EXCEPTION_MSG_UNSERIALIZABLE_CONTENT);
185
        }
186
187 5
        if (! $result instanceof MessageInterface) {
188 1
            throw new AmqpMessageUnpackingException(self::UNPACKING_EXCEPTION_MSG_UNEXPECTED_TYPE);
189
        }
190
191 4
        return $result;
192
    }
193
194 1 View Code Duplication
    private function getLogMessage(MessageInterface $message): string
0 ignored issues
show
Duplication introduced by
This method seems to be duplicated in your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

Loading history...
195
    {
196 1
        return $message->getId()
197 1
            . ', ' . $message->getDestination()
198 1
            . ', ' . $message->getDeliverAt()->format(DateTime::ATOM)
199 1
            . ', ' . $message->getCreatedAt()->format(DateTime::ATOM)
200 1
            . ', ' . $message->getUpdatedAt()->format(DateTime::ATOM)
201 1
            . ', ' . $message->getTries()
202 1
            . ', ' . $message->getPriority()
203 1
            . ', ' . $this->getPayloadDescription($message);
204
    }
205
206 1 View Code Duplication
    private function getPayloadDescription(MessageInterface $message): string
207
    {
208 1
        if (is_object($message->getPayload())) {
209
            return get_class($message->getPayload());
210
        }
211
212 1
        if (is_array($message->getPayload())) {
213
            return 'array';
214
        }
215
216 1
        return (string) $message->getPayload();
217
    }
218
219
    /**
220
     * @param AMQPMessage $amqpMessage
221
     *
222
     */
223 1
    private function logAlreadyClaimedMessage(AMQPMessage $amqpMessage): void
224
    {
225 1
        $queuedMessage = $this->unpack($amqpMessage);
226
227 1
        $message = 'Queue message was already claimed: ' . $this->getLogMessage($queuedMessage);
228
229 1
        $this->logger->notice($message);
230 1
    }
231
232 7
    private function initiateHandling(AMQPMessage $amqpMessage): void
233
    {
234 7
        $this->amqpMessage = $amqpMessage;
235 7
    }
236
237 7
    private function terminateHandling(): void
238
    {
239 7
        $this->amqpMessage = null;
240 7
    }
241
242
    public function isHandlingMessage(): bool
243
    {
244
        return $this->amqpMessage !== null;
245
    }
246
}
247