Test Failed
Pull Request — master (#7)
by Herberto
05:22
created

AmqpMessageHandler::handle()   B

Complexity

Conditions 5
Paths 35

Size

Total Lines 45
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 45
c 0
b 0
f 0
rs 8.439
cc 5
eloc 24
nc 35
nop 1
1
<?php
2
3
namespace Werkspot\MessageQueue\DeliveryQueue\Amqp;
4
5
use DateTime;
6
use PhpAmqpLib\Message\AMQPMessage;
7
use Psr\Log\LoggerInterface;
8
use Psr\Log\NullLogger;
9
use Psr\SimpleCache\CacheInterface;
10
use Werkspot\MessageQueue\DeliveryQueue\Exception\AlreadyClaimedException;
11
use Werkspot\MessageQueue\DeliveryQueue\Exception\CanNotClaimException;
12
use Werkspot\MessageQueue\DeliveryQueue\Exception\UnrecoverablePersistenceException;
13
use Werkspot\MessageQueue\DeliveryQueue\MessageHandlerInterface;
14
use Werkspot\MessageQueue\DeliveryQueue\PersistenceClientInterface;
15
use Werkspot\MessageQueue\Message\MessageInterface;
16
17
/**
 * Handles a single AMQP delivery: tries to claim the message through the cache so duplicates are dropped,
 * unpacks the serialized body into a MessageInterface, passes it to the wrapped handler and acknowledges
 * the delivery on the channel it arrived on.
 */
final class AmqpMessageHandler implements AmqpMessageHandlerInterface
{
    /**
     * TTL handed to the cache when a message ID is claimed.
     */
    public const CLAIM_TTL = 3600;

    public const UNPACKING_EXCEPTION_MSG_UNSERIALIZABLE_CONTENT = 'It was not possible to unserialize the AmqpMessage contents.';

    public const UNPACKING_EXCEPTION_MSG_UNEXPECTED_TYPE = 'The AmqpMessage content is not a MessageQueue MessageInterface.';

    /**
     * @var MessageHandlerInterface
     */
    private $handler;

    /**
     * @var CacheInterface
     */
    private $cache;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @var PersistenceClientInterface
     */
    private $persistenceClient;

    /**
     * @var bool
     */
    private $hasRegisteredShutdownHandler = false;

    /**
     * The message currently being handled, or null when no message is in flight.
     *
     * @var AMQPMessage|null
     */
    private $amqpMessage;

    /**
     * @param LoggerInterface|null $logger When omitted, a NullLogger is used.
     */
    public function __construct(
        MessageHandlerInterface $handler,
        CacheInterface $cache,
        PersistenceClientInterface $persistenceClient,
        ?LoggerInterface $logger = null
    ) {
        $this->handler = $handler;
        $this->cache = $cache;
        $this->persistenceClient = $persistenceClient;
        $this->logger = $logger ?? new NullLogger();
    }

    /**
     * Claims, unpacks, handles and acknowledges the given AMQP message.
     *
     * - Already claimed message: logged, acknowledged and skipped.
     * - Message without a message ID: a notice is logged and the message is processed anyway.
     * - Body that can not be unpacked: persisted as undeliverable and acknowledged.
     * - UnrecoverablePersistenceException: logged, acknowledged and rethrown.
     * - Any other exception: propagates without acknowledging the message.
     *
     * @throws UnrecoverablePersistenceException
     */
    public function handle(AMQPMessage $amqpMessage): void
    {
        $this->initiateHandling($amqpMessage);
        $this->registerShutdownHandler();

        // A single try/finally around the whole flow guarantees the in-flight marker is cleared on every exit
        // path that raises an exception or returns. Otherwise an exception thrown while claiming would leave the
        // marker set and the shutdown handler would later acknowledge a message that was never processed.
        try {
            // RabbitMQ can and WILL send duplicate messages to clients. You are recommended to handle messages in
            // an idempotent way, but for us this is not really possible, because sending out emails is never
            // idempotent. So we are doing the next best thing, deduplicating them.
            //
            // When deduplicating we should do it as early and as cheaply as possible, to narrow the window in
            // which 2 consumers that get the message at nearly the same time both execute it.
            //
            // See: https://www.rabbitmq.com/reliability.html#consumer
            //  > [..] messages can be duplicated, and consumers must be prepared to handle them. If possible, ensure
            //  > that your consumers handle messages in an idempotent way rather than explicitly deal with
            //  > deduplication.
            try {
                $this->claim($amqpMessage);
            } catch (AlreadyClaimedException $e) {
                $this->logAlreadyClaimedMessage($amqpMessage);
                // If we already processed it, we should still ack it here, otherwise rabbit will try to deliver
                // it again, which is exactly what we don't want.
                $this->acknowledgeMessage($amqpMessage);

                return;
            } catch (CanNotClaimException $e) {
                $this->logger->notice($e->getMessage());
                // If we don't have a message ID we can not claim it nor deduplicate it, so
                // we should always process it, just to be sure we don't lose a message.
            }

            try {
                $this->handler->handle($this->unpack($amqpMessage));
                $this->acknowledgeMessage($amqpMessage);
            } catch (AmqpMessageUnpackingException $e) {
                $this->logger->warning($e->getMessage());
                $this->persistenceClient->persistUndeliverableMessage($amqpMessage->body, $e->getMessage());
                $this->acknowledgeMessage($amqpMessage);
            } catch (UnrecoverablePersistenceException $e) {
                $this->logger->warning($e->getMessage());
                $this->acknowledgeMessage($amqpMessage);
                throw $e;
            }
            // We do not acknowledge the amqp msg when there was any other exception because if we could not put
            // the msg back in the scheduled queue, we would lose the message. This way it stays in rabbitMq and
            // is tried again.
        } finally {
            $this->terminateHandling();
        }
    }

    /**
     * Callback registered with register_shutdown_function().
     *
     * Does nothing unless a message is still marked as being handled. In that case it calls shutdown() on the
     * wrapped handler and then acknowledges the in-flight message.
     */
    public function shutdownHandler(): void
    {
        if (!$this->isHandlingMessage()) {
            return;
        }

        $this->handler->shutdown();

        $this->acknowledgeMessage($this->amqpMessage);
    }

    /**
     * Registers shutdownHandler() as a PHP shutdown function, at most once per instance.
     */
    private function registerShutdownHandler(): void
    {
        // The only way to catch (all) fatal errors is to have a shutdownHandler
        // This piece of code for example will not throw a Throwable, but just kill the php script:
        // ```
        //   $client = new SoapClient(null, ['location' => "http://localhost/soap.php", 'uri' => "http://test-uri/"]);
        //   $client->__setSoapHeaders([0 => null]);
        // ```
        //
        // But this is a big problem, because in that case we would not send an 'ACK' back to rabbit, as the fatal
        // error would just kill the php script, so rabbit would try again with the next available worker (as the
        // connection was hard interrupted).
        //
        // So if it fails again in the next worker with a fatal error again we get an endless retry loop in rabbit
        // which will basically break the queue, as nothing else is processed except for endlessly retrying the fatal.
        //
        // To remedy this we register the shutdown handler, we add the fatal error message to the command, and store
        // it in the queue to try again after the normal retry period, finally we send the ACK to rabbit, as we've
        // dealt with it properly (we will retry it in the future, and have logged the error nicely in the queue).
        if ($this->hasRegisteredShutdownHandler) {
            return;
        }

        register_shutdown_function([$this, 'shutdownHandler']);
        $this->hasRegisteredShutdownHandler = true;
    }

    /**
     * Marks the message as handled in the cache, keyed by its AMQP message ID, for CLAIM_TTL.
     *
     * NOTE(review): the has() check and the set() call are two separate cache operations, so two consumers
     * can still both pass the check for the same ID; this narrows the duplicate window but does not close it.
     *
     * @throws AlreadyClaimedException when the cache already holds an entry for this message ID
     * @throws CanNotClaimException    when the message carries no message ID
     */
    private function claim(AMQPMessage $amqpMessage): void
    {
        if (!$amqpMessage->has('message_id')) {
            throw new CanNotClaimException(
                'Could not claim AMQP message because it does not have a message ID. Message body: '
                . $amqpMessage->body
            );
        }

        $cacheKey = 'msg_handled.' . $amqpMessage->get('message_id');

        if ($this->cache->has($cacheKey)) {
            throw new AlreadyClaimedException('AMQP message has already been claimed, with cache key ' . $cacheKey);
        }

        $this->cache->set($cacheKey, true, self::CLAIM_TTL);
    }

    /**
     * Sends a basic_ack for the message's delivery tag on the channel found in its delivery info.
     */
    private function acknowledgeMessage(AMQPMessage $amqpMessage): void
    {
        $channel = $amqpMessage->delivery_info['channel'];
        $channel->basic_ack($amqpMessage->delivery_info['delivery_tag']);
    }

    /**
     * Unserializes the message body and verifies that it is a MessageInterface.
     *
     * @throws AmqpMessageUnpackingException when the body can not be unserialized or is of another type
     */
    private function unpack(AMQPMessage $amqpMessage): MessageInterface
    {
        // NOTE(review): unserialize() on the raw body can instantiate arbitrary classes; this is only safe as
        // long as every publisher on the queue is trusted - confirm.
        // The error is suppressed on purpose: a failure is reported through the `false` return value below.
        $result = @unserialize($amqpMessage->body);

        if ($result === false) {
            throw new AmqpMessageUnpackingException(self::UNPACKING_EXCEPTION_MSG_UNSERIALIZABLE_CONTENT);
        }

        if (!$result instanceof MessageInterface) {
            throw new AmqpMessageUnpackingException(self::UNPACKING_EXCEPTION_MSG_UNEXPECTED_TYPE);
        }

        return $result;
    }

    /**
     * Builds a comma separated, one-line description of a message for log entries: id, destination,
     * deliver-at, created-at and updated-at (ATOM format), tries, priority and a payload description.
     */
    private function getLogMessage(MessageInterface $message): string
    {
        return $message->getId()
            . ', ' . $message->getDestination()
            . ', ' . $message->getDeliverAt()->format(DateTime::ATOM)
            . ', ' . $message->getCreatedAt()->format(DateTime::ATOM)
            . ', ' . $message->getUpdatedAt()->format(DateTime::ATOM)
            . ', ' . $message->getTries()
            . ', ' . $message->getPriority()
            . ', ' . $this->getPayloadDescription($message);
    }

    /**
     * Describes the payload: the class name for objects, 'array' for arrays, the string cast otherwise.
     */
    private function getPayloadDescription(MessageInterface $message): string
    {
        // Read the payload once instead of calling the getter for every check.
        $payload = $message->getPayload();

        if (is_object($payload)) {
            return get_class($payload);
        }

        if (is_array($payload)) {
            return 'array';
        }

        return (string) $payload;
    }

    /**
     * Logs a notice that the message was already claimed.
     *
     * Logging must never prevent the duplicate from being acknowledged, so a body that can not be unpacked
     * is described by its message ID (when present) and the unpacking error instead of raising.
     */
    private function logAlreadyClaimedMessage(AMQPMessage $amqpMessage): void
    {
        try {
            $description = $this->getLogMessage($this->unpack($amqpMessage));
        } catch (AmqpMessageUnpackingException $e) {
            $messageId = $amqpMessage->has('message_id') ? $amqpMessage->get('message_id') : 'unknown';
            $description = 'message ID ' . $messageId . ' (' . $e->getMessage() . ')';
        }

        $this->logger->notice('Queue message was already claimed: ' . $description);
    }

    /**
     * Remembers the message as the one currently being handled.
     */
    private function initiateHandling(AMQPMessage $amqpMessage): void
    {
        $this->amqpMessage = $amqpMessage;
    }

    /**
     * Clears the currently handled message.
     */
    private function terminateHandling(): void
    {
        $this->amqpMessage = null;
    }

    /**
     * Tells whether a message is currently marked as being handled.
     */
    public function isHandlingMessage(): bool
    {
        return $this->amqpMessage !== null;
    }
}
247