Passed
Push — master ( 90fb7b...2d8012 )
by Raffael
03:43
created

Queue::exitWorkerManager()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
eloc 4
dl 0
loc 8
ccs 0
cts 5
cp 0
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 2
crap 2
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copyright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use MongoDB\BSON\UTCDateTime;
16
use MongoDB\Database;
17
use Psr\Log\LoggerInterface;
18
use TaskScheduler\Exception\SpawnForkException;
19
20
class Queue
21
{
22
    use InjectTrait;
23
24
    /**
25
     * Database.
26
     *
27
     * @var Database
28
     */
29
    protected $db;
30
31
    /**
32
     * Logger.
33
     *
34
     * @var LoggerInterface
35
     */
36
    protected $logger;
37
38
    /**
39
     * Worker factory.
40
     *
41
     * @var WorkerFactoryInterface
42
     */
43
    protected $factory;
44
45
    /**
46
     * Jobs queue.
47
     *
48
     * @var MessageQueue
49
     */
50
    protected $jobs;
51
52
    /**
53
     * Events queue.
54
     *
55
     * @var MessageQueue
56
     */
57
    protected $events;
58
59
    /**
60
     * Worker manager pid.
61
     *
62
     * @var int
63
     */
64
    protected $manager_pid;
65
66
    /**
67
     * System V message queue (sysvmsg).
68
     *
69
     * @var resource
70
     */
71
    protected $queue;
72
73
    /**
     * Create the queue dispatcher.
     *
     * Stores the injected collaborators and builds one MessageQueue for jobs
     * and one for events from the values reported by the scheduler.
     *
     * @param Scheduler              $scheduler provides getJobQueue()/getJobQueueSize() and the event equivalents
     * @param Database               $db        database handed to both message queues
     * @param WorkerFactoryInterface $factory   factory kept for building the worker manager later
     * @param LoggerInterface        $logger    logger kept locally and handed to both message queues
     */
    public function __construct(Scheduler $scheduler, Database $db, WorkerFactoryInterface $factory, LoggerInterface $logger)
    {
        $this->factory = $factory;
        $this->logger = $logger;
        $this->db = $db;

        $this->jobs = new MessageQueue(
            $db,
            $scheduler->getJobQueue(),
            $scheduler->getJobQueueSize(),
            $logger
        );

        $this->events = new MessageQueue(
            $db,
            $scheduler->getEventQueue(),
            $scheduler->getEventQueueSize(),
            $logger
        );
    }
84
85
    /**
     * Startup (blocking process).
     *
     * Opens the System V message queue, installs the signal handlers, forks
     * the worker manager and then enters main(). Any \Exception raised along
     * the way is logged and followed by cleanup(SIGTERM).
     */
    public function process(): void
    {
        try {
            // ftok() reports failure as -1 and msg_get_queue() as false. Fail
            // here, inside the try, so the problem is logged and cleaned up
            // instead of surfacing later from msg_send().
            $key = ftok(__FILE__, 't');
            if (-1 === $key) {
                throw new \RuntimeException('failed to create a System V IPC key for the message queue');
            }

            $queue = msg_get_queue($key);
            if (false === $queue) {
                throw new \RuntimeException('failed to open the System V message queue');
            }

            $this->queue = $queue;
            $this->catchSignal();
            $this->initWorkerManager();
            $this->main();
        } catch (\Exception $e) {
            $this->logger->error('main() throw an exception, cleanup and exit', [
                'class' => get_class($this),
                'exception' => $e,
            ]);

            $this->cleanup(SIGTERM);
        }
    }
104
105
    /**
     * Handler registered for SIGCHLD by catchSignal().
     *
     * Logs the reported child pid and signal, collects the child's status
     * without blocking, then shuts this process down through cleanup(SIGTERM).
     *
     * @param int   $sig signal number passed to the handler
     * @param array $pid signal information; only its 'pid' entry is read
     */
    public function exitWorkerManager(int $sig, array $pid): void
    {
        $context = ['category' => get_class($this)];
        $this->logger->debug('fork manager ['.$pid['pid'].'] exit with ['.$sig.']', $context);

        // WNOHANG keeps the handler from blocking; $status is filled in but
        // not inspected afterwards.
        $flags = WNOHANG | WUNTRACED;
        pcntl_waitpid($pid['pid'], $status, $flags);

        $this->cleanup(SIGTERM);
    }
117
118
    /**
     * Cleanup.
     *
     * Forwards the given signal to the worker manager when its pid is known,
     * then calls exit() on this instance. Also registered as the SIGTERM and
     * SIGINT handler by catchSignal().
     *
     * @param int $sig signal to forward to the worker manager
     */
    public function cleanup(int $sig): void
    {
        // No manager pid recorded: nothing to forward, just exit.
        if (null === $this->manager_pid) {
            $this->exit();

            return;
        }

        $context = ['category' => get_class($this)];
        $this->logger->debug('received exit signal ['.$sig.'], forward signal to the fork manager ['.$sig.']', $context);

        posix_kill($this->manager_pid, $sig);

        $this->exit();
    }
133
134
    /**
     * Fork a worker manager.
     *
     * The child builds the manager through the factory, runs it and exits
     * without ever returning to the caller. The parent records the child's
     * pid so cleanup() can forward signals to it.
     *
     * @throws SpawnForkException if pcntl_fork() fails
     */
    protected function initWorkerManager()
    {
        $pid = pcntl_fork();

        // Check for failure before touching manager_pid: cleanup() passes
        // that value straight to posix_kill(), and a pid of -1 would signal
        // every process this user is allowed to signal.
        if (-1 === $pid) {
            throw new SpawnForkException('failed to spawn fork manager');
        }

        if (0 === $pid) {
            // Child process: manager_pid stays unset here, since 0 would
            // make posix_kill() address the whole process group.
            $manager = $this->factory->buildManager();
            $manager->process();

            // Stop the child here so it never falls through into main().
            exit();
        }

        // Parent process: remember the manager's pid for signal forwarding.
        $this->manager_pid = $pid;
    }
152
153
    /**
     * Fork handling, blocking process.
     *
     * Opens one cursor on the job queue (waiting or postponed jobs) and one
     * on the event queue (events from now on that reference a job and have a
     * status above postponed). While loop() stays truthy, each iteration
     * forwards at most one event and one job to handleEvent()/handleJob().
     *
     * If a cursor reports isDead(), its queue is re-created and main() is
     * started again. The same restart is passed as a callback to
     * MessageQueue::next().
     */
    protected function main(): void
    {
        $context = ['category' => get_class($this)];

        $this->logger->info('start job listener', $context);

        $job_filter = [
            '$or' => [
                ['status' => JobInterface::STATUS_WAITING],
                ['status' => JobInterface::STATUS_POSTPONED],
            ],
        ];
        $cursor_jobs = $this->jobs->getCursor($job_filter);

        $event_filter = [
            'timestamp' => ['$gte' => new UTCDateTime()],
            'job' => ['$exists' => true],
            'status' => ['$gt' => JobInterface::STATUS_POSTPONED],
        ];
        $cursor_events = $this->events->getCursor($event_filter);

        $this->catchSignal();

        // Callback handed to MessageQueue::next(); it starts main() over.
        $restart = function () {
            $this->main();
        };

        while ($this->loop()) {
            if (null === $cursor_events->current()) {
                // NOTE(review): isDead() is not part of \Iterator; presumably
                // the inner iterator is the driver cursor - confirm.
                if ($cursor_events->getInnerIterator()->isDead()) {
                    $this->logger->error('event queue cursor is dead, is it a capped collection?', $context);

                    $this->events->create();
                    $this->main();

                    break;
                }

                $this->events->next($cursor_events, $restart);
            }

            // Read the current event before advancing the cursor.
            $event = $cursor_events->current();
            $this->events->next($cursor_events, $restart);

            if (null !== $event) {
                $this->handleEvent($event);
            }

            if (null === $cursor_jobs->current()) {
                if ($cursor_jobs->getInnerIterator()->isDead()) {
                    $this->logger->error('job queue cursor is dead, is it a capped collection?', $context);

                    $this->jobs->create();
                    $this->main();

                    break;
                }

                $this->jobs->next($cursor_jobs, $restart);

                continue;
            }

            // Read the current job before advancing the cursor.
            $job = $cursor_jobs->current();
            $this->jobs->next($cursor_jobs, $restart);

            $this->handleJob($job);
        }
    }
233
234
    /**
     * Handle events.
     *
     * Logs the event's status and job reference, then writes the whole event
     * to the System V message queue as a WorkerManager::TYPE_EVENT message.
     *
     * @param array $event event record; 'status' and 'job' are read for the log line
     *
     * @return self this instance, for chaining
     */
    protected function handleEvent(array $event): self
    {
        $message = 'received event ['.$event['status'].'] for job ['.$event['job'].'], write into systemv queue';
        $context = ['category' => get_class($this)];

        $this->logger->debug($message, $context);

        msg_send($this->queue, WorkerManager::TYPE_EVENT, $event);

        return $this;
    }
247
248
    /**
     * Handle job.
     *
     * Logs the job id, then writes the job's '_id' and 'options' to the
     * System V message queue as a WorkerManager::TYPE_JOB message.
     *
     * @param array $job job record; only '_id' and 'options' are read
     *
     * @return self this instance, for chaining
     */
    protected function handleJob(array $job): self
    {
        $message = 'received job ['.$job['_id'].'], write in systemv message queue';
        $this->logger->debug($message, ['category' => get_class($this)]);

        // Only the id and options are forwarded, not the full record.
        $payload = [
            '_id' => $job['_id'],
            'options' => $job['options'],
        ];

        msg_send($this->queue, WorkerManager::TYPE_JOB, $payload);

        return $this;
    }
264
265
    /**
     * Catch signals and cleanup.
     *
     * Turns on asynchronous signal dispatch and registers cleanup() for
     * SIGTERM and SIGINT, and exitWorkerManager() for SIGCHLD.
     *
     * @return self this instance, for chaining
     */
    protected function catchSignal(): self
    {
        pcntl_async_signals(true);

        // Signal => handler method on this instance, in registration order.
        $handlers = [
            SIGTERM => 'cleanup',
            SIGINT => 'cleanup',
            SIGCHLD => 'exitWorkerManager',
        ];

        foreach ($handlers as $signal => $method) {
            pcntl_signal($signal, [$this, $method]);
        }

        return $this;
    }
277
}
278