Passed
Push — master ( beb282...0ad4d3 )
by Dirk
02:41
created

Worker::setInterval()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 1
dl 0
loc 3
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Resque;
6
7
use Psr\Container\ContainerInterface;
8
use Resque\Dispatchers\Noop;
9
use Resque\Interfaces\DispatcherInterface;
10
use Resque\Interfaces\SerializerInterface;
11
use Resque\Tasks\WorkerDoneWorking;
12
use Resque\Tasks\WorkerRegistering;
13
use Resque\Tasks\WorkerStartup;
14
use Resque\Tasks\WorkerUnregistering;
15
16
class Worker
17
{
18
    private const FORK_FAILED = -1;
19
    private const FORK_CHILD = 0;
20
21
    private $datastore;
22
    private $serializer;
23
    private $queueNames = ['default'];
24
    private $interval = 10;
25
    private $shouldShutdown = false;
26
    private $isPaused = false;
27
    private $isChildThread = false;
28
    private $childId = 0;
29
    private $serviceLocator;
30
    private $dispatcher = null;
31
32 9
    public function __construct(
33
        Datastore $datastore,
34
        SerializerInterface $serializer,
35
        ContainerInterface $serviceLocator,
36
        SignalHandler $signalHandler
37
    ) {
38 9
        $this->datastore = $datastore;
39 9
        $this->serializer = $serializer;
40 9
        $this->serviceLocator = $serviceLocator;
41 9
        $this->signalHandler = $signalHandler;
42 9
        $this->logger = new NoopLogger();
43 9
        $this->dispatcher = new Noop();
44
45 9
        $this->id = gethostname() . '-' . getmypid() . md5(random_bytes(2));
46 9
    }
47
48 9
    public function setInterval(int $interval): void
49
    {
50 9
        $this->interval = $interval;
51 9
    }
52
53 9
    public function setQueueNames(array $queueNames): void
54
    {
55 9
        $this->queueNames = $queueNames;
56 9
    }
57
58 9
    public function setDispatcher(DispatcherInterface $dispatcher): void
59
    {
60 9
        $this->dispatcher = $dispatcher;
61 9
    }
62
63 6
    public function work(): void
64
    {
65 6
        $this->startup();
66
67
        do {
68 6
            if ($this->isPaused || !$this->workOneJob()) {
69 1
                sleep($this->interval);
70
            }
71 6
        } while (0 < $this->interval && !$this->shouldShutdown);
72
73 6
        $this->unregisterWorker();
74 6
    }
75
76 6
    private function startup(): void
77
    {
78 6
        $this->signalHandler->setWorker($this);
79 6
        $this->signalHandler->register();
80 6
        $this->dispatcher->dispatch(WorkerStartup::class, ['worker' => $this]);
81 6
        $this->registerWorker();
82 6
    }
83
84 6
    private function registerWorker(): void
85
    {
86 6
        $this->dispatcher->dispatch(WorkerRegistering::class, ['worker' => $this]);
87 6
        $this->datastore->registerWorker($this->id);
88 6
    }
89
90 6
    private function unregisterWorker(): void
91
    {
92 6
        $this->dispatcher->dispatch(WorkerUnregistering::class, ['worker' => $this]);
93 6
        $this->datastore->unregisterWorker($this->id);
94 6
    }
95
96 6
    private function workOneJob(): bool
97
    {
98 6
        $job = $this->fetchJob();
99 6
        if (empty($job)) {
100 1
            return false;
101
        }
102
103 5
        $this->setWorkingOn($job);
104 5
        $this->performWithFork($job);
105
106 5
        return true;
107
    }
108
109 6
    private function fetchJob(): ?Job
110
    {
111 6
        foreach ($this->queueNames as $queueName) {
112 6
            $payload = $this->datastore->popFromQueue($queueName);
113 6
            if (!empty($payload)) {
114 5
                $this->logger->info('found one job');
115 5
                $this->logger->debug("payload: {$payload}");
116 6
                return $this->createJobFromPayload($queueName, $payload);
117
            }
118
        }
119 1
        return null;
120
    }
121
122 5
    private function setWorkingOn(Job $job): void
123
    {
124 5
        $time = new \DateTime();
125 5
        $time->setTimezone(new \DateTimeZone('UTC'));
126 5
        $timeString = $time->format(\DateTime::ISO8601);
127 5
        $workerPayload = $this->serializer->serialize([
128 5
            'queue' => $job->getQueueName(),
129 5
            'run_at' => $timeString,
130 5
            'payload' => $job->getPayload(),
131
        ]);
132 5
        $this->datastore->setWorkerPayload($this->id, $workerPayload);
133 5
    }
134
135 5
    private function performWithFork(Job $job): void
136
    {
137 5
        $this->childId = pcntl_fork();
138
139 5
        switch ($this->childId) {
140
141 5
            case self::FORK_FAILED:
142 1
                $this->criticalWorkerShutdown($job);
143 1
                break;
144
145 4
            case self::FORK_CHILD:
146 2
                $this->performJob($job);
147 2
                $this->shouldShutdown = true;
148 2
                $this->isChildThread = true;
149 2
                break;
150
151
            default: // parent case
152 2
                $this->waitForChild($job);
153
        }
154
155 5
        $this->childId = null;
156 5
        $this->doneWorking();
157 5
    }
158
159 5
    private function doneWorking(): void
160
    {
161 5
        if (!$this->isChildThread) {
162 3
            $this->dispatcher->dispatch(WorkerDoneWorking::class, ['worker' => $this]);
163
        }
164 5
    }
165
166 5
    private function createJobFromPayload(string $queueName, string $serializedPayload): Job
167
    {
168 5
        $payload = $this->serializer->unserialize($serializedPayload);
169 5
        $factory = $this->serviceLocator->get(Job::class);
170 5
        return $factory($queueName, $payload);
171
    }
172
173 1
    public function criticalWorkerShutdown(Job $job): void
174
    {
175 1
        $this->requeueJob($job);
176 1
        $this->shutdown();
177 1
    }
178
179 1
    private function requeueJob(Job $job): void
180
    {
181 1
        $payload = $this->createPayloadFromJob($job);
182 1
        $this->datastore->pushToQueue($job->getQueueName(), $payload);
183 1
    }
184
185 1
    private function createPayloadFromJob($job): string
186
    {
187 1
        return $this->serializer->serialize($job->getPayload());
188
    }
189
190 2
    private function performJob(Job $job): void
191
    {
192 2
        $this->datastore->reconnect();
193 2
        $job->perform();
194 2
    }
195
196 2
    private function waitForChild(Job $job): void
197
    {
198 2
        $status = null;
199 2
        pcntl_wait($status);
200 2
        if (!pcntl_wifexited($status) || ($exitStatus = pcntl_wexitstatus($status)) !== 0) {
201 2
            if ($job->hasFailed()) {
202
                // user land job has failed. nothing to be done about that ... except maybe queue in failed queue
203
            } else {
204
                // unexpected failure - handle dirty exit
205
            }
206
        }
207 2
    }
208
209 1
    public function shutdown(): void
210
    {
211 1
        $this->shouldShutdown = true;
212 1
    }
213
214 3
    public function forceShutdown(): void
215
    {
216 3
        $this->shouldShutdown = true;
217 3
        if (!empty($this->childId)) {
218 2
            posix_kill($this->childId, SIGTERM) || posix_kill($this->childId, SIGKILL);
219
        }
220 3
    }
221
222
    public function pause(): void
223
    {
224
        $this->isPaused = true;
225
    }
226
227
    public function continue()
228
    {
229
        $this->isPaused = false;
230
    }
231
}
232