Completed
Branch dev (05d0ad)
by Raffael
02:25
created

Queue::exitWorkerManager()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 4
dl 0
loc 8
ccs 5
cts 5
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 2
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copryright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use MongoDB\BSON\UTCDateTime;
16
use MongoDB\Database;
17
use Psr\Log\LoggerInterface;
18
use TaskScheduler\Exception\SpawnForkException;
19
20
class Queue
21
{
22
    use InjectTrait;
23
24
    /**
25
     * Database.
26
     *
27
     * @var Database
28
     */
29
    protected $db;
30
31
    /**
32
     * Logger.
33
     *
34
     * @var LoggerInterface
35
     */
36
    protected $logger;
37
38
    /**
39
     * Worker factory.
40
     *
41
     * @var WorkerFactoryInterface
42
     */
43
    protected $factory;
44
45
    /**
46
     * Jobs queue.
47
     *
48
     * @var MessageQueue
49
     */
50
    protected $jobs;
51
52
    /**
53
     * Events queue.
54
     *
55
     * @var MessageQueue
56
     */
57
    protected $events;
58
59
    /**
60
     * Worker manager pid.
61
     *
62
     * @var int
63
     */
64
    protected $manager_pid;
65
66
    /**
67
     * Init queue.
68
     */
69 1
    public function __construct(Scheduler $scheduler, Database $db, WorkerFactoryInterface $factory, LoggerInterface $logger)
70
    {
71 1
        $this->db = $db;
72 1
        $this->logger = $logger;
73 1
        $this->factory = $factory;
74 1
        $this->jobs = new MessageQueue($db, $scheduler->getJobQueue(), $scheduler->getJobQueueSize(), $logger);
75 1
        $this->events = new MessageQueue($db, $scheduler->getEventQueue(), $scheduler->getEventQueueSize(), $logger);
76 1
    }
77
78
    /**
79
     * Startup (blocking process).
80
     */
81 1
    public function process(): void
82
    {
83
        try {
84 1
            $this->queue = msg_get_queue(ftok(__FILE__, 't'));
0 ignored issues
show
Bug Best Practice introduced by
The property queue does not exist. Although not strictly required by PHP, it is generally a best practice to declare properties explicitly.
Loading history...
85 1
            $this->catchSignal();
86 1
            $this->initWorkerManager();
87 1
            $this->main();
88
        } catch (\Exception $e) {
89
            $this->logger->error('main() throw an exception, cleanup and exit', [
90
                'class' => get_class($this),
91
                'exception' => $e,
92
            ]);
93
94
            $this->cleanup(SIGTERM);
95
        }
96 1
    }
97
98
    /**
99
     * Wait for worker manager.
100
     */
101 1
    public function exitWorkerManager(int $sig, array $pid): void
102
    {
103 1
        $this->logger->debug('fork manager ['.$pid['pid'].'] exit with ['.$sig.']', [
104 1
            'category' => get_class($this),
105
        ]);
106
107 1
        pcntl_waitpid($pid['pid'], $status, WNOHANG | WUNTRACED);
108 1
        $this->cleanup(SIGTERM);
109 1
    }
110
111
    /**
112
     * Cleanup.
113
     */
114 1
    public function cleanup(int $sig): void
115
    {
116 1
        if (null !== $this->manager_pid) {
117 1
            $this->logger->debug('received exit signal ['.$sig.'], forward signal to the fork manager ['.$sig.']', [
118 1
                'category' => get_class($this),
119
            ]);
120
121 1
            posix_kill($this->manager_pid, $sig);
122
        }
123
124 1
        $this->exit();
125 1
    }
126
127
    /**
128
     * Fork a worker manager.
129
     */
130 1
    protected function initWorkerManager()
131
    {
132 1
        $pid = pcntl_fork();
133 1
        $this->manager_pid = $pid;
134
135 1
        if (-1 === $pid) {
136
            throw new SpawnForkException('failed to spawn fork manager');
137
        }
138
139 1
        if (!$pid) {
140
            $manager = $this->factory->buildManager();
141
            $manager->process();
142
            exit();
0 ignored issues
show
Best Practice introduced by
Using exit here is not recommended.

In general, usage of exit should be done with care and only when running in a scripting context like a CLI script.

Loading history...
143
        }
144 1
    }
145
146
    /**
147
     * Fork handling, blocking process.
148
     */
149 1
    protected function main(): void
150
    {
151 1
        $this->logger->info('start job listener', [
152 1
            'category' => get_class($this),
153
        ]);
154
155 1
        $cursor_jobs = $this->jobs->getCursor([
156 1
            '$or' => [
157
                ['status' => JobInterface::STATUS_WAITING],
158
                ['status' => JobInterface::STATUS_POSTPONED],
159
            ],
160
        ]);
161
162 1
        $cursor_events = $this->events->getCursor([
163 1
            'timestamp' => ['$gte' => new UTCDateTime()],
164
            'job' => ['$exists' => true],
165
            'status' => ['$gt' => JobInterface::STATUS_POSTPONED],
166
        ]);
167
168 1
        $this->catchSignal();
169
170 1
        while ($this->loop()) {
171 1
            $event = $cursor_events->current();
172 1
            $this->events->next($cursor_events, function () {
173
                $this->main();
174 1
            });
175 1
            $cursor_events->next();
176
177 1
            if (null === $event) {
178 1
                if ($cursor_events->getInnerIterator()->isDead()) {
0 ignored issues
show
Bug introduced by
The method isDead() does not exist on Iterator. It seems like you code against a sub-type of Iterator such as Helmich\MongoMock\MockCursor or Nette\Iterators\CachingIterator. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

178
                if ($cursor_events->getInnerIterator()->/** @scrutinizer ignore-call */ isDead()) {
Loading history...
179
                    $this->logger->error('event queue cursor is dead, is it a capped collection?', [
180
                        'category' => get_class($this),
181
                    ]);
182
183
                    $this->events->create();
184
185
                    $this->main();
186
187 1
                    break;
188
                }
189
            } else {
190
                $this->handleEvent($event);
191
            }
192
193 1
            if (null === $cursor_jobs->current()) {
194 1
                if ($cursor_jobs->getInnerIterator()->isDead()) {
195
                    $this->logger->error('job queue cursor is dead, is it a capped collection?', [
196
                        'category' => get_class($this),
197
                    ]);
198
199
                    $this->jobs->create();
200
                    $this->main();
201
202
                    break;
203
                }
204
205 1
                $this->jobs->next($cursor_jobs, function () {
206
                    $this->main();
207 1
                });
208
209 1
                continue;
210
            }
211
212
            $job = $cursor_jobs->current();
213
214
            $this->jobs->next($cursor_jobs, function () {
215
                $this->main();
216
            });
217
218
            $this->handleJob($job);
219
        }
220 1
    }
221
222
    /**
223
     * Handle events.
224
     */
225
    protected function handleEvent(array $event): self
226
    {
227
        $this->logger->debug('received event ['.$event['status'].'] for job ['.$event['job'].'], write into systemv queue', [
228
            'category' => get_class($this),
229
        ]);
230
231
        msg_send($this->queue, WorkerManager::TYPE_EVENT, $event);
232
233
        return $this;
234
    }
235
236
    /**
237
     * Handle job.
238
     */
239
    protected function handleJob(array $job): self
240
    {
241
        $this->logger->debug('received job ['.$job['_id'].'], write in systemv message queue', [
242
            'category' => get_class($this),
243
        ]);
244
245
        msg_send($this->queue, WorkerManager::TYPE_JOB, [
246
            '_id' => $job['_id'],
247
            'options' => $job['options'],
248
        ]);
249
250
        return $this;
251
    }
252
253
    /**
254
     * Catch signals and cleanup.
255
     */
256 1
    protected function catchSignal(): self
257
    {
258 1
        pcntl_async_signals(true);
259 1
        pcntl_signal(SIGTERM, [$this, 'cleanup']);
260 1
        pcntl_signal(SIGINT, [$this, 'cleanup']);
261 1
        pcntl_signal(SIGCHLD, [$this, 'exitWorkerManager']);
262
263 1
        return $this;
264
    }
265
}
266