Passed
Push — master ( 39b8bd...47b47d )
by Dirk
02:37
created

Worker::unregisterWorker()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 4
ccs 0
cts 3
cp 0
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 0
crap 2
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Resque;
6
7
use Psr\Container\ContainerInterface;
8
use Psr\Log\LoggerInterface;
9
use Resque\Dispatchers\Noop;
10
use Resque\Interfaces\DispatcherInterface;
11
use Resque\Interfaces\SerializerInterface;
12
use Resque\Tasks\WorkerDoneWorking;
13
use Resque\Tasks\WorkerRegistering;
14
use Resque\Tasks\WorkerStartup;
15
use Resque\Tasks\WorkerUnregistering;
16
17
/**
 * Queue worker.
 *
 * Registers itself in the datastore, then repeatedly pops a payload from the
 * configured queues (in the order given) and performs the resulting job in a
 * forked child process while the parent waits for that child.
 */
class Worker
{
    private const FORK_FAILED = -1;
    private const FORK_CHILD = 0;

    private $datastore;
    private $serializer;
    private $queueNames = ['default'];
    private $interval = 10;
    private $shouldShutdown = false;
    private $isPaused = false;
    private $isChildThread = false;
    private $childId = 0;
    private $serviceLocator;
    private $dispatcher = null;
    private $logger = null;

    /**
     * Declared explicitly so that the constructor no longer creates a dynamic
     * property (deprecated since PHP 8.2). Kept public because the dynamic
     * property it replaces was publicly accessible.
     *
     * @internal
     * @var SignalHandler
     */
    public $signalHandler;

    /**
     * Identifier under which this worker is registered in the datastore.
     *
     * Declared explicitly so that the constructor no longer creates a dynamic
     * property (deprecated since PHP 8.2). Kept public because the dynamic
     * property it replaces was publicly accessible.
     *
     * @internal
     * @var string
     */
    public $id;

    /**
     * Stores the collaborators, installs no-op logger and dispatcher defaults
     * and generates the worker id from host name, process id and an md5 hash
     * of two random bytes.
     */
    public function __construct(
        Datastore $datastore,
        SerializerInterface $serializer,
        ContainerInterface $serviceLocator,
        SignalHandler $signalHandler
    ) {
        $this->datastore = $datastore;
        $this->serializer = $serializer;
        $this->serviceLocator = $serviceLocator;
        $this->signalHandler = $signalHandler;
        $this->logger = new NoopLogger();
        $this->dispatcher = new Noop();

        $this->id = gethostname() . '-' . getmypid() . md5(random_bytes(2));
    }

    /**
     * Sets the number of seconds to sleep when paused or when no job was found.
     *
     * An interval of 0 makes work() perform a single pass and return.
     */
    public function setInterval(int $interval): void
    {
        $this->interval = $interval;
    }

    /**
     * Sets the queues to poll; they are checked in the given order.
     *
     * @param string[] $queueNames
     */
    public function setQueueNames(array $queueNames): void
    {
        $this->queueNames = $queueNames;
    }

    /**
     * Replaces the default no-op dispatcher.
     */
    public function setDispatcher(DispatcherInterface $dispatcher): void
    {
        $this->dispatcher = $dispatcher;
    }

    /**
     * Replaces the default no-op logger.
     */
    public function setLogger(LoggerInterface $logger): void
    {
        $this->logger = $logger;
    }

    /**
     * Runs the worker loop until a shutdown is requested.
     *
     * The loop body is executed at least once; it is repeated only while the
     * interval is greater than 0 and no shutdown has been requested.
     */
    public function work(): void
    {
        $this->startup();

        do {
            if ($this->isPaused || !$this->workOneJob()) {
                sleep($this->interval);
            }
        } while (0 < $this->interval && !$this->shouldShutdown);

        // A forked child also falls out of this loop (performWithFork() flags it
        // for shutdown); only the parent process owns the registration, so only
        // the parent removes it.
        if (!$this->isChildThread) {
            $this->unregisterWorker();
        }
    }

    /**
     * Hooks up the signal handler, announces the startup and registers the worker.
     */
    private function startup(): void
    {
        $this->signalHandler->setWorker($this);
        $this->signalHandler->register();
        $this->dispatcher->dispatch(WorkerStartup::class, ['worker' => $this]);
        $this->registerWorker();
    }

    /**
     * Dispatches WorkerRegistering and adds the worker id to the datastore.
     */
    private function registerWorker(): void
    {
        $this->dispatcher->dispatch(WorkerRegistering::class, ['worker' => $this]);
        $this->datastore->registerWorker($this->id);
    }

    /**
     * Dispatches WorkerUnregistering and removes the worker id from the datastore.
     */
    private function unregisterWorker(): void
    {
        $this->dispatcher->dispatch(WorkerUnregistering::class, ['worker' => $this]);
        $this->datastore->unregisterWorker($this->id);
    }

