Queue::exitWorkerManager()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
eloc 4
dl 0
loc 8
ccs 0
cts 5
cp 0
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 2
crap 2
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copryright (c) 2017-2019 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use MongoDB\BSON\UTCDateTime;
16
use MongoDB\Database;
17
use Psr\Log\LoggerInterface;
18
use TaskScheduler\Exception\SpawnForkException;
19
20
class Queue
21
{
22
    use InjectTrait;
23
24
    /**
25
     * Database.
26
     *
27
     * @var Database
28
     */
29
    protected $db;
30
31
    /**
32
     * Logger.
33
     *
34
     * @var LoggerInterface
35
     */
36
    protected $logger;
37
38
    /**
39
     * Worker factory.
40
     *
41
     * @var WorkerFactoryInterface
42
     */
43
    protected $factory;
44
45
    /**
46
     * Jobs queue.
47
     *
48
     * @var MessageQueue
49
     */
50
    protected $jobs;
51
52
    /**
53
     * Events queue.
54
     *
55
     * @var MessageQueue
56
     */
57
    protected $events;
58
59
    /**
60
     * Worker manager pid.
61
     *
62
     * @var int
63
     */
64
    protected $manager_pid;
65
66
    /**
67
     * Sysmfsg queue.
68
     *
69
     * @var resource
70
     */
71
    protected $queue;
72
73
    /**
74
     * Init queue.
75
     */
76 1
    public function __construct(Scheduler $scheduler, Database $db, WorkerFactoryInterface $factory, LoggerInterface $logger)
77
    {
78 1
        $this->db = $db;
79 1
        $this->logger = $logger;
80 1
        $this->factory = $factory;
81 1
        $this->jobs = new MessageQueue($db, $scheduler->getJobQueue(), $scheduler->getJobQueueSize(), $logger);
82 1
        $this->events = new MessageQueue($db, $scheduler->getEventQueue(), $scheduler->getEventQueueSize(), $logger);
83 1
    }
84
85
    /**
86
     * Startup (blocking process).
87
     */
88 1
    public function process(): void
89
    {
90
        try {
91 1
            $this->queue = msg_get_queue(ftok(__FILE__, 't'));
92 1
            $this->catchSignal();
93 1
            $this->initWorkerManager();
94 1
            $this->main();
95
        } catch (\Exception $e) {
96
            $this->logger->error('main() throw an exception, cleanup and exit', [
97
                'class' => get_class($this),
98
                'exception' => $e,
99
            ]);
100
101
            $this->cleanup(SIGTERM);
102
        }
103 1
    }
104
105
    /**
106
     * Wait for worker manager.
107
     */
108
    public function exitWorkerManager(int $sig, array $pid): void
109
    {
110
        $this->logger->debug('fork manager ['.$pid['pid'].'] exit with ['.$sig.']', [
111
            'category' => get_class($this),
112
        ]);
113
114
        pcntl_waitpid($pid['pid'], $status, WNOHANG | WUNTRACED);
115
        $this->cleanup(SIGTERM);
116
    }
117
118
    /**
119
     * Cleanup.
120
     */
121
    public function cleanup(int $sig): void
122
    {
123
        if (null !== $this->manager_pid) {
124
            $this->logger->debug('received exit signal ['.$sig.'], forward signal to the fork manager ['.$sig.']', [
125
                'category' => get_class($this),
126
            ]);
127
128
            posix_kill($this->manager_pid, $sig);
129
        }
130
131
        $this->exit();
132
    }
133
134
    /**
135
     * Fork a worker manager.
136
     */
137 1
    protected function initWorkerManager()
138
    {
139 1
        $pid = pcntl_fork();
140 1
        $this->manager_pid = $pid;
141
142 1
        if (-1 === $pid) {
143
            throw new SpawnForkException('failed to spawn fork manager');
144
        }
145
146 1
        if (!$pid) {
147
            $manager = $this->factory->buildManager();
148
            $manager->process();
149
            exit();
0 ignored issues
show
Best Practice introduced by
Using exit here is not recommended.

In general, usage of exit should be done with care and only when running in a scripting context like a CLI script.

Loading history...
150
        }
151 1
    }
152
153
    /**
154
     * Fork handling, blocking process.
155
     */
156 1
    protected function main(): void
157
    {
158 1
        $this->logger->info('start job listener', [
159 1
            'category' => get_class($this),
160
        ]);
161
162 1
        $cursor_jobs = $this->jobs->getCursor([
163 1
            '$or' => [
164
                ['status' => JobInterface::STATUS_WAITING],
165
                ['status' => JobInterface::STATUS_POSTPONED],
166
            ],
167
        ]);
168
169 1
        $cursor_events = $this->events->getCursor([
170 1
            'timestamp' => ['$gte' => new UTCDateTime()],
171
            'job' => ['$exists' => true],
172
            'status' => ['$gt' => JobInterface::STATUS_POSTPONED],
173
        ]);
174
175 1
        $this->catchSignal();
176
177 1
        while ($this->loop()) {
178 1
            while ($this->loop()) {
179
                if (null === $cursor_events->current()) {
180
                    if ($cursor_events->getInnerIterator()->isDead()) {
0 ignored issues
show
Bug introduced by
The method isDead() does not exist on Iterator. It seems like you code against a sub-type of Iterator such as Helmich\MongoMock\MockCursor or Nette\Iterators\CachingIterator. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

180
                    if ($cursor_events->getInnerIterator()->/** @scrutinizer ignore-call */ isDead()) {
Loading history...
181
                        $this->logger->error('event queue cursor is dead, is it a capped collection?', [
182
                            'category' => get_class($this),
183
                        ]);
184
185
                        $this->events->create();
186
187
                        $this->main();
188
189
                        break;
190
                    }
191
192
                    $this->events->next($cursor_events, function () {
193
                        $this->main();
194
                    });
195
                }
196
197
                $event = $cursor_events->current();
198
                $this->events->next($cursor_events, function () {
199
                    $this->main();
200
                });
201
202
                if($event === null) {
203
                    break;
204
                }
205
206
                $this->handleEvent($event);
207
            }
208
209 1
            if (null === $cursor_jobs->current()) {
210 1
                if ($cursor_jobs->getInnerIterator()->isDead()) {
211
                    $this->logger->error('job queue cursor is dead, is it a capped collection?', [
212
                        'category' => get_class($this),
213
                    ]);
214
215
                    $this->jobs->create();
216
                    $this->main();
217
218
                    break;
219
                }
220
221 1
                $this->jobs->next($cursor_jobs, function () {
222
                    $this->main();
223 1
                });
224
225 1
                continue;
226
            }
227
228
            $job = $cursor_jobs->current();
229
230
            $this->jobs->next($cursor_jobs, function () {
231
                $this->main();
232
            });
233
234
            $this->handleJob($job);
235
        }
236 1
    }
237
238
    /**
239
     * Handle events.
240
     */
241
    protected function handleEvent(array $event): self
242
    {
243
        $this->logger->debug('received event ['.$event['status'].'] for job ['.$event['job'].'], write into systemv queue', [
244
            'category' => get_class($this),
245
        ]);
246
247
        msg_send($this->queue, WorkerManager::TYPE_EVENT, $event);
248
249
        return $this;
250
    }
251
252
    /**
253
     * Handle job.
254
     */
255
    protected function handleJob(array $job): self
256
    {
257
        $this->logger->debug('received job ['.$job['_id'].'], write in systemv message queue', [
258
            'category' => get_class($this),
259
        ]);
260
261
        msg_send($this->queue, WorkerManager::TYPE_JOB, [
262
            '_id' => $job['_id'],
263
            'options' => $job['options'],
264
        ]);
265
266
        return $this;
267
    }
268
269
    /**
270
     * Catch signals and cleanup.
271
     */
272 1
    protected function catchSignal(): self
273
    {
274 1
        pcntl_async_signals(true);
275 1
        pcntl_signal(SIGTERM, [$this, 'cleanup']);
276 1
        pcntl_signal(SIGINT, [$this, 'cleanup']);
277 1
        pcntl_signal(SIGCHLD, [$this, 'exitWorkerManager']);
278
279 1
        return $this;
280
    }
281
}
282