Test Failed
Pull Request — master (#7)
by Herberto
10:01 queued 03:44
created

AmqpMessageHandler::handle()   B

Complexity

Conditions 5
Paths 35

Size

Total Lines 45
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 45
c 0
b 0
f 0
rs 8.439
cc 5
eloc 24
nc 35
nop 1
1
<?php
2
3
namespace Werkspot\MessageQueue\DeliveryQueue\Amqp;
4
5
use DateTime;
6
use PhpAmqpLib\Message\AMQPMessage;
7
use Psr\Log\LoggerInterface;
8
use Psr\Log\NullLogger;
9
use Psr\SimpleCache\CacheInterface;
10
use Werkspot\MessageQueue\DeliveryQueue\Exception\AlreadyClaimedException;
11
use Werkspot\MessageQueue\DeliveryQueue\Exception\CanNotClaimException;
12
use Werkspot\MessageQueue\DeliveryQueue\Exception\UnrecoverablePersistenceException;
13
use Werkspot\MessageQueue\DeliveryQueue\MessageHandlerInterface;
14
use Werkspot\MessageQueue\DeliveryQueue\PersistenceClientInterface;
15
use Werkspot\MessageQueue\Message\MessageInterface;
16
17
/**
 * Consumes a single AMQP message: deduplicates it through a cache-based claim, unpacks it, hands it to the
 * wrapped message handler and acknowledges it towards the broker.
 *
 * A shutdown function is registered on first use so that a message being handled while PHP terminates is
 * still acknowledged (see registerShutdownHandler()).
 */
final class AmqpMessageHandler implements AmqpMessageHandlerInterface
{
    public const CLAIM_TTL = 3600;
    public const UNPACKING_EXCEPTION_MSG_UNSERIALIZABLE_CONTENT = 'It was not possible to unserialize the AmqpMessage contents.';
    public const UNPACKING_EXCEPTION_MSG_UNEXPECTED_TYPE = 'The AmqpMessage content is not a MessageQueue MessageInterface.';

    /**
     * Prefix of the cache key under which a claimed message ID is stored.
     */
    private const CLAIM_CACHE_KEY_PREFIX = 'msg_handled.';

    /**
     * @var MessageHandlerInterface
     */
    private $handler;

    /**
     * @var CacheInterface
     */
    private $cache;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @var PersistenceClientInterface
     */
    private $persistenceClient;

    /**
     * @var bool
     */
    private $hasRegisteredShutdownHandler = false;

    /**
     * The message currently being handled, or null when no message is in flight.
     *
     * @var AMQPMessage|null
     */
    private $amqpMessage;

    /**
     * @param LoggerInterface|null $logger Falls back to a NullLogger when omitted.
     */
    public function __construct(
        MessageHandlerInterface $handler,
        CacheInterface $cache,
        PersistenceClientInterface $persistenceClient,
        ?LoggerInterface $logger = null
    ) {
        $this->handler = $handler;
        $this->cache = $cache;
        $this->persistenceClient = $persistenceClient;
        $this->logger = $logger ?? new NullLogger();
    }

    /**
     * Handles one AMQP message.
     *
     * Outcomes:
     *  - duplicate (already claimed): logged and acknowledged without being processed;
     *  - processed successfully: acknowledged;
     *  - content can not be unpacked: persisted as undeliverable and acknowledged;
     *  - UnrecoverablePersistenceException: acknowledged and rethrown;
     *  - any other exception: NOT acknowledged and rethrown, and the claim is released so that the
     *    redelivered message is processed again instead of being discarded as a duplicate.
     *
     * @throws UnrecoverablePersistenceException
     */
    public function handle(AMQPMessage $amqpMessage): void
    {
        $this->initiateHandling($amqpMessage);
        $this->registerShutdownHandler();

        // RabbitMQ can and WILL send duplicate messages to clients. You are recommended to handle messages in an
        // idempotent way, but for us this is not really possible, because sending out emails is never idempotent.
        // So we are doing the next best thing: deduplicating them.
        //
        // Deduplication should happen as early and as cheaply as possible, to keep the window in which two
        // consumers receive the same message at nearly the same time (and both process it) at a minimum.
        //
        // See: https://www.rabbitmq.com/reliability.html#consumer
        //  > [..] messages can be duplicated, and consumers must be prepared to handle them. If possible, ensure that
        //  > your consumers handle messages in an idempotent way rather than explicitly deal with deduplication.

        try {
            $hasClaim = false;

            try {
                $this->claim($amqpMessage);
                $hasClaim = true;
            } catch (AlreadyClaimedException $e) {
                $this->logAlreadyClaimedMessage($amqpMessage);
                // If we already processed it, we should still ack it here, otherwise rabbit will try to deliver it
                // again, which is exactly what we don't want.
                $this->acknowledgeMessage($amqpMessage);

                return;
            } catch (CanNotClaimException $e) {
                $this->logger->notice($e->getMessage());
                // If we don't have a message ID we can neither claim nor deduplicate the message, so we always
                // process it, just to be sure we don't lose a message.
            }

            $this->deliver($amqpMessage, $hasClaim);
        } finally {
            // Always leave the "handling" state, whatever the outcome, so the shutdown handler never acts on a
            // message that is no longer in flight.
            $this->terminateHandling();
        }
    }

    /**
     * Acknowledges the in-flight message when PHP shuts down in the middle of handling it.
     *
     * Does nothing when no message is being handled.
     */
    public function shutdownHandler(): void
    {
        if (!$this->isHandlingMessage()) {
            return;
        }

        $this->handler->shutdown();

        $this->acknowledgeMessage($this->amqpMessage);
    }

    /**
     * Tells whether a message is currently being handled.
     */
    public function isHandlingMessage(): bool
    {
        return $this->amqpMessage !== null;
    }

    /**
     * Unpacks the message, passes it to the wrapped handler and acknowledges it.
     *
     * @param bool $hasClaim Whether claim() succeeded for this message, i.e. whether there is a claim to release
     *                       when processing does not complete.
     *
     * @throws UnrecoverablePersistenceException
     */
    private function deliver(AMQPMessage $amqpMessage, bool $hasClaim): void
    {
        // The claim is kept only once the message has been dealt with (processed, stored as undeliverable, or
        // given up on). If the acknowledgement itself fails afterwards the claim stays, so the redelivered
        // message is deduplicated rather than processed twice.
        $releaseClaim = $hasClaim;

        try {
            $this->handler->handle($this->unpack($amqpMessage));
            $releaseClaim = false;
            $this->acknowledgeMessage($amqpMessage);
        } catch (AmqpMessageUnpackingException $e) {
            $this->logger->warning($e->getMessage());
            $this->persistenceClient->persistUndeliverableMessage($amqpMessage->body, $e->getMessage());
            $releaseClaim = false;
            $this->acknowledgeMessage($amqpMessage);
        } catch (UnrecoverablePersistenceException $e) {
            $this->logger->warning($e->getMessage());
            $releaseClaim = false;
            $this->acknowledgeMessage($amqpMessage);

            throw $e;
        } finally {
            // We do not acknowledge the amqp msg when there was any other exception because if we could not put
            // the msg back in the scheduled queue, we would lose the message. This way it stays in rabbitMq and
            // is tried again. For that retry to actually be processed, the claim has to be given up here;
            // otherwise the redelivery would be acknowledged as an already claimed duplicate and be lost.
            if ($releaseClaim) {
                $this->releaseClaim($amqpMessage);
            }
        }
    }

    /**
     * Registers shutdownHandler() as a PHP shutdown function, once per instance.
     */
    private function registerShutdownHandler(): void
    {
        // The only way to catch (all) fatal errors is to have a shutdownHandler.
        // This piece of code for example will not throw a Throwable, but just kill the php script:
        // ```
        //   $client = new SoapClient(null, ['location' => "http://localhost/soap.php", 'uri' => "http://test-uri/"]);
        //   $client->__setSoapHeaders([0 => null]);
        // ```
        //
        // This is a big problem, because in that case we would not send an 'ACK' back to rabbit, as the fatal
        // error would just kill the php script, so rabbit would try again with the next available worker (as the
        // connection was hard interrupted).
        //
        // If it then fails with a fatal error in the next worker as well, we get an endless retry loop in rabbit,
        // which basically breaks the queue, as nothing else is processed except for endlessly retrying the fatal.
        //
        // To remedy this we register the shutdown handler, we add the fatal error message to the command, and store
        // it in the queue to try again after the normal retry period; finally we send the ACK to rabbit, as we've
        // dealt with it properly (we will retry it in the future, and have logged the error nicely in the queue).
        if (!$this->hasRegisteredShutdownHandler) {
            register_shutdown_function([$this, 'shutdownHandler']);
            $this->hasRegisteredShutdownHandler = true;
        }
    }

    /**
     * Marks the message as claimed in the cache for CLAIM_TTL seconds.
     *
     * NOTE(review): has() followed by set() is not atomic, so two consumers receiving the same message at the
     * same moment can both pass the check. The cache interface used here offers no atomic "add" operation.
     *
     * @throws AlreadyClaimedException When the cache already holds a claim for this message ID.
     * @throws CanNotClaimException    When the message carries no message ID.
     */
    private function claim(AMQPMessage $amqpMessage): void
    {
        if (!$amqpMessage->has('message_id')) {
            throw new CanNotClaimException(
                'Could not claim AMQP message because it does not have a message ID. Message body: '
                . $amqpMessage->body
            );
        }

        $cacheKey = $this->getClaimCacheKey($amqpMessage);

        if ($this->cache->has($cacheKey)) {
            throw new AlreadyClaimedException('AMQP message has already been claimed, with cache key ' . $cacheKey);
        }

        $this->cache->set($cacheKey, true, self::CLAIM_TTL);
    }

    /**
     * Removes the claim made by claim() so that a redelivery of the message is processed again.
     *
     * Best effort: this runs while another exception is already propagating, so a cache failure is logged
     * instead of thrown in order not to mask the original error.
     */
    private function releaseClaim(AMQPMessage $amqpMessage): void
    {
        try {
            $this->cache->delete($this->getClaimCacheKey($amqpMessage));
        } catch (\Throwable $e) {
            $this->logger->warning('Could not release the claim on the AMQP message: ' . $e->getMessage());
        }
    }

    /**
     * Builds the cache key of the claim for the given message. The message must have a message ID.
     */
    private function getClaimCacheKey(AMQPMessage $amqpMessage): string
    {
        return self::CLAIM_CACHE_KEY_PREFIX . $amqpMessage->get('message_id');
    }

    /**
     * Sends the ACK for the message over the channel it was delivered on.
     */
    private function acknowledgeMessage(AMQPMessage $amqpMessage): void
    {
        $channel = $amqpMessage->delivery_info['channel'];
        $channel->basic_ack($amqpMessage->delivery_info['delivery_tag']);
    }

    /**
     * Unserializes the AMQP message body into a queue message.
     *
     * NOTE(review): unserialize() is applied to the raw body received from the broker. This is only safe as
     * long as every publisher on that queue is trusted; consider an `allowed_classes` restriction or a
     * non-executable format.
     *
     * @throws AmqpMessageUnpackingException When the body can not be unserialized or is not a MessageInterface.
     */
    private function unpack(AMQPMessage $amqpMessage): MessageInterface
    {
        // Failures are reported through the `false` return value; the notice unserialize() raises is suppressed.
        $result = @unserialize($amqpMessage->body);

        if ($result === false) {
            throw new AmqpMessageUnpackingException(self::UNPACKING_EXCEPTION_MSG_UNSERIALIZABLE_CONTENT);
        }

        if (!$result instanceof MessageInterface) {
            throw new AmqpMessageUnpackingException(self::UNPACKING_EXCEPTION_MSG_UNEXPECTED_TYPE);
        }

        return $result;
    }

    /**
     * Renders the message as a comma separated line: id, destination, deliver-at, created-at, updated-at
     * (all dates in ATOM format), tries, priority and a payload description.
     *
     * NOTE(review): static analysis flagged this method as duplicated elsewhere in the project; consider
     * extracting it into a shared class.
     */
    private function getLogMessage(MessageInterface $message): string
    {
        return implode(', ', [
            $message->getId(),
            $message->getDestination(),
            $message->getDeliverAt()->format(DateTime::ATOM),
            $message->getCreatedAt()->format(DateTime::ATOM),
            $message->getUpdatedAt()->format(DateTime::ATOM),
            $message->getTries(),
            $message->getPriority(),
            $this->getPayloadDescription($message),
        ]);
    }

    /**
     * Describes the payload: the class name for objects, 'array' for arrays, the string cast otherwise.
     *
     * NOTE(review): static analysis flagged this method as duplicated elsewhere in the project; consider
     * extracting it into a shared class.
     */
    private function getPayloadDescription(MessageInterface $message): string
    {
        $payload = $message->getPayload();

        if (is_object($payload)) {
            return get_class($payload);
        }

        if (is_array($payload)) {
            return 'array';
        }

        return (string) $payload;
    }

    /**
     * Logs a notice about a duplicate message.
     *
     * Never throws an unpacking exception: a duplicate whose content can not be unpacked is still logged, so
     * the caller can go on to acknowledge it.
     */
    private function logAlreadyClaimedMessage(AMQPMessage $amqpMessage): void
    {
        try {
            $description = $this->getLogMessage($this->unpack($amqpMessage));
        } catch (AmqpMessageUnpackingException $e) {
            $description = '[content could not be unpacked: ' . $e->getMessage() . ']';
        }

        $this->logger->notice('Queue message was already claimed: ' . $description);
    }

    /**
     * Records the message as being in flight, for the benefit of the shutdown handler.
     */
    private function initiateHandling(AMQPMessage $amqpMessage): void
    {
        $this->amqpMessage = $amqpMessage;
    }

    /**
     * Clears the in-flight message.
     */
    private function terminateHandling(): void
    {
        $this->amqpMessage = null;
    }
}
247