1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Werkspot\MessageQueue\DeliveryQueue\Amqp; |
4
|
|
|
|
5
|
|
|
use DateTime;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Psr\SimpleCache\CacheInterface;
use Throwable;
use Werkspot\MessageQueue\DeliveryQueue\Exception\AlreadyClaimedException;
use Werkspot\MessageQueue\DeliveryQueue\Exception\CanNotClaimException;
use Werkspot\MessageQueue\DeliveryQueue\Exception\UnrecoverablePersistenceException;
use Werkspot\MessageQueue\DeliveryQueue\MessageHandlerInterface;
use Werkspot\MessageQueue\DeliveryQueue\PersistenceClientInterface;
use Werkspot\MessageQueue\Message\MessageInterface;
16
|
|
|
|
17
|
|
|
/**
 * Handles one raw AMQP delivery at a time.
 *
 * For every delivery it: claims the message id in the cache (deduplication), unserializes the body into a
 * MessageInterface, passes it to the wrapped MessageHandlerInterface and acknowledges the delivery on its channel.
 * A shutdown function is registered so that a fatal error in the middle of a delivery still results in an ACK.
 */
final class AmqpMessageHandler implements AmqpMessageHandlerInterface
{
    /**
     * Value passed as TTL when a claim is written to the cache (PSR-16 interprets an int TTL as seconds).
     */
    public const CLAIM_TTL = 3600;
    public const UNPACKING_EXCEPTION_MSG_UNSERIALIZABLE_CONTENT = 'It was not possible to unserialize the AmqpMessage contents.';
    public const UNPACKING_EXCEPTION_MSG_UNEXPECTED_TYPE = 'The AmqpMessage content is not a MessageQueue MessageInterface.';

    /**
     * Prefix of the cache key under which a claim is stored; the AMQP `message_id` property is appended.
     */
    private const CLAIM_CACHE_KEY_PREFIX = 'msg_handled.';

    /**
     * @var MessageHandlerInterface
     */
    private $handler;

    /**
     * @var CacheInterface
     */
    private $cache;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @var PersistenceClientInterface
     */
    private $persistenceClient;

    /**
     * @var bool
     */
    private $hasRegisteredShutdownHandler = false;

    /**
     * The delivery currently being handled, or null when idle.
     *
     * @var AMQPMessage|null
     */
    private $amqpMessage;

    /**
     * @param MessageHandlerInterface    $handler           Receives the unpacked message
     * @param CacheInterface             $cache             Stores the claims used for deduplication
     * @param PersistenceClientInterface $persistenceClient Receives bodies that could not be unpacked
     * @param LoggerInterface|null       $logger            Defaults to a NullLogger when omitted
     */
    public function __construct(
        MessageHandlerInterface $handler,
        CacheInterface $cache,
        PersistenceClientInterface $persistenceClient,
        ?LoggerInterface $logger = null
    ) {
        $this->handler = $handler;
        $this->cache = $cache;
        $this->persistenceClient = $persistenceClient;
        $this->logger = $logger ?? new NullLogger();
    }

    /**
     * Deduplicates, processes and acknowledges a single AMQP delivery.
     *
     * @throws UnrecoverablePersistenceException Re-thrown after the delivery has been acknowledged
     * @throws Throwable Anything else thrown while processing; the delivery is then left unacknowledged
     */
    public function handle(AMQPMessage $amqpMessage): void
    {
        $this->initiateHandling($amqpMessage);
        $this->registerShutdownHandler();

        // RabbitMQ can and WILL send duplicate messages to clients. You are recommended to handle message in an
        // idempotent way, but for us this is not really possible, because sending out emails is never idempotent.
        // So we are doing the next best thing, deduplicating them.
        //
        // When deduplicating them we should do it as early as possible and in the most performing way as possible to
        // prevent race conditions where 2 consumers get the message at near the same time. So make sure we deduplicate
        // as soon as possible to keep the duplicate executing at a minimum.
        //
        // See: https://www.rabbitmq.com/reliability.html#consumer
        // > [..] messages can be duplicated, and consumers must be prepared to handle them. If possible, ensure that
        // > your consumers handle messages in an idempotent way rather than explicitly deal with deduplication.

        try {
            try {
                $this->claim($amqpMessage);
            } catch (AlreadyClaimedException $e) {
                $this->logAlreadyClaimedMessage($amqpMessage);
                // If we already processed it, we should still ack it here, otherwise rabbit will try to do it again,
                // which is exactly what we don't want.
                $this->acknowledgeMessage($amqpMessage);

                return;
            } catch (CanNotClaimException $e) {
                $this->logger->notice($e->getMessage());
                // if we don't have a message ID we can not claim it nor deduplicate it so
                // we should always process it, just to be sure we don't lose a message
            }

            $this->process($amqpMessage);
        } finally {
            // Always leave the "handling" state, also when claiming or acknowledging a duplicate fails. Otherwise
            // the shutdown handler would later treat an ordinary shutdown as a fatal error in mid-message.
            $this->terminateHandling();
        }
    }

