1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
declare(strict_types=1); |
4
|
|
|
|
5
|
|
|
namespace Resque; |
6
|
|
|
|
7
|
|
|
use Psr\Container\ContainerInterface; |
8
|
|
|
use Resque\Dispatchers\Noop; |
9
|
|
|
use Resque\Interfaces\Dispatcher; |
10
|
|
|
use Resque\Interfaces\Serializer; |
11
|
|
|
use Resque\Tasks\ForkFailed; |
12
|
|
|
use Resque\Tasks\JobFailed; |
13
|
|
|
use Resque\Tasks\ParentWaiting; |
14
|
|
|
use Resque\Tasks\UnknownChildFailure; |
15
|
|
|
use Resque\Tasks\WorkerDoneWorking; |
16
|
|
|
use Resque\Tasks\WorkerIdle; |
17
|
|
|
use Resque\Tasks\WorkerRegistering; |
18
|
|
|
use Resque\Tasks\WorkerStartup; |
19
|
|
|
use Resque\Tasks\WorkerUnregistering; |
20
|
|
|
|
21
|
|
|
/**
 * Polls the configured queues, forks a child process for each job it pops and
 * reports lifecycle events through the configured dispatcher.
 *
 * A worker registers itself with the datastore when work() starts and
 * unregisters itself when the polling loop ends.
 */
class Worker
{
    /** Value pcntl_fork() returns when the fork could not be created. */
    private const FORK_FAILED = -1;

    /** Value pcntl_fork() returns inside the newly created child process. */
    private const FORK_CHILD = 0;

    /** @var Datastore Queue and worker bookkeeping backend. */
    private $datastore;

    /** @var Serializer Converts payloads to and from their stored string form. */
    private $serializer;

    /** @var string[] Queues to poll, in priority order (first non-empty queue wins). */
    private $queueNames = ['default'];

    /** @var int Seconds to sleep when idle; a value <= 0 makes work() run a single pass. */
    private $interval = 10;

    /** @var bool Set to true to leave the polling loop after the current pass. */
    private $shouldShutdown = false;

    /** @var bool While true, no jobs are fetched and the worker only idles. */
    private $isPaused = false;

    /** @var bool True once this process has run a job as the forked child. */
    private $isChildThread = false;

    /** @var int|null Result of the last pcntl_fork() while a job is in flight, null afterwards. */
    private $childId = 0;

    /** @var ContainerInterface Provides the Job factory and is handed on to created jobs. */
    private $serviceLocator;

    /** @var Dispatcher Receives lifecycle events; defaults to a Noop dispatcher. */
    private $dispatcher = null;

    /** @var string Identifier under which this worker is registered in the datastore. */
    private $id;

    /** @var SignalHandler Bound to this worker and registered when work() starts. */
    private $signalHandler;

    /**
     * @param Datastore          $datastore      Queue and worker bookkeeping backend.
     * @param Serializer         $serializer     Payload (un)serializer.
     * @param ContainerInterface $serviceLocator Container that provides the Job factory.
     * @param SignalHandler      $signalHandler  Handler bound to this worker on startup.
     *
     * @throws \Exception If no source of randomness is available for the worker id.
     */
    public function __construct(
        Datastore $datastore,
        Serializer $serializer,
        ContainerInterface $serviceLocator,
        SignalHandler $signalHandler
    ) {
        $this->datastore = $datastore;
        $this->serializer = $serializer;
        $this->serviceLocator = $serviceLocator;
        $this->signalHandler = $signalHandler;
        $this->dispatcher = new Noop();

        // 16 random bytes keep the 32-hex-character suffix the id always had,
        // but use the full 128 bits instead of hashing only 2 random bytes.
        $this->id = gethostname() . '-' . getmypid() . bin2hex(random_bytes(16));
    }

    /**
     * Sets the idle sleep time in seconds.
     *
     * A value of 0 or less makes work() process a single pass and return.
     */
    public function setInterval(int $interval): void
    {
        $this->interval = $interval;
    }

    /**
     * Replaces the list of queues to poll; earlier entries are polled first.
     *
     * @param string[] $queueNames
     */
    public function setQueueNames(array $queueNames): void
    {
        $this->queueNames = $queueNames;
    }

