Worker::criticalWorkerShutdown()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 4
ccs 3
cts 3
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Resque;
6
7
use Psr\Container\ContainerInterface;
8
use Resque\Dispatchers\Noop;
9
use Resque\Interfaces\Dispatcher;
10
use Resque\Interfaces\Serializer;
11
use Resque\Tasks\ForkFailed;
12
use Resque\Tasks\JobFailed;
13
use Resque\Tasks\ParentWaiting;
14
use Resque\Tasks\UnknownChildFailure;
15
use Resque\Tasks\WorkerDoneWorking;
16
use Resque\Tasks\WorkerIdle;
17
use Resque\Tasks\WorkerRegistering;
18
use Resque\Tasks\WorkerStartup;
19
use Resque\Tasks\WorkerUnregistering;
20
21
class Worker
22
{
23
    private const FORK_FAILED = -1;
24
    private const FORK_CHILD = 0;
25
26
    private $datastore;
27
    private $serializer;
28
    private $queueNames = ['default'];
29
    private $interval = 10;
30
    private $shouldShutdown = false;
31
    private $isPaused = false;
32
    private $isChildThread = false;
33
    private $childId = 0;
34
    private $serviceLocator;
35
    private $dispatcher = null;
36
    private $id;
37
    private $signalHandler;
38
39 13
    public function __construct(
40
        Datastore $datastore,
41
        Serializer $serializer,
42
        ContainerInterface $serviceLocator,
43
        SignalHandler $signalHandler
44
    ) {
45 13
        $this->datastore = $datastore;
46 13
        $this->serializer = $serializer;
47 13
        $this->serviceLocator = $serviceLocator;
48 13
        $this->signalHandler = $signalHandler;
49 13
        $this->dispatcher = new Noop();
50
51 13
        $this->id = gethostname() . '-' . getmypid() . md5(random_bytes(2));
52 13
    }
53
54 13
    public function setInterval(int $interval): void
55
    {
56 13
        $this->interval = $interval;
57 13
    }
58
59 13
    public function setQueueNames(array $queueNames): void
60
    {
61 13
        $this->queueNames = $queueNames;
62 13
    }
63
64 13
    public function setDispatcher(Dispatcher $dispatcher): void
65
    {
66 13
        $this->dispatcher = $dispatcher;
67 13
    }
68
69 8
    public function work(): void
70
    {
71 8
        $this->startup();
72
73
        do {
74 8
            if ($this->isPaused || !$this->workOneJob()) {
75 1
                $this->dispatcher->dispatch(WorkerIdle::class, ['worker' => $this]);
76 1
                sleep($this->interval);
77
            }
78 8
        } while (0 < $this->interval && !$this->shouldShutdown);
79
80 8
        $this->unregisterWorker();
81 8
    }
82
83 8
    private function startup(): void
84
    {
85 8
        $this->signalHandler->setWorker($this);
86 8
        $this->signalHandler->register();
87 8
        $this->dispatcher->dispatch(WorkerStartup::class, ['worker' => $this]);
88 8
        $this->registerWorker();
89 8
    }
90
91 8
    private function registerWorker(): void
92
    {
93 8
        $this->dispatcher->dispatch(WorkerRegistering::class, ['worker' => $this]);
94 8
        $this->datastore->registerWorker($this->id);
95 8
    }
96
97 8
    private function unregisterWorker(): void
98
    {
99 8
        $this->dispatcher->dispatch(WorkerUnregistering::class, ['worker' => $this]);
100 8
        $this->datastore->unregisterWorker($this->id);
101 8
    }
102
103 8
    private function workOneJob(): bool
104
    {
105 8
        $job = $this->fetchJob();
106 8
        if (empty($job)) {
107 1
            return false;
108
        }
109
110 7
        $this->setWorkingOn($job);
111 7
        $this->performWithFork($job);
112
113 7
        return true;
114
    }
115
116 8
    private function fetchJob(): ?Job
117
    {
118 8
        foreach ($this->queueNames as $queueName) {
119 8
            $payload = $this->datastore->popFromQueue($queueName);
120 8
            if (!empty($payload)) {
121 8
                return $this->createJobFromPayload($queueName, $payload);
122
            }
123
        }
124 1
        return null;
125
    }
126
127 7
    private function setWorkingOn(Job $job): void
128
    {
129 7
        $time = new \DateTime();
130 7
        $time->setTimezone(new \DateTimeZone('UTC'));
131 7
        $timeString = $time->format(\DateTime::ISO8601);
132 7
        $workerPayload = $this->serializer->serialize([
133 7
            'queue' => $job->getQueueName(),
134 7
            'run_at' => $timeString,
135 7
            'payload' => $job->getPayload(),
136
        ]);
137 7
        $this->datastore->setWorkerPayload($this->id, $workerPayload);
138 7
    }
139
140 7
    private function performWithFork(Job $job): void
141
    {
142 7
        $this->childId = pcntl_fork();
143
144 7
        switch ($this->childId) {
145
146 7
            case self::FORK_FAILED:
147 1
                $this->dispatcher->dispatch(ForkFailed::class, ['worker' => $this, 'job' => $job]);
148 1
                $this->criticalWorkerShutdown($job);
149 1
                break;
150
151 6
            case self::FORK_CHILD:
152 3
                $this->performJob($job);
153 3
                $this->shouldShutdown = true;
154 3
                $this->isChildThread = true;
155 3
                break;
156
157
            default: // parent case
158 3
                $this->dispatcher->dispatch(ParentWaiting::class, ['worker' => $this]);
159 3
                $this->waitForChild($job);
160
        }
161
162 7
        $this->childId = null;
163 7
        $this->doneWorking();
164 7
    }
165
166 7
    private function doneWorking(): void
167
    {
168 7
        if (!$this->isChildThread) {
169 4
            $this->dispatcher->dispatch(WorkerDoneWorking::class, ['worker' => $this]);
170
        }
171 7
    }
172
173 7
    private function createJobFromPayload(string $queueName, string $serializedPayload): Job
174
    {
175 7
        $payload = $this->serializer->unserialize($serializedPayload);
176 7
        $factory = $this->serviceLocator->get(Job::class);
177 7
        return $factory($queueName, $payload, $this->serviceLocator);
178
    }
179
180 1
    public function criticalWorkerShutdown(Job $job): void
181
    {
182 1
        $this->requeueJob($job);
183 1
        $this->shutdown();
184 1
    }
185
186 1
    private function requeueJob(Job $job): void
187
    {
188 1
        $payload = $this->createPayloadFromJob($job);
189 1
        $this->datastore->pushToQueue($job->getQueueName(), $payload);
190 1
    }
191
192 1
    private function createPayloadFromJob($job): string
193
    {
194 1
        return $this->serializer->serialize($job->getPayload());
195
    }
196
197 3
    private function performJob(Job $job): void
198
    {
199 3
        $this->datastore->reconnect();
200 3
        $job->perform();
201 3
    }
202
203 3
    private function waitForChild(Job $job): void
204
    {
205 3
        $status = null;
206 3
        $exitCode = null;
207 3
        pcntl_wait($status);
208 3
        if (!pcntl_wifexited($status) || 0 !== ($exitCode = pcntl_wexitstatus($status))) {
209 2
            if ($job->hasFailed()) {
210 1
                $this->dispatcher->dispatch(JobFailed::class, ['worker' => $this, 'job' => $job, 'status' => $status, 'exit_code' => $exitCode]);
211
            } else {
212 1
                $this->dispatcher->dispatch(UnknownChildFailure::class, ['worker' => $this, 'job' => $job, 'status' => $status, 'exit_code' => $exitCode]);
213 1
                $this->shutdown();
214
            }
215
        }
216 3
    }
217
218 2
    public function shutdown(): void
219
    {
220 2
        $this->shouldShutdown = true;
221 2
    }
222
223 3
    public function forceShutdown(): void
224
    {
225 3
        $this->shouldShutdown = true;
226 3
        if (!empty($this->childId)) {
227 2
            posix_kill($this->childId, SIGTERM) || posix_kill($this->childId, SIGKILL);
228
        }
229 3
    }
230
231 2
    public function pause(): void
232
    {
233 2
        $this->isPaused = true;
234 2
    }
235
236 1
    public function continue()
237
    {
238 1
        $this->isPaused = false;
239 1
    }
240
}
241