Test Failed
Push — ci-config ( 304611...6ac56a )
by Herberto
03:59
created

AmqpMessageHandler::getPayloadType()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 11
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 11
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 5
nc 3
nop 1
1
<?php
2
3
namespace Werkspot\MessageQueue\DeliveryQueue\Amqp;
4
5
use DateTime;
6
use PhpAmqpLib\Message\AMQPMessage;
7
use Psr\Log\LoggerInterface;
8
use Psr\Log\NullLogger;
9
use Psr\SimpleCache\CacheInterface;
10
use Werkspot\MessageQueue\DeliveryQueue\Exception\AlreadyClaimedException;
11
use Werkspot\MessageQueue\DeliveryQueue\Exception\CanNotClaimException;
12
use Werkspot\MessageQueue\DeliveryQueue\Exception\UnrecoverablePersistenceException;
13
use Werkspot\MessageQueue\DeliveryQueue\MessageHandlerInterface;
14
use Werkspot\MessageQueue\DeliveryQueue\PersistenceClientInterface;
15
use Werkspot\MessageQueue\Message\MessageInterface;
16
17
final class AmqpMessageHandler implements AmqpMessageHandlerInterface
18
{
19
    /**
     * Lifetime passed as the TTL argument to the cache when a message ID is claimed
     * (PSR-16 interprets an integer TTL as seconds).
     */
    public const CLAIM_TTL = 3600;

    /**
     * Exception message used when the AMQP message body cannot be unserialized.
     */
    public const UNPACKING_EXCEPTION_MSG_UNSERIALIZABLE_CONTENT = 'It was not possible to unserialize the AmqpMessage contents.';

    /**
     * Exception message used when the unserialized body is not a MessageInterface.
     */
    public const UNPACKING_EXCEPTION_MSG_UNEXPECTED_TYPE = 'The AmqpMessage content is not a MessageQueue MessageInterface.';

    /**
     * Handler that receives the unpacked message and is notified on shutdown.
     *
     * @var MessageHandlerInterface
     */
    private $handler;

    /**
     * Cache in which claimed message IDs are recorded for deduplication.
     *
     * @var CacheInterface
     */
    private $cache;

    /**
     * Never null after construction: a NullLogger is substituted when none is injected.
     *
     * @var LoggerInterface
     */
    private $logger;

    /**
     * Receives messages whose body could not be unpacked.
     *
     * @var PersistenceClientInterface
     */
    private $persistenceClient;

    /**
     * Guards against registering the shutdown function more than once per instance.
     *
     * @var bool
     */
    private $hasRegisteredShutdownHandler = false;

    /**
     * The AMQP message currently being handled, or null when no handling is in progress.
     *
     * @var AMQPMessage|null
     */
    private $amqpMessage;
52
53 7
    /**
     * @param MessageHandlerInterface    $handler           Handler the unpacked message is delegated to.
     * @param CacheInterface             $cache             Cache used to record which message IDs were claimed.
     * @param PersistenceClientInterface $persistenceClient Receives messages whose body could not be unpacked.
     * @param LoggerInterface|null       $logger            Optional logger; a NullLogger is used when omitted.
     */
    public function __construct(
        MessageHandlerInterface $handler,
        CacheInterface $cache,
        PersistenceClientInterface $persistenceClient,
        ?LoggerInterface $logger = null
    ) {
        $this->handler = $handler;
        $this->cache = $cache;
        $this->persistenceClient = $persistenceClient;
        // Explicitly nullable type: relying on "= null" to make the parameter nullable is deprecated in PHP 8.4.
        $this->logger = $logger ?? new NullLogger();
    }
64
65 7
    /**
     * Claims, unpacks and delegates a single AMQP message, acknowledging it where appropriate.
     *
     * Outcomes, as implemented below:
     *  - already claimed (duplicate): logged, acknowledged, not processed;
     *  - no message ID: a notice is logged and the message is processed without deduplication;
     *  - body cannot be unpacked: logged, persisted as undeliverable, acknowledged;
     *  - UnrecoverablePersistenceException: logged, acknowledged, rethrown;
     *  - any other exception: NOT acknowledged, so the message stays in RabbitMQ and is retried.
     *
     * @throws UnrecoverablePersistenceException rethrown after the message has been acknowledged
     */
    public function handle(AMQPMessage $amqpMessage): void
    {
        $this->initiateHandling($amqpMessage);
        $this->registerShutdownHandler();

        // RabbitMQ can and will deliver the same message more than once. Idempotent handling is the
        // recommended answer, but (per the original author) sending e-mails can never be idempotent,
        // so duplicates are filtered out instead.
        //
        // Deduplication happens first and as cheaply as possible, to keep the window in which two
        // consumers could both process the same message as small as possible.
        //
        // See: https://www.rabbitmq.com/reliability.html#consumer
        //  > [..] messages can be duplicated, and consumers must be prepared to handle them. If possible, ensure that
        //  > your consumers handle messages in an idempotent way rather than explicitly deal with deduplication.
        try {
            $this->claim($amqpMessage);
        } catch (AlreadyClaimedException $duplicate) {
            $this->logAlreadyClaimedMessage($amqpMessage);
            // A duplicate must still be acked, otherwise RabbitMQ would deliver it yet again.
            $this->acknowledgeMessage($amqpMessage);
            $this->terminateHandling();

            return;
        } catch (CanNotClaimException $unclaimable) {
            // Without a message ID the message can be neither claimed nor deduplicated; it is
            // processed anyway so that no message is lost.
            $this->logger->notice($unclaimable->getMessage());
        }

        try {
            $message = $this->unpack($amqpMessage);
            $this->handler->handle($message);
            $this->acknowledgeMessage($amqpMessage);
        } catch (AmqpMessageUnpackingException $unpackingFailure) {
            $reason = $unpackingFailure->getMessage();
            $this->logger->warning($reason);
            $this->persistenceClient->persistUndeliverableMessage($amqpMessage->body, $reason);
            $this->acknowledgeMessage($amqpMessage);
        } catch (UnrecoverablePersistenceException $persistenceFailure) {
            $this->logger->warning($persistenceFailure->getMessage());
            $this->acknowledgeMessage($amqpMessage);

            throw $persistenceFailure;
        } finally {
            $this->terminateHandling();
        }
        // Any other exception deliberately leaves the AMQP message unacknowledged: if it could not be put
        // back in the scheduled queue, acking would lose it. This way it stays in RabbitMQ and is tried again.
    }
114
115
    /**
     * Shutdown callback: when the script ends while a message is still being handled, notifies
     * the wrapped handler and acknowledges that message. Does nothing when no message is in flight.
     */
    public function shutdownHandler(): void
    {
        if ($this->isHandlingMessage()) {
            $this->handler->shutdown();

            $this->acknowledgeMessage($this->amqpMessage);
        }
    }
125
126 7
    /**
     * Registers shutdownHandler() as a PHP shutdown function, at most once per instance.
     *
     * Rationale (from the original author's notes): some fatal errors never surface as a Throwable
     * and simply kill the script, for example:
     * ```
     *   $client = new SoapClient(null, ['location' => "http://localhost/soap.php", 'uri' => "http://test-uri/"]);
     *   $client->__setSoapHeaders([0 => null]);
     * ```
     * In that case no ACK reaches RabbitMQ, the connection is hard-interrupted, and the message is
     * handed to the next available worker. If it is fatal there as well, the queue ends up in an
     * endless retry loop in which nothing else gets processed.
     *
     * A shutdown function is the only place where such fatals can still be dealt with: the wrapped
     * handler is told about the shutdown (the original note says it records the error and schedules
     * a later retry - that logic lives in the handler, not here) and the message is then acked.
     */
    private function registerShutdownHandler(): void
    {
        if ($this->hasRegisteredShutdownHandler) {
            return;
        }

        register_shutdown_function([$this, 'shutdownHandler']);
        $this->hasRegisteredShutdownHandler = true;
    }
150
151
    /**
     * Marks the message as claimed by storing a flag under a cache key derived from its message ID.
     *
     * The existence check and the write are two separate cache calls, so the claim is not atomic.
     *
     * @throws AlreadyClaimedException when the cache already holds a claim for this message ID
     * @throws CanNotClaimException    when the message carries no 'message_id' property
     */
    private function claim(AMQPMessage $amqpMessage): void
    {
        if (!$amqpMessage->has('message_id')) {
            $reason = 'Could not claim AMQP message because it does not have a message ID. Message body: '
                . $amqpMessage->body;

            throw new CanNotClaimException($reason);
        }

        $claimKey = 'msg_handled.' . $amqpMessage->get('message_id');
        $isClaimed = $this->cache->has($claimKey);

        if ($isClaimed) {
            throw new AlreadyClaimedException('AMQP message has already been claimed, with cache key ' . $claimKey);
        }

        // The claim expires after CLAIM_TTL, after which the same ID could be claimed again.
        $this->cache->set($claimKey, true, self::CLAIM_TTL);
    }
172
173 6
    /**
     * Sends a basic ACK for the message's delivery tag on the channel found in its delivery info.
     */
    private function acknowledgeMessage(AMQPMessage $amqpMessage): void
    {
        $deliveryInfo = $amqpMessage->delivery_info;

        $deliveryInfo['channel']->basic_ack($deliveryInfo['delivery_tag']);
    }
178
179 7
    /**
     * Unserializes the AMQP message body into a MessageInterface.
     *
     * @throws AmqpMessageUnpackingException when the body cannot be unserialized, or when the
     *                                       unserialized value is not a MessageInterface
     */
    private function unpack(AMQPMessage $amqpMessage): MessageInterface
    {
        $body = $amqpMessage->body;

        // NOTE(security): unserialize() can instantiate arbitrary classes; this is only safe as long as
        // everything that can publish to the queue is trusted. The error is suppressed with "@" because a
        // malformed body is an expected condition here and is reported through the exception below.
        $result = @unserialize($body);

        // unserialize() returns false on failure, but also for a valid serialized `false` ("b:0;").
        // Only the former is a serialization problem; the latter falls through to the type check.
        if ($result === false && $body !== serialize(false)) {
            throw new AmqpMessageUnpackingException(self::UNPACKING_EXCEPTION_MSG_UNSERIALIZABLE_CONTENT);
        }

        if (!$result instanceof MessageInterface) {
            throw new AmqpMessageUnpackingException(self::UNPACKING_EXCEPTION_MSG_UNEXPECTED_TYPE);
        }

        return $result;
    }
193
194 1
    /**
     * Builds a single comma-separated log line describing the message: id, destination,
     * deliver-at, created-at and updated-at (ATOM format), tries, priority and a payload description.
     */
    private function getLogMessage(MessageInterface $message): string
    {
        $fields = [
            $message->getId(),
            $message->getDestination(),
            $message->getDeliverAt()->format(DateTime::ATOM),
            $message->getCreatedAt()->format(DateTime::ATOM),
            $message->getUpdatedAt()->format(DateTime::ATOM),
            $message->getTries(),
            $message->getPriority(),
            $this->getPayloadDescription($message),
        ];

        return implode(', ', $fields);
    }
205
206 1
    /**
     * Describes the message payload for logging purposes.
     *
     * @return string the class name for an object payload, the literal 'array' for an array payload,
     *                and the payload cast to string otherwise
     */
    private function getPayloadDescription(MessageInterface $message): string
    {
        // Fetch the payload once so that every check below inspects the same value and the
        // getter is not invoked repeatedly.
        $payload = $message->getPayload();

        if (is_object($payload)) {
            return get_class($payload);
        }

        if (is_array($payload)) {
            return 'array';
        }

        return (string) $payload;
    }
218
219
    /**
     * Logs a notice that the given AMQP message had already been claimed.
     *
     * This is a logging helper only, so it must never throw: a body that cannot be unpacked is
     * described with a fallback text instead of letting the unpacking exception escape, which
     * would prevent the caller from acknowledging the duplicate.
     */
    private function logAlreadyClaimedMessage(AMQPMessage $amqpMessage): void
    {
        try {
            $description = $this->getLogMessage($this->unpack($amqpMessage));
        } catch (AmqpMessageUnpackingException $e) {
            $description = '[message could not be unpacked: ' . $e->getMessage() . ']';
        }

        $this->logger->notice('Queue message was already claimed: ' . $description);
    }
231
232 7
    /**
     * Records the given message as the one currently being handled, so that
     * isHandlingMessage() reports true and shutdownHandler() knows what to acknowledge.
     */
    private function initiateHandling(AMQPMessage $amqpMessage): void
    {
        $this->amqpMessage = $amqpMessage;
    }
236
237 7
    /**
     * Clears the in-flight message, after which isHandlingMessage() reports false.
     */
    private function terminateHandling(): void
    {
        $this->amqpMessage = null;
    }
241
242
    /**
     * Tells whether a message is currently in flight, i.e. handling was initiated and not yet terminated.
     */
    public function isHandlingMessage(): bool
    {
        return isset($this->amqpMessage);
    }
246
}
247