    /**
     * Replaces the dispatcher that receives the worker's lifecycle events.
     */
    public function setDispatcher(Dispatcher $dispatcher): void
    {
        $this->dispatcher = $dispatcher;
    }

    /**
     * Runs the worker: registers it, processes jobs until a shutdown is
     * requested (or once, when the interval is not positive), then unregisters.
     */
    public function work(): void
    {
        $this->startup();

        do {
            if ($this->isPaused || !$this->workOneJob()) {
                $this->dispatcher->dispatch(WorkerIdle::class, ['worker' => $this]);
                // Clamp to zero: sleep() rejects negative values (ValueError on PHP 8).
                sleep(max(0, $this->interval));
            }
        } while (0 < $this->interval && !$this->shouldShutdown);

        // NOTE(review): a forked child sets shouldShutdown after its job, so it
        // also reaches this call and unregisters the shared worker id - confirm
        // that this is intended.
        $this->unregisterWorker();
    }

    /**
     * Binds and registers the signal handler, announces startup and registers
     * the worker with the datastore.
     */
    private function startup(): void
    {
        $this->signalHandler->setWorker($this);
        $this->signalHandler->register();
        $this->dispatcher->dispatch(WorkerStartup::class, ['worker' => $this]);
        $this->registerWorker();
    }

    /**
     * Dispatches WorkerRegistering, then records this worker in the datastore.
     */
    private function registerWorker(): void
    {
        $this->dispatcher->dispatch(WorkerRegistering::class, ['worker' => $this]);
        $this->datastore->registerWorker($this->id);
    }

    /**
     * Dispatches WorkerUnregistering, then removes this worker from the datastore.
     */
    private function unregisterWorker(): void
    {
        $this->dispatcher->dispatch(WorkerUnregistering::class, ['worker' => $this]);
        $this->datastore->unregisterWorker($this->id);
    }

    /**
     * Fetches and runs at most one job.
     *
     * @return bool True when a job was found and handed to a fork, false when
     *              every queue was empty.
     */
    private function workOneJob(): bool
    {
        $job = $this->fetchJob();
        if (null === $job) {
            return false;
        }

        $this->setWorkingOn($job);
        $this->performWithFork($job);

        return true;
    }

    /**
     * Pops the first available payload, checking the queues in configured order.
     *
     * @return Job|null The job built from that payload, or null if all queues
     *                  returned an empty payload.
     */
    private function fetchJob(): ?Job
    {
        foreach ($this->queueNames as $queueName) {
            $payload = $this->datastore->popFromQueue($queueName);
            if (!empty($payload)) {
                return $this->createJobFromPayload($queueName, $payload);
            }
        }

        return null;
    }

    /**
     * Stores what this worker is currently processing (queue, UTC start time
     * and job payload) in the datastore.
     */
    private function setWorkingOn(Job $job): void
    {
        $startedAt = new \DateTimeImmutable('now', new \DateTimeZone('UTC'));

        $workerPayload = $this->serializer->serialize([
            'queue' => $job->getQueueName(),
            'run_at' => $startedAt->format(\DateTime::ISO8601),
            'payload' => $job->getPayload(),
        ]);

        $this->datastore->setWorkerPayload($this->id, $workerPayload);
    }

    /**
     * Forks and runs the job in the child while the parent waits for it.
     *
     * - Fork failure: dispatches ForkFailed, re-queues the job and requests shutdown.
     * - Child: performs the job, then flags itself as the child and requests
     *   shutdown so it leaves the polling loop.
     * - Parent: dispatches ParentWaiting and blocks until the child has ended.
     */
    private function performWithFork(Job $job): void
    {
        $this->childId = pcntl_fork();

        switch ($this->childId) {
            case self::FORK_FAILED:
                $this->dispatcher->dispatch(ForkFailed::class, ['worker' => $this, 'job' => $job]);
                $this->criticalWorkerShutdown($job);
                break;

            case self::FORK_CHILD:
                $this->performJob($job);
                $this->shouldShutdown = true;
                $this->isChildThread = true;
                break;

            default: // parent case
                $this->dispatcher->dispatch(ParentWaiting::class, ['worker' => $this]);
                $this->waitForChild($job);
        }

        // The job is no longer in flight, so there is no child left to signal.
        $this->childId = null;
        $this->doneWorking();
    }