    /**
     * Fetches and processes at most one job.
     *
     * @return bool false when no job was available, true otherwise
     */
    private function workOneJob(): bool
    {
        $job = $this->fetchJob();
        if (null === $job) {
            return false;
        }

        $this->setWorkingOn($job);
        $this->performWithFork($job);

        return true;
    }

    /**
     * Returns a job built from the first non-empty payload popped from the
     * configured queues, or null when every queue yielded an empty payload.
     */
    private function fetchJob(): ?Job
    {
        foreach ($this->queueNames as $queueName) {
            $payload = $this->datastore->popFromQueue($queueName);
            if (!empty($payload)) {
                $this->logger->info('found one job');
                $this->logger->debug("payload: {$payload}");
                return $this->createJobFromPayload($queueName, $payload);
            }
        }

        return null;
    }

    /**
     * Stores queue name, UTC start time and payload of the job as this
     * worker's payload in the datastore.
     */
    private function setWorkingOn(Job $job): void
    {
        $startedAt = new \DateTimeImmutable('now', new \DateTimeZone('UTC'));
        $workerPayload = $this->serializer->serialize([
            'queue' => $job->getQueueName(),
            'run_at' => $startedAt->format(\DateTime::ISO8601),
            'payload' => $job->getPayload(),
        ]);
        $this->datastore->setWorkerPayload($this->id, $workerPayload);
    }

    /**
     * Forks and performs the job in the child while the parent waits for it.
     *
     * On a failed fork the job is pushed back to its queue and the worker is
     * flagged for shutdown. The child flags itself for shutdown after
     * performing the job so that it leaves the work loop.
     */
    private function performWithFork(Job $job): void
    {
        $this->childId = pcntl_fork();

        switch ($this->childId) {
            case self::FORK_FAILED:
                $this->criticalWorkerShutdown($job);
                break;

            case self::FORK_CHILD:
                $this->performJob($job);
                $this->shouldShutdown = true;
                $this->isChildThread = true;
                break;

            default: // parent case
                $this->waitForChild($job);
        }

        $this->childId = null;
        $this->doneWorking();
    }

    /**
     * Dispatches WorkerDoneWorking, unless running in the forked child.
     */
    private function doneWorking(): void
    {
        if (!$this->isChildThread) {
            $this->dispatcher->dispatch(WorkerDoneWorking::class, ['worker' => $this]);
        }
    }

    /**
     * Unserializes the payload and passes it, together with the queue name,
     * to the job factory obtained from the service locator.
     */
    private function createJobFromPayload(string $queueName, string $serializedPayload): Job
    {
        $payload = $this->serializer->unserialize($serializedPayload);
        $factory = $this->serviceLocator->get(Job::class);

        return $factory($queueName, $payload);
    }

    /**
     * Pushes the job back to its queue and requests a shutdown of the worker.
     */
    public function criticalWorkerShutdown(Job $job): void
    {
        $this->requeueJob($job);
        $this->shutdown();
    }

    /**
     * Pushes the serialized payload of the job back to the queue it came from.
     */
    private function requeueJob(Job $job): void
    {
        $payload = $this->createPayloadFromJob($job);
        $this->datastore->pushToQueue($job->getQueueName(), $payload);
    }

    /**
     * Serializes the payload of the given job.
     */
    private function createPayloadFromJob(Job $job): string
    {
        return $this->serializer->serialize($job->getPayload());
    }

    /**
     * Reconnects the datastore and performs the job; called in the forked child.
     */
    private function performJob(Job $job): void
    {
        $this->datastore->reconnect();
        $job->perform();
    }

    /**
     * Blocks until a child process changes state and inspects how it ended.
     *
     * Failure handling is not implemented yet; both failure branches are
     * placeholders.
     */
    private function waitForChild(Job $job): void
    {
        $status = null;
        pcntl_wait($status);

        // Regular exit with status 0: nothing to handle.
        if (pcntl_wifexited($status) && pcntl_wexitstatus($status) === 0) {
            return;
        }

        if ($job->hasFailed()) {
            // user land job has failed. nothing to be done about that ... except maybe queue in failed queue
        } else {
            // unexpected failure - handle dirty exit
        }
    }

    /**
     * Requests a shutdown; the work loop ends after the current pass.
     */
    public function shutdown(): void
    {
        $this->shouldShutdown = true;
    }

    /**
     * Requests a shutdown and signals the running child process, if any.
     *
     * SIGKILL is only sent when sending SIGTERM fails.
     */
    public function forceShutdown(): void
    {
        $this->shouldShutdown = true;
        if (!empty($this->childId)) {
            posix_kill($this->childId, SIGTERM) || posix_kill($this->childId, SIGKILL);
        }
    }

    /**
     * Pauses job processing; the work loop only sleeps while paused.
     */
    public function pause(): void
    {
        $this->isPaused = true;
    }

    /**
     * Resumes job processing after pause().
     *
     * @return void
     */
    public function continue()
    {
        $this->isPaused = false;
    }
}
237