Passed
Push — master ( d6f608...99e5d3 )
by Dirk
02:52
created

Worker::shutdown()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 1
dl 0
loc 3
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 0
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Resque;
6
7
use Psr\Container\ContainerInterface;
8
use Psr\Log\LoggerInterface;
9
use Resque\Dispatchers\Noop;
10
use Resque\Interfaces\DispatcherInterface;
11
use Resque\Interfaces\SerializerInterface;
12
use Resque\Tasks\WorkerDoneWorking;
13
use Resque\Tasks\WorkerRegistering;
14
use Resque\Tasks\WorkerStartup;
15
use Resque\Tasks\WorkerUnregistering;
16
17
class Worker
18
{
19
    private const FORK_FAILED = -1;
20
    private const FORK_CHILD = 0;
21
22
    private $datastore;
23
    private $serializer;
24
    private $queueNames = ['default'];
25
    private $interval = 10;
26
    private $shouldShutdown = false;
27
    private $isPaused = false;
28
    private $isChildThread = false;
29
    private $childId = 0;
30
    private $serviceLocator;
31
    private $dispatcher = null;
32
    private $logger = null;
33
34 6
    public function __construct(
35
        Datastore $datastore,
36
        SerializerInterface $serializer,
37
        ContainerInterface $serviceLocator,
38
        SignalHandler $signalHandler
39
    ) {
40 6
        $this->datastore = $datastore;
41 6
        $this->serializer = $serializer;
42 6
        $this->serviceLocator = $serviceLocator;
43 6
        $this->signalHandler = $signalHandler;
44 6
        $this->logger = new NoopLogger();
45 6
        $this->dispatcher = new Noop();
46
47 6
        $this->id = gethostname() . '-' . getmypid() . md5(random_bytes(2));
48 6
    }
49
50 6
    public function setInterval(int $interval): void
51
    {
52 6
        $this->interval = $interval;
53 6
    }
54
55 6
    public function setQueueNames(array $queueNames): void
56
    {
57 6
        $this->queueNames = $queueNames;
58 6
    }
59
60 6
    public function setDispatcher(DispatcherInterface $dispatcher): void
61
    {
62 6
        $this->dispatcher = $dispatcher;
63 6
    }
64
65
    public function setLogger(LoggerInterface $logger): void
66
    {
67
        $this->logger = $logger;
68
    }
69
70 6
    public function work(): void
71
    {
72 6
        $this->startup();
73
74
        do {
75 6
            if ($this->isPaused || !$this->workOneJob()) {
76 1
                sleep($this->interval);
77
            }
78 6
        } while (0 < $this->interval && !$this->shouldShutdown);
79
80 6
        $this->unregisterWorker();
81 6
    }
82
83 6
    private function startup(): void
84
    {
85 6
        $this->signalHandler->setWorker($this);
86 6
        $this->signalHandler->register();
87 6
        $this->dispatcher->dispatch(WorkerStartup::class, ['worker' => $this]);
88 6
        $this->registerWorker();
89 6
    }
90
91 6
    private function registerWorker(): void
92
    {
93 6
        $this->dispatcher->dispatch(WorkerRegistering::class, ['worker' => $this]);
94 6
        $this->datastore->registerWorker($this->id);
95 6
    }
96
97 6
    private function unregisterWorker(): void
98
    {
99 6
        $this->dispatcher->dispatch(WorkerUnregistering::class, ['worker' => $this]);
100 6
        $this->datastore->unregisterWorker($this->id);
101 6
    }
102
103 6
    private function workOneJob(): bool
104
    {
105 6
        $job = $this->fetchJob();
106 6
        if (empty($job)) {
107 1
            return false;
108
        }
109
110 5
        $this->setWorkingOn($job);
111 5
        $this->performWithFork($job);
112
113 5
        return true;
114
    }
115
116 6
    private function fetchJob(): ?Job
117
    {
118 6
        foreach ($this->queueNames as $queueName) {
119 6
            $payload = $this->datastore->popFromQueue($queueName);
120 6
            if (!empty($payload)) {
121 5
                $this->logger->info('found one job');
122 5
                $this->logger->debug("payload: {$payload}");
123 6
                return $this->createJobFromPayload($queueName, $payload);
124
            }
125
        }
126 1
        return null;
127
    }
128
129 5
    private function setWorkingOn(Job $job): void
130
    {
131 5
        $time = new \DateTime();
132 5
        $time->setTimezone(new \DateTimeZone('UTC'));
133 5
        $timeString = $time->format(\DateTime::ISO8601);
134 5
        $workerPayload = $this->serializer->serialize([
135 5
            'queue' => $job->getQueueName(),
136 5
            'run_at' => $timeString,
137 5
            'payload' => $job->getPayload(),
138
        ]);
139 5
        $this->datastore->setWorkerPayload($this->id, $workerPayload);
140 5
    }
141
142 5
    private function performWithFork(Job $job): void
143
    {
144 5
        $this->childId = pcntl_fork();
145
146 5
        switch ($this->childId) {
147
148 5
            case self::FORK_FAILED:
149 1
                $this->criticalWorkerShutdown($job);
150 1
                break;
151
152 4
            case self::FORK_CHILD:
153 2
                $this->performJob($job);
154 2
                $this->shouldShutdown = true;
155 2
                $this->isChildThread = true;
156 2
                break;
157
158
            default: // parent case
159 2
                $this->waitForChild($job);
160
        }
161
162 5
        $this->childId = null;
163 5
        $this->doneWorking();
164 5
    }
165
166 5
    private function doneWorking(): void
167
    {
168 5
        if (!$this->isChildThread) {
169 3
            $this->dispatcher->dispatch(WorkerDoneWorking::class, ['worker' => $this]);
170
        }
171 5
    }
172
173 5
    private function createJobFromPayload(string $queueName, string $serializedPayload): Job
174
    {
175 5
        $payload = $this->serializer->unserialize($serializedPayload);
176 5
        $factory = $this->serviceLocator->get(Job::class);
177 5
        return $factory($queueName, $payload);
178
    }
179
180 1
    public function criticalWorkerShutdown(Job $job): void
181
    {
182 1
        $this->requeueJob($job);
183 1
        $this->shutdown();
184 1
    }
185
186 1
    private function requeueJob(Job $job): void
187
    {
188 1
        $payload = $this->createPayloadFromJob($job);
189 1
        $this->datastore->pushToQueue($job->getQueueName(), $payload);
190 1
    }
191
192 1
    private function createPayloadFromJob($job): string
193
    {
194 1
        return $this->serializer->serialize($job->getPayload());
195
    }
196
197 2
    private function performJob(Job $job): void
198
    {
199 2
        $this->datastore->reconnect();
200 2
        $job->perform();
201 2
    }
202
203 2
    private function waitForChild(Job $job): void
204
    {
205 2
        $status = null;
206 2
        pcntl_wait($status);
207 2
        if (!pcntl_wifexited($status) || ($exitStatus = pcntl_wexitstatus($status)) !== 0) {
208 2
            if ($job->hasFailed()) {
209
                // user land job has failed. nothing to be done about that ... except maybe queue in failed queue
210
            } else {
211
                // unexpected failure - handle dirty exit
212
            }
213
        }
214 2
    }
215
216 1
    public function shutdown(): void
217
    {
218 1
        $this->shouldShutdown = true;
219 1
    }
220
221
    public function forceShutdown(): void
222
    {
223
        $this->shouldShutdown = true;
224
        if (!empty($this->childId)) {
225
            posix_kill($this->childId, SIGTERM) || posix_kill($this->childId, SIGKILL);
226
        }
227
    }
228
229
    public function pause(): void
230
    {
231
        $this->isPaused = true;
232
    }
233
234
    public function continue()
235
    {
236
        $this->isPaused = false;
237
    }
238
}
239