1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Equip\Queue; |
4
|
|
|
|
5
|
|
|
use Equip\Queue\Driver\DriverInterface; |
6
|
|
|
use Equip\Queue\Exception\HandlerException; |
7
|
|
|
use Equip\Queue\Serializer\JsonSerializer; |
8
|
|
|
use Equip\Queue\Serializer\MessageSerializerInterface; |
9
|
|
|
use Exception; |
10
|
|
|
use Psr\Log\LoggerInterface; |
11
|
|
|
|
12
|
|
|
class Worker |
13
|
|
|
{ |
14
|
|
|
/** |
15
|
|
|
* @var DriverInterface |
16
|
|
|
*/ |
17
|
|
|
private $driver; |
18
|
|
|
|
19
|
|
|
/** |
20
|
|
|
* @var Event |
21
|
|
|
*/ |
22
|
|
|
private $event; |
23
|
|
|
|
24
|
|
|
/** |
25
|
|
|
* @var LoggerInterface |
26
|
|
|
*/ |
27
|
|
|
private $logger; |
28
|
|
|
|
29
|
|
|
/** |
30
|
|
|
* @var MessageSerializerInterface |
31
|
|
|
*/ |
32
|
|
|
private $serializer; |
33
|
|
|
|
34
|
|
|
/** |
35
|
|
|
* @var array |
36
|
|
|
*/ |
37
|
|
|
private $handlers; |
38
|
|
|
|
39
|
|
|
/** |
40
|
|
|
* @param DriverInterface $driver |
41
|
|
|
* @param Event $event |
42
|
|
|
* @param LoggerInterface $logger |
43
|
|
|
* @param MessageSerializerInterface $serializer |
44
|
|
|
* @param array $handlers |
45
|
|
|
*/ |
46
|
|
|
public function __construct( |
47
|
|
|
DriverInterface $driver, |
48
|
|
|
Event $event, |
49
|
|
|
LoggerInterface $logger, |
50
|
|
|
MessageSerializerInterface $serializer = null, |
51
|
|
|
array $handlers = [] |
52
|
|
|
) { |
53
|
|
|
$this->driver = $driver; |
54
|
|
|
$this->event = $event; |
55
|
|
|
$this->logger = $logger; |
56
|
|
|
$this->serializer = $serializer ?: new JsonSerializer; |
57
|
|
|
$this->handlers = $handlers; |
58
|
|
|
} |
59
|
|
|
|
60
|
|
|
/** |
61
|
|
|
* Consumes messages off of the queue |
62
|
|
|
* |
63
|
|
|
* @param string $queue |
64
|
|
|
*/ |
65
|
|
|
public function consume($queue) |
66
|
|
|
{ |
67
|
|
|
while ($this->tick($queue)) { /* NOOP */ } |
|
|
|
|
68
|
|
|
} |
69
|
|
|
|
70
|
|
|
/** |
71
|
|
|
* Handles fetching messages from the queue |
72
|
|
|
* |
73
|
|
|
* @param string $queue |
74
|
|
|
* |
75
|
|
|
* @return bool |
76
|
|
|
*/ |
77
|
|
|
private function tick($queue) |
78
|
|
|
{ |
79
|
|
|
$packet = $this->driver->dequeue($queue); |
80
|
|
|
if (empty($packet)) { |
81
|
|
|
return true; |
82
|
|
|
} |
83
|
|
|
|
84
|
|
|
$message = $this->serializer->deserialize($packet); |
85
|
|
|
|
86
|
|
|
$handler = $this->getHandler($message->handler(), $this->handlers); |
87
|
|
|
if (!$handler) { |
88
|
|
|
$this->logger->warning(sprintf('Missing `%s` handler', $message->handler())); |
89
|
|
|
return true; |
90
|
|
|
} |
91
|
|
|
|
92
|
|
|
try { |
93
|
|
|
$this->jobStart($message); |
94
|
|
|
|
95
|
|
|
$result = call_user_func($handler, $message); |
96
|
|
|
|
97
|
|
|
$this->jobFinish($message); |
98
|
|
|
|
99
|
|
|
if ($result === false) { |
100
|
|
|
$this->jobShutdown($message); |
101
|
|
|
return false; |
102
|
|
|
} |
103
|
|
|
} catch (Exception $exception) { |
104
|
|
|
$this->jobException($message, $exception); |
105
|
|
|
} |
106
|
|
|
|
107
|
|
|
return true; |
108
|
|
|
} |
109
|
|
|
|
110
|
|
|
/** |
111
|
|
|
* @param string $handler |
112
|
|
|
* @param array $router |
113
|
|
|
* |
114
|
|
|
* @return null|callable |
115
|
|
|
* @throws HandlerException If handler is not callable |
116
|
|
|
*/ |
117
|
|
|
private function getHandler($handler, array $router = []) |
118
|
|
|
{ |
119
|
|
|
if (!isset($router[$handler])) { |
120
|
|
|
return null; |
121
|
|
|
} |
122
|
|
|
|
123
|
|
|
$callable = $router[$handler]; |
124
|
|
|
if (!is_callable($callable)) { |
125
|
|
|
throw HandlerException::invalidHandler($handler); |
126
|
|
|
} |
127
|
|
|
|
128
|
|
|
return $callable; |
129
|
|
|
} |
130
|
|
|
|
131
|
|
|
/** |
132
|
|
|
* Handles actions related to a job starting |
133
|
|
|
* |
134
|
|
|
* @param Message $message |
135
|
|
|
*/ |
136
|
|
|
private function jobStart(Message $message) |
137
|
|
|
{ |
138
|
|
|
$this->event->acknowledge($message); |
139
|
|
|
$this->logger->info(sprintf('`%s` job started', $message->handler())); |
140
|
|
|
} |
141
|
|
|
|
142
|
|
|
/** |
143
|
|
|
* Handles actions related to a job finishing |
144
|
|
|
* |
145
|
|
|
* @param Message $message |
146
|
|
|
*/ |
147
|
|
|
private function jobFinish(Message $message) |
148
|
|
|
{ |
149
|
|
|
$this->event->finish($message); |
150
|
|
|
$this->logger->info(sprintf('`%s` job finished', $message->handler())); |
151
|
|
|
} |
152
|
|
|
|
153
|
|
|
/** |
154
|
|
|
* Handles actions related to a job shutting down the consumer |
155
|
|
|
* |
156
|
|
|
* @param Message $message |
157
|
|
|
*/ |
158
|
|
|
private function jobShutdown(Message $message) |
159
|
|
|
{ |
160
|
|
|
$this->logger->notice(sprintf('shutting down by request of `%s`', $message->handler())); |
161
|
|
|
} |
162
|
|
|
|
163
|
|
|
/** |
164
|
|
|
* Handles actions related to job exceptions |
165
|
|
|
* |
166
|
|
|
* @param Message $message |
167
|
|
|
* @param Exception $exception |
168
|
|
|
*/ |
169
|
|
|
private function jobException(Message $message, Exception $exception) |
170
|
|
|
{ |
171
|
|
|
$this->logger->error($exception->getMessage()); |
172
|
|
|
$this->event->reject($message, $exception); |
173
|
|
|
} |
174
|
|
|
} |
175
|
|
|
|
This check looks for
while
loops that have no statements or where all statements have been commented out. This may be the result of changes for debugging or the code may simply be obsolete.Consider removing the loop.