1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
declare(strict_types=1); |
4
|
|
|
|
5
|
|
|
namespace Werkspot\MessageQueue\DeliveryQueue; |
6
|
|
|
|
7
|
|
|
use DateTime; |
8
|
|
|
use ErrorException; |
9
|
|
|
use Exception; |
10
|
|
|
use Psr\Log\LoggerInterface; |
11
|
|
|
use Psr\Log\NullLogger; |
12
|
|
|
use Throwable; |
13
|
|
|
use Werkspot\MessageQueue\Message\MessageInterface; |
14
|
|
|
|
15
|
|
|
/**
 * Delivers a single queued message and records the failure when the delivery goes wrong.
 *
 * The message that is currently being handled is kept on the instance, so that shutdown() can still mark it
 * as failed when the script dies halfway through a delivery.
 */
final class MessageHandler implements MessageHandlerInterface
{
    /**
     * @var MessageDeliveryServiceInterface
     */
    private $messageDeliveryService;

    /**
     * @var PersistenceClientInterface
     */
    private $persistenceClient;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * The message being handled right now, or null when the handler is idle.
     *
     * @var MessageInterface|null
     */
    private $message;

    /**
     * @param LoggerInterface|null $logger Optional; a NullLogger is used when it is omitted or null.
     */
    public function __construct(
        MessageDeliveryServiceInterface $messageDeliveryService,
        PersistenceClientInterface $persistenceClient,
        ?LoggerInterface $logger = null
    ) {
        $this->messageDeliveryService = $messageDeliveryService;
        $this->persistenceClient = $persistenceClient;
        $this->logger = $logger ?? new NullLogger();
    }

    /**
     * Delivers the given message; any Throwable raised by the delivery is passed to onError().
     *
     * Whatever the outcome, the handler is marked idle again and the persistence client is reset afterwards.
     *
     * @throws Exception When recording the failure itself fails (re-thrown by onError()).
     */
    public function handle(MessageInterface $message): void
    {
        // There can be an edge case where we get a fatal error and leave this flow while the message is still
        // set, but in that case the flow is broken anyway, as handle() should normally handle all cases sanely.
        // If we're in the shutdown handler, the script is being killed anyway. So we don't care anymore.
        $this->initiateHandling($message);

        try {
            $this->process();
        } catch (Throwable $e) {
            $this->onError($e);
        } finally {
            $this->terminateHandling();

            // We clear the persistenceClient (entity manager), to ensure each message is handled without stale
            // entities. Because the workers run for a long time, and we have multiple workers running, we need
            // to prevent having old entities cached in memory when other workers already updated the entity to
            // a new state. Otherwise we would use the old, outdated state and overwrite anything that happened
            // after it in the other workers.
            $this->persistenceClient->reset();
        }
    }

    /**
     * Tells whether a message is being handled at this moment.
     */
    public function isHandlingMessage(): bool
    {
        return $this->message !== null;
    }

    /**
     * Reports the last PHP error as a failure of the current message, if there is both an error and a message.
     *
     * NOTE(review): presumably registered as a shutdown function by the worker; the registration is not
     * visible in this class. The severity of the generated ErrorException is always E_ERROR, regardless of
     * the type reported by error_get_last().
     */
    public function shutdown(): void
    {
        $error = error_get_last();

        if ($error !== null && $this->isHandlingMessage()) {
            $this->onError(new ErrorException($error['message'], 0, E_ERROR, $error['file'], $error['line']));
        }
    }

    /**
     * Rolls back the open transaction, marks the current message as failed and persists it.
     *
     * When no message is being handled the error is only logged.
     *
     * @throws Exception When persisting the failed message fails; the message is stored as undeliverable
     *                   before the exception is re-thrown.
     */
    public function onError(Throwable $error): void
    {
        // If we have a fatal error, we were not able to rollback the transaction, so we will need to do it here,
        // otherwise we cannot queue the command below, as it will be in a transaction that will never be committed
        $this->persistenceClient->rollbackTransaction();

        $message = $this->message;

        if (!$message instanceof MessageInterface) {
            // The message can only be set through the type-hinted initiateHandling(), so we end up here when
            // this public method is called while no message is being handled. There is nothing to mark as
            // failed or to persist in that case, and calling methods on the missing message would be a fatal
            // error. This is not something we want, because the worker will die. And although supervisor will
            // restart it again, it will only do so for an x amount of tries before giving up, and we end up
            // with a queue that is stuck. So we log the error and leave it at that.
            $this->logger->error(
                sprintf(
                    'Cannot deliver queued message because it is not a MessageInterface: %s',
                    $error->getMessage()
                )
            );

            return;
        }

        $message->fail($error);

        try {
            $this->persistenceClient->persist($message);
        } catch (Exception $exception) {
            // It can happen that we have some problem with persisting, most likely a uniqid constraint because
            // of a race condition or because we processed a message twice. In that case persist it as an
            // undeliverable message, so you can do monitoring on this and fix it.
            $this->persistenceClient->persistUndeliverableMessage($message, $exception->getMessage());

            throw $exception;
        }
    }

    /**
     * Logs the current message and hands it to the delivery service.
     */
    private function process(): void
    {
        $this->logger->info('Delivering message: ' . $this->getLogMessage($this->message));

        $this->messageDeliveryService->deliver($this->message);
    }

    /**
     * Builds the comma separated summary of a message that is written to the log.
     *
     * The fields are, in order: id, destination, deliver-at, created-at, updated-at (all three in ATOM format),
     * tries, priority and a description of the payload.
     */
    private function getLogMessage(MessageInterface $message): string
    {
        return implode(', ', [
            $message->getId(),
            $message->getDestination(),
            $message->getDeliverAt()->format(DateTime::ATOM),
            $message->getCreatedAt()->format(DateTime::ATOM),
            $message->getUpdatedAt()->format(DateTime::ATOM),
            $message->getTries(),
            $message->getPriority(),
            $this->getPayloadDescription($message),
        ]);
    }

    /**
     * Describes the payload for logging: the class name for objects, 'array' for arrays and the string cast
     * of the payload for anything else.
     */
    private function getPayloadDescription(MessageInterface $message): string
    {
        $payload = $message->getPayload();

        if (is_object($payload)) {
            return get_class($payload);
        }

        if (is_array($payload)) {
            return 'array';
        }

        return (string) $payload;
    }

    /**
     * Remembers the message, which makes isHandlingMessage() return true.
     */
    private function initiateHandling(MessageInterface $message): void
    {
        $this->message = $message;
    }

    /**
     * Forgets the message, which makes isHandlingMessage() return false.
     */
    private function terminateHandling(): void
    {
        $this->message = null;
    }
}
167
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.