    /**
     * Shutdown function: when the script ends while a delivery is still being handled, lets the wrapped handler
     * deal with it and acknowledges the delivery. Does nothing when no delivery is in progress.
     */
    public function shutdownHandler(): void
    {
        if (!$this->isHandlingMessage()) {
            return;
        }

        $this->handler->shutdown();

        $this->acknowledgeMessage($this->amqpMessage);
    }

    /**
     * Tells whether a delivery is currently being handled.
     */
    public function isHandlingMessage(): bool
    {
        return $this->amqpMessage !== null;
    }

    /**
     * Unpacks the delivery, passes it to the wrapped handler and acknowledges it.
     *
     * - Unpacking failure: the raw body is persisted as undeliverable and the delivery is acknowledged.
     * - UnrecoverablePersistenceException: the delivery is acknowledged and the exception is re-thrown.
     * - Any other failure: the delivery is NOT acknowledged so RabbitMQ keeps it and tries again. If the wrapped
     *   handler did not complete, the claim is released as well; with the claim still in the cache the retry would
     *   be mistaken for a duplicate, acknowledged and lost.
     *
     * @throws UnrecoverablePersistenceException
     * @throws Throwable
     */
    private function process(AMQPMessage $amqpMessage): void
    {
        $handlerCompleted = false;

        try {
            $this->handler->handle($this->unpack($amqpMessage));
            $handlerCompleted = true;
            $this->acknowledgeMessage($amqpMessage);
        } catch (AmqpMessageUnpackingException $e) {
            $this->logger->warning($e->getMessage());
            $this->persistenceClient->persistUndeliverableMessage($amqpMessage->body, $e->getMessage());
            $this->acknowledgeMessage($amqpMessage);
        } catch (UnrecoverablePersistenceException $e) {
            $this->logger->warning($e->getMessage());
            $this->acknowledgeMessage($amqpMessage);

            throw $e;
        } catch (Throwable $e) {
            // When only the ACK failed the message itself was handled, so the claim must stay in place to keep the
            // redelivery from being executed a second time.
            if (!$handlerCompleted) {
                $this->releaseClaim($amqpMessage);
            }

            throw $e;
        }
    }

    private function registerShutdownHandler(): void
    {
        // The only way to catch (all) fatal errors is to have a shutdownHandler
        // This piece of code for example will not throw a Throwable, but just kill the php script:
        // ```
        // $client = new SoapClient(null, ['location' => "http://localhost/soap.php", 'uri' => "http://test-uri/"]);
        // $client->__setSoapHeaders([0 => null]);
        // ```
        //
        // But this is a big problem, because in that case we would not send an 'ACK' back to rabbit, as the fatal
        // error would just kill the php script, so rabbit would try again with the next available worker (as the
        // connection was hard interrupted).
        //
        // So if it fails again in the next worker with a fatal error again we get an endless retry loop in rabbit
        // which will basically break the queue, as nothing else is processed except for endlessly retrying the fatal.
        //
        // To remedy this we register the shutdown handler, we add the fatal error message to the command, and store
        // it in the queue to try again after the normal retry period, finally we send the ACK to rabbit, as we've
        // dealt with it properly (we will retry it in the future, and have logged the error nicely in the queue).
        if ($this->hasRegisteredShutdownHandler) {
            return;
        }

        register_shutdown_function([$this, 'shutdownHandler']);
        $this->hasRegisteredShutdownHandler = true;
    }

    /**
     * Marks the message id as claimed in the cache.
     *
     * NOTE(review): has() followed by set() is not atomic, so two consumers receiving the same message at nearly
     * the same time can both claim it; PSR-16 offers no atomic "add" to close that window.
     *
     * @throws AlreadyClaimedException When a claim for this message id is already present
     * @throws CanNotClaimException    When the delivery carries no `message_id` property
     */
    private function claim(AMQPMessage $amqpMessage): void
    {
        if (!$amqpMessage->has('message_id')) {
            throw new CanNotClaimException(
                'Could not claim AMQP message because it does not have a message ID. Message body: '
                . $amqpMessage->body
            );
        }

        $cacheKey = $this->getClaimCacheKey($amqpMessage);

        if ($this->cache->has($cacheKey)) {
            throw new AlreadyClaimedException('AMQP message has already been claimed, with cache key ' . $cacheKey);
        }

        $this->cache->set($cacheKey, true, self::CLAIM_TTL);
    }