    /**
     * Dispatches WorkerDoneWorking, except in the forked child.
     */
    private function doneWorking(): void
    {
        if (!$this->isChildThread) {
            $this->dispatcher->dispatch(WorkerDoneWorking::class, ['worker' => $this]);
        }
    }

    /**
     * Unserializes a queue payload and builds a job through the factory
     * registered in the container under Job::class.
     */
    private function createJobFromPayload(string $queueName, string $serializedPayload): Job
    {
        $payload = $this->serializer->unserialize($serializedPayload);
        $factory = $this->serviceLocator->get(Job::class);

        return $factory($queueName, $payload, $this->serviceLocator);
    }

    /**
     * Puts the given job back on its queue and requests a graceful shutdown.
     */
    public function criticalWorkerShutdown(Job $job): void
    {
        $this->requeueJob($job);
        $this->shutdown();
    }

    /**
     * Pushes the job's serialized payload back onto the queue it came from.
     */
    private function requeueJob(Job $job): void
    {
        $payload = $this->createPayloadFromJob($job);
        $this->datastore->pushToQueue($job->getQueueName(), $payload);
    }

    /**
     * Serializes the job's payload into the string form stored on a queue.
     */
    private function createPayloadFromJob(Job $job): string
    {
        return $this->serializer->serialize($job->getPayload());
    }

    /**
     * Runs the job; called in the forked child.
     */
    private function performJob(Job $job): void
    {
        // Reconnect first - presumably because a connection inherited across
        // the fork must not be shared with the parent (TODO confirm).
        $this->datastore->reconnect();
        $job->perform();
    }

    /**
     * Blocks until a child process ends and reports an abnormal end.
     *
     * If the child did not exit normally or exited with a non-zero code, a
     * JobFailed event is dispatched when the job reports itself as failed;
     * otherwise UnknownChildFailure is dispatched and shutdown is requested.
     */
    private function waitForChild(Job $job): void
    {
        $status = null;
        $exitCode = null; // stays null when the child did not exit normally

        // NOTE(review): the return value is ignored, so a failed or interrupted
        // wait is not distinguished from a child that ended - confirm.
        pcntl_wait($status);

        if (!pcntl_wifexited($status) || 0 !== ($exitCode = pcntl_wexitstatus($status))) {
            if ($job->hasFailed()) {
                $this->dispatcher->dispatch(
                    JobFailed::class,
                    ['worker' => $this, 'job' => $job, 'status' => $status, 'exit_code' => $exitCode]
                );
            } else {
                $this->dispatcher->dispatch(
                    UnknownChildFailure::class,
                    ['worker' => $this, 'job' => $job, 'status' => $status, 'exit_code' => $exitCode]
                );
                $this->shutdown();
            }
        }
    }

    /**
     * Requests a graceful shutdown: the polling loop ends after the current pass.
     */
    public function shutdown(): void
    {
        $this->shouldShutdown = true;
    }

    /**
     * Requests shutdown and terminates the running child, if there is one.
     *
     * SIGTERM is sent first; SIGKILL is only sent if SIGTERM could not be delivered.
     */
    public function forceShutdown(): void
    {
        $this->shouldShutdown = true;

        // Only a real child pid may be signalled. childId holds -1 for a short
        // time after a failed fork, and kill(-1, ...) would signal every
        // process this user is allowed to signal.
        if (is_int($this->childId) && 0 < $this->childId) {
            posix_kill($this->childId, SIGTERM) || posix_kill($this->childId, SIGKILL);
        }
    }

    /**
     * Stops fetching jobs; the worker keeps idling until continue() is called.
     */
    public function pause(): void
    {
        $this->isPaused = true;
    }

    /**
     * Resumes fetching jobs after pause().
     */
    public function continue(): void
    {
        $this->isPaused = false;
    }
}
241
|
|
|
|