1
|
|
|
<?php |
2
|
|
|
declare(strict_types=1); |
3
|
|
|
|
4
|
|
|
namespace Tomaj\Hermes; |
5
|
|
|
|
6
|
|
|
use Exception; |
7
|
|
|
use Psr\Log\LoggerInterface; |
8
|
|
|
use Psr\Log\LogLevel; |
9
|
|
|
use Tomaj\Hermes\Handler\HandlerInterface; |
10
|
|
|
use Tomaj\Hermes\Driver\DriverInterface; |
11
|
|
|
use Tomaj\Hermes\Restart\RestartException; |
12
|
|
|
use Tomaj\Hermes\Restart\RestartInterface; |
13
|
|
|
use Tracy\Debugger; |
14
|
|
|
use DateTime; |
15
|
|
|
|
16
|
|
|
class Dispatcher implements DispatcherInterface |
17
|
|
|
{ |
18
|
|
|
/** |
19
|
|
|
* Dispatcher driver |
20
|
|
|
* |
21
|
|
|
* @var DriverInterface |
22
|
|
|
*/ |
23
|
|
|
private $driver; |
24
|
|
|
|
25
|
|
|
/** |
26
|
|
|
* Logger |
27
|
|
|
* |
28
|
|
|
* @var LoggerInterface |
29
|
|
|
*/ |
30
|
|
|
private $logger; |
31
|
|
|
|
32
|
|
|
/** |
33
|
|
|
* Restart |
34
|
|
|
* |
35
|
|
|
* @var RestartInterface |
36
|
|
|
*/ |
37
|
|
|
private $restart; |
38
|
|
|
|
39
|
|
|
/** |
40
|
|
|
* All registered handlers |
41
|
|
|
* |
42
|
|
|
* @var HandlerInterface[][] |
43
|
|
|
*/ |
44
|
|
|
private $handlers = []; |
45
|
|
|
|
46
|
|
|
/** |
47
|
|
|
* @var DateTime |
48
|
|
|
*/ |
49
|
|
|
private $startTime; |
50
|
|
|
|
51
|
|
|
/** |
52
|
|
|
* Create new Dispatcher |
53
|
|
|
* |
54
|
|
|
* @param DriverInterface $driver |
55
|
|
|
* @param LoggerInterface $logger |
56
|
|
|
* @param RestartInterface $restart |
57
|
18 |
|
*/ |
58
|
|
|
public function __construct(DriverInterface $driver, LoggerInterface $logger = null, RestartInterface $restart = null) |
59
|
18 |
|
{ |
60
|
18 |
|
$this->driver = $driver; |
61
|
18 |
|
$this->logger = $logger; |
62
|
18 |
|
$this->restart = $restart; |
63
|
18 |
|
$this->startTime = new DateTime(); |
64
|
|
|
} |
65
|
|
|
|
66
|
|
|
/** |
67
|
|
|
* @deprecated - use Emitter::emit method intead |
68
|
|
|
*/ |
69
|
|
View Code Duplication |
public function emit(MessageInterface $message): DispatcherInterface |
|
|
|
|
70
|
|
|
{ |
71
|
|
|
$this->driver->send($message); |
72
|
|
|
|
73
|
|
|
$this->log( |
74
|
|
|
LogLevel::INFO, |
75
|
|
|
"Dispatcher send message #{$message->getId()} to driver " . get_class($this->driver), |
76
|
|
|
$this->messageLoggerContext($message) |
77
|
|
|
); |
78
|
|
|
return $this; |
79
|
|
|
} |
80
|
|
|
|
81
|
|
|
/** |
82
|
|
|
* Basic method for background job to star listening. |
83
|
|
|
* |
84
|
|
|
* This method hook to driver wait() method and start listening events. |
85
|
|
|
* Method is blocking, so when you call it all processing will stop. |
86
|
|
|
* WARNING! Don't use it on web server calls. Run it only with cli. |
87
|
|
|
* |
88
|
|
|
* @return void |
89
|
|
|
*/ |
90
|
|
|
public function handle(): void |
91
|
|
|
{ |
92
|
18 |
|
try { |
93
|
18 |
|
$this->driver->wait(function (MessageInterface $message) { |
94
|
18 |
|
$this->log( |
95
|
18 |
|
LogLevel::INFO, |
96
|
18 |
|
"Start handle message #{$message->getId()} ({$message->getType()})", |
97
|
12 |
|
$this->messageLoggerContext($message) |
98
|
|
|
); |
99
|
18 |
|
|
100
|
|
|
$result = $this->dispatch($message); |
101
|
18 |
|
|
102
|
3 |
|
if ($this->restart && $this->restart->shouldRestart($this->startTime)) { |
103
|
|
|
throw new RestartException('Restart'); |
104
|
|
|
} |
105
|
15 |
|
|
106
|
18 |
|
return $result; |
107
|
13 |
|
}); |
108
|
3 |
|
} catch (RestartException $e) { |
109
|
2 |
|
$this->log(LogLevel::NOTICE, 'Existing hermes dispatcher - restart'); |
110
|
|
|
} catch (Exception $exception) { |
111
|
|
|
if (Debugger::isEnabled()) { |
112
|
18 |
|
Debugger::log($exception, Debugger::EXCEPTION); |
113
|
|
|
} |
114
|
|
|
} |
115
|
|
|
} |
116
|
|
|
|
117
|
|
|
/** |
118
|
|
|
* Dispatch message |
119
|
|
|
* |
120
|
|
|
* @param MessageInterface $message |
121
|
18 |
|
* |
122
|
|
|
* @return bool |
123
|
18 |
|
*/ |
124
|
|
|
private function dispatch(MessageInterface $message): bool |
125
|
18 |
|
{ |
126
|
6 |
|
$type = $message->getType(); |
127
|
|
|
|
128
|
|
|
if (!$this->hasHandlers($type)) { |
129
|
15 |
|
return true; |
130
|
|
|
} |
131
|
15 |
|
|
132
|
15 |
|
$result = true; |
133
|
|
|
|
134
|
15 |
|
foreach ($this->handlers[$type] as $handler) { |
135
|
6 |
|
$handlerResult = $this->handleMessage($handler, $message); |
136
|
4 |
|
|
137
|
10 |
|
if ($result && !$handlerResult) { |
138
|
|
|
$result = false; |
139
|
15 |
|
} |
140
|
|
|
} |
141
|
|
|
|
142
|
|
|
return $result; |
143
|
|
|
} |
144
|
|
|
|
145
|
|
|
/** |
146
|
|
|
* Handle given message with given handler |
147
|
|
|
* |
148
|
|
|
* @param HandlerInterface $handler |
149
|
|
|
* @param MessageInterface $message |
150
|
15 |
|
* |
151
|
|
|
* @return bool |
152
|
|
|
*/ |
153
|
15 |
|
private function handleMessage(HandlerInterface $handler, MessageInterface $message): bool |
154
|
|
|
{ |
155
|
|
|
// check if handler implements Psr\Log\LoggerAwareInterface (you can use \Psr\Log\LoggerAwareTrait) |
156
|
|
|
if ($this->logger && method_exists($handler, 'setLogger')) { |
157
|
|
|
$handler->setLogger($this->logger); |
|
|
|
|
158
|
15 |
|
} |
159
|
|
|
|
160
|
12 |
|
try { |
161
|
12 |
|
$result = $handler->handle($message); |
162
|
12 |
|
|
163
|
12 |
|
$this->log( |
164
|
8 |
|
LogLevel::INFO, |
165
|
11 |
|
"End handle message #{$message->getId()} ({$message->getType()})", |
166
|
3 |
|
$this->messageLoggerContext($message) |
167
|
3 |
|
); |
168
|
3 |
|
} catch (Exception $e) { |
169
|
3 |
|
$this->log( |
170
|
2 |
|
LogLevel::ERROR, |
171
|
3 |
|
"Handler " . get_class($handler) . " throws exception - {$e->getMessage()}", |
172
|
3 |
|
['error' => $e, 'message' => $this->messageLoggerContext($message), 'exception' => $e] |
173
|
|
|
); |
174
|
15 |
|
if (Debugger::isEnabled()) { |
175
|
|
|
Debugger::log($e, Debugger::EXCEPTION); |
176
|
|
|
} |
177
|
|
|
|
178
|
|
|
if (method_exists($handler, 'canRetry')) { |
179
|
|
|
if ($message->getRetries() < $handler->maxRetry()) { |
180
|
|
|
$executeAt = $this->nextRetry($message); |
181
|
|
|
$newMessage = new Message($message->getType(), $message->getPayload(), $message->getId(), $message->getCreated(), $executeAt, $message->getRetries() + 1); |
182
|
|
|
$this->driver->send($newMessage); |
183
|
|
|
} |
184
|
18 |
|
} |
185
|
|
|
|
186
|
18 |
|
$result = false; |
187
|
|
|
} |
188
|
|
|
return $result; |
189
|
|
|
} |
190
|
|
|
|
191
|
|
|
/** |
192
|
18 |
|
* Calculate next retry |
193
|
|
|
* |
194
|
18 |
|
* Inspired by ruby sidekiq (https://github.com/mperham/sidekiq/wiki/Error-Handling#automatic-job-retry) |
195
|
18 |
|
* |
196
|
12 |
|
* @param MessageInterface $message |
197
|
|
|
* @return float |
198
|
18 |
|
*/ |
199
|
18 |
|
private function nextRetry(MessageInterface $message): float |
200
|
|
|
{ |
201
|
|
|
return microtime(true) + pow($message->getRetries(), 4) + 15 + (rand(1, 30) * ($message->getRetries() + 1)); |
202
|
|
|
} |
203
|
|
|
|
204
|
|
|
/** |
205
|
|
|
* Check if actual dispatcher has handler for given type |
206
|
|
|
* |
207
|
|
|
* @param string $type |
208
|
18 |
|
* |
209
|
|
|
* @return bool |
210
|
|
|
*/ |
211
|
18 |
|
private function hasHandlers(string $type): bool |
212
|
18 |
|
{ |
213
|
18 |
|
return isset($this->handlers[$type]) && count($this->handlers[$type]) > 0; |
214
|
18 |
|
} |
215
|
12 |
|
|
216
|
|
|
/** |
217
|
|
|
* {@inheritdoc} |
218
|
|
|
*/ |
219
|
|
|
public function registerHandler(string $type, HandlerInterface $handler): DispatcherInterface |
220
|
|
|
{ |
221
|
|
|
if (!isset($this->handlers[$type])) { |
222
|
|
|
$this->handlers[$type] = []; |
223
|
|
|
} |
224
|
|
|
|
225
|
|
|
$this->handlers[$type][] = $handler; |
226
|
|
|
return $this; |
227
|
18 |
|
} |
228
|
|
|
|
229
|
18 |
|
/** |
230
|
3 |
|
* Serialize message to logger context |
231
|
2 |
|
* |
232
|
18 |
|
* @param MessageInterface $message |
233
|
|
|
* |
234
|
|
|
* @return array |
235
|
|
|
*/ |
236
|
|
View Code Duplication |
private function messageLoggerContext(MessageInterface $message): array |
|
|
|
|
237
|
|
|
{ |
238
|
|
|
return [ |
239
|
|
|
'id' => $message->getId(), |
240
|
|
|
'created' => $message->getCreated(), |
241
|
|
|
'type' => $message->getType(), |
242
|
|
|
'payload' => $message->getPayload(), |
243
|
|
|
]; |
244
|
|
|
} |
245
|
|
|
|
246
|
|
|
/** |
247
|
|
|
* Interal log method wrapper |
248
|
|
|
* |
249
|
|
|
* @param mixed $level |
250
|
|
|
* @param string $message |
251
|
|
|
* @param array $context |
252
|
|
|
* |
253
|
|
|
* @return void |
254
|
|
|
*/ |
255
|
|
|
private function log($level, string $message, array $context = array()): void |
256
|
|
|
{ |
257
|
|
|
if ($this->logger) { |
258
|
|
|
$this->logger->log($level, $message, $context); |
259
|
|
|
} |
260
|
|
|
} |
261
|
|
|
} |
262
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.