    /**
     * Removes the claim of a delivery that is left unacknowledged, so that its redelivery is processed again.
     *
     * Best effort: this runs while another exception is propagating, so a cache failure is only logged.
     */
    private function releaseClaim(AMQPMessage $amqpMessage): void
    {
        if (!$amqpMessage->has('message_id')) {
            // Nothing was claimed for a message without an id.
            return;
        }

        try {
            $this->cache->delete($this->getClaimCacheKey($amqpMessage));
        } catch (Throwable $e) {
            $this->logger->warning('Could not release the claim on an AMQP message: ' . $e->getMessage());
        }
    }

    /**
     * Builds the cache key of the claim; the delivery must have a `message_id` property.
     */
    private function getClaimCacheKey(AMQPMessage $amqpMessage): string
    {
        return self::CLAIM_CACHE_KEY_PREFIX . $amqpMessage->get('message_id');
    }

    /**
     * Sends a basic.ack for the delivery on the channel it was received on.
     */
    private function acknowledgeMessage(AMQPMessage $amqpMessage): void
    {
        $channel = $amqpMessage->delivery_info['channel'];
        $channel->basic_ack($amqpMessage->delivery_info['delivery_tag']);
    }

    /**
     * Unserializes the delivery body into a MessageQueue message.
     *
     * @throws AmqpMessageUnpackingException When the body can not be unserialized or is not a MessageInterface
     */
    private function unpack(AMQPMessage $amqpMessage): MessageInterface
    {
        $body = $amqpMessage->body;

        // NOTE(review): unserialize() can instantiate arbitrary classes; this is only safe as long as nothing
        // untrusted can publish to the queue. The notice emitted on malformed input is suppressed on purpose,
        // the failure is reported through the exception below.
        $result = @unserialize($body);

        // unserialize() returns false on failure, but also for a validly serialized `false`; only the former is
        // unserializable content, the latter is content of an unexpected type.
        if ($result === false && $body !== serialize(false)) {
            throw new AmqpMessageUnpackingException(self::UNPACKING_EXCEPTION_MSG_UNSERIALIZABLE_CONTENT);
        }

        if (!$result instanceof MessageInterface) {
            throw new AmqpMessageUnpackingException(self::UNPACKING_EXCEPTION_MSG_UNEXPECTED_TYPE);
        }

        return $result;
    }

    /**
     * Renders the message as one comma separated line: id, destination, deliver-at, created-at, updated-at,
     * tries, priority and a description of the payload. Dates use the ATOM format.
     */
    private function getLogMessage(MessageInterface $message): string
    {
        return implode(', ', [
            $message->getId(),
            $message->getDestination(),
            $message->getDeliverAt()->format(DateTime::ATOM),
            $message->getCreatedAt()->format(DateTime::ATOM),
            $message->getUpdatedAt()->format(DateTime::ATOM),
            $message->getTries(),
            $message->getPriority(),
            $this->getPayloadDescription($message),
        ]);
    }

    /**
     * Describes the payload for logging: the class name for objects, 'array' for arrays and the string cast of
     * the value for anything else.
     */
    private function getPayloadDescription(MessageInterface $message): string
    {
        $payload = $message->getPayload();

        if (is_object($payload)) {
            return get_class($payload);
        }

        if (is_array($payload)) {
            return 'array';
        }

        return (string) $payload;
    }

    /**
     * Logs a notice about a delivery that was skipped because its message id had already been claimed.
     *
     * Never throws for an unreadable body: the caller still has to acknowledge the duplicate, and an exception
     * here would leave it unacknowledged.
     */
    private function logAlreadyClaimedMessage(AMQPMessage $amqpMessage): void
    {
        try {
            $description = $this->getLogMessage($this->unpack($amqpMessage));
        } catch (AmqpMessageUnpackingException $e) {
            $description = 'message could not be unpacked (' . $e->getMessage() . ')';
        }

        $this->logger->notice('Queue message was already claimed: ' . $description);
    }

    /**
     * Remembers the delivery that is being handled so the shutdown handler can reach it.
     */
    private function initiateHandling(AMQPMessage $amqpMessage): void
    {
        $this->amqpMessage = $amqpMessage;
    }

    /**
     * Forgets the current delivery; afterwards isHandlingMessage() returns false.
     */
    private function terminateHandling(): void
    {
        $this->amqpMessage = null;
    }
}
247
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.