Completed
Pull Request — master (#7)
by Raffael
04:14
created

Queue::processOnce()   A

Complexity

Conditions 4
Paths 4

Size

Total Lines 22
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 4.1967

Importance

Changes 0
Metric Value
eloc 12
dl 0
loc 22
c 0
b 0
f 0
ccs 10
cts 13
cp 0.7692
rs 9.8666
cc 4
nc 4
nop 0
crap 4.1967
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copyright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use MongoDB\BSON\UTCDateTime;
16
use MongoDB\Database;
17
use Psr\Log\LoggerInterface;
18
use TaskScheduler\Exception\SpawnForkException;
19
20
/**
 * Queue listener.
 *
 * Reads jobs and events from two MessageQueue cursors and writes them into a
 * System V message queue, tagged with WorkerManager::TYPE_JOB or
 * WorkerManager::TYPE_EVENT. A worker manager is forked as a child process
 * on startup; termination signals are forwarded to it.
 *
 * NOTE(review): loop() and exit() are called on $this but are not defined in
 * this class; presumably they are provided by InjectTrait — confirm.
 */
class Queue
{
    use InjectTrait;

    /**
     * Database.
     *
     * @var Database
     */
    protected $db;

    /**
     * Logger.
     *
     * @var LoggerInterface
     */
    protected $logger;

    /**
     * Worker factory.
     *
     * @var WorkerFactoryInterface
     */
    protected $factory;

    /**
     * Jobs queue.
     *
     * @var MessageQueue
     */
    protected $jobs;

    /**
     * Events queue.
     *
     * @var MessageQueue
     */
    protected $events;

    /**
     * Worker manager pid.
     *
     * Stays null until a worker manager has been forked successfully; only
     * ever holds the (positive) child pid as seen from the parent process.
     *
     * @var int|null
     */
    protected $manager_pid;

    /**
     * System V message queue handle as returned by msg_get_queue().
     *
     * Declared explicitly (it used to be created dynamically in process()),
     * since dynamic properties are deprecated as of PHP 8.2. Left untyped
     * because the handle is a resource on PHP 7 and an object on PHP 8.
     *
     * @var resource|\SysvMessageQueue|null
     */
    protected $queue;

    /**
     * Init queue.
     *
     * Stores the collaborators and wraps the scheduler's job and event
     * queues (name and size are taken from the scheduler) in MessageQueue
     * instances.
     */
    public function __construct(Scheduler $scheduler, Database $db, WorkerFactoryInterface $factory, LoggerInterface $logger)
    {
        $this->db = $db;
        $this->logger = $logger;
        $this->factory = $factory;
        $this->jobs = new MessageQueue($db, $scheduler->getJobQueue(), $scheduler->getJobQueueSize(), $logger);
        $this->events = new MessageQueue($db, $scheduler->getEventQueue(), $scheduler->getEventQueueSize(), $logger);
    }

    /**
     * Startup (blocking process).
     *
     * Opens the System V message queue, installs the signal handlers, forks
     * the worker manager and enters the main loop. Any \Exception raised on
     * the way is logged and followed by cleanup(SIGTERM).
     */
    public function process(): void
    {
        try {
            // msg_get_queue() returns false on failure; fail early instead of
            // passing an invalid handle to msg_send() later on.
            $queue = msg_get_queue(ftok(__FILE__, 't'));
            if (false === $queue) {
                throw new \RuntimeException('failed to open systemv message queue');
            }

            $this->queue = $queue;
            $this->catchSignal();
            $this->initWorkerManager();
            $this->main();
        } catch (\Exception $e) {
            $this->logger->error('main() throw an exception, cleanup and exit', [
                'class' => get_class($this),
                'exception' => $e,
            ]);

            $this->cleanup(SIGTERM);
        }
    }

    /**
     * Wait for worker manager.
     *
     * Registered as the SIGCHLD handler in catchSignal(). Reaps the child
     * without blocking and then shuts this process down via cleanup().
     *
     * @param int   $sig signal number passed by the signal dispatcher
     * @param array $pid signal info array; its 'pid' entry is read as the child pid
     */
    public function exitWorkerManager(int $sig, array $pid): void
    {
        $this->logger->debug('fork manager ['.$pid['pid'].'] exit with ['.$sig.']', [
            'category' => get_class($this),
        ]);

        pcntl_waitpid($pid['pid'], $status, WNOHANG | WUNTRACED);
        $this->cleanup(SIGTERM);
    }

    /**
     * Cleanup.
     *
     * Registered as the SIGTERM/SIGINT handler in catchSignal(). Forwards
     * the signal to the worker manager if one was forked, then calls exit().
     *
     * @param int $sig signal to forward to the worker manager
     */
    public function cleanup(int $sig): void
    {
        if (null !== $this->manager_pid) {
            $this->logger->debug('received exit signal ['.$sig.'], forward signal to the fork manager ['.$this->manager_pid.']', [
                'category' => get_class($this),
            ]);

            posix_kill($this->manager_pid, $sig);
        }

        $this->exit();
    }

    /**
     * Fork a worker manager.
     *
     * The child builds a manager through the factory, runs it and exits.
     * The parent remembers the child's pid in $manager_pid.
     *
     * @throws SpawnForkException if the fork fails
     */
    protected function initWorkerManager()
    {
        $pid = pcntl_fork();

        if (-1 === $pid) {
            throw new SpawnForkException('failed to spawn fork manager');
        }

        if (0 === $pid) {
            // Child process: run the worker manager and never return.
            $manager = $this->factory->buildManager();
            $manager->process();
            exit();
        }

        // Parent only. The pid is recorded after the checks above so that
        // $manager_pid can never be -1 (failed fork) or 0 (child side);
        // posix_kill() treats those values as "all processes" / "whole
        // process group", which cleanup() must never signal.
        $this->manager_pid = $pid;
    }

    /**
     * Fork handling, blocking process.
     *
     * Opens one cursor on the job queue (waiting or postponed jobs) and one
     * on the event queue (events with a job reference, a status greater than
     * STATUS_POSTPONED and a timestamp from now on), then keeps forwarding
     * whatever both cursors yield for as long as loop() returns true.
     *
     * NOTE(review): a dead cursor is handled by recreating the collection
     * and calling main() recursively, so every restart adds a stack frame.
     */
    protected function main(): void
    {
        $this->logger->info('start job listener', [
            'category' => get_class($this),
        ]);

        $cursor_jobs = $this->jobs->getCursor([
            '$or' => [
                ['status' => JobInterface::STATUS_WAITING],
                ['status' => JobInterface::STATUS_POSTPONED],
            ],
        ]);

        $cursor_events = $this->events->getCursor([
            'timestamp' => ['$gte' => new UTCDateTime()],
            'job' => ['$exists' => true],
            'status' => ['$gt' => JobInterface::STATUS_POSTPONED],
        ]);

        $this->catchSignal();

        // Callback handed to MessageQueue::next(); it restarts the listener.
        // Presumably next() invokes it when the cursor cannot be advanced —
        // verify against MessageQueue.
        $restart = function () {
            $this->main();
        };

        while ($this->loop()) {
            if (null === $cursor_events->current()) {
                if ($cursor_events->getInnerIterator()->isDead()) {
                    $this->logger->error('event queue cursor is dead, is it a capped collection?', [
                        'category' => get_class($this),
                    ]);

                    $this->events->create();
                    $this->main();

                    break;
                }

                $this->events->next($cursor_events, $restart);
            }

            // Read the current event before advancing the cursor.
            $event = $cursor_events->current();
            $this->events->next($cursor_events, $restart);

            if (null !== $event) {
                $this->handleEvent($event);
            }

            if (null === $cursor_jobs->current()) {
                if ($cursor_jobs->getInnerIterator()->isDead()) {
                    $this->logger->error('job queue cursor is dead, is it a capped collection?', [
                        'category' => get_class($this),
                    ]);

                    $this->jobs->create();
                    $this->main();

                    break;
                }

                $this->jobs->next($cursor_jobs, $restart);

                continue;
            }

            // Read the current job before advancing the cursor.
            $job = $cursor_jobs->current();
            $this->jobs->next($cursor_jobs, $restart);

            $this->handleJob($job);
        }
    }

    /**
     * Handle events.
     *
     * Writes the complete event into the System V queue as a
     * WorkerManager::TYPE_EVENT message.
     *
     * @param array $event event document; 'status' and 'job' are read for logging
     *
     * @return self
     */
    protected function handleEvent(array $event): self
    {
        $this->logger->debug('received event ['.$event['status'].'] for job ['.$event['job'].'], write into systemv queue', [
            'category' => get_class($this),
        ]);

        msg_send($this->queue, WorkerManager::TYPE_EVENT, $event);

        return $this;
    }

    /**
     * Handle job.
     *
     * Writes only the job's '_id' and 'options' into the System V queue as a
     * WorkerManager::TYPE_JOB message.
     *
     * @param array $job job document; must contain '_id' and 'options'
     *
     * @return self
     */
    protected function handleJob(array $job): self
    {
        $this->logger->debug('received job ['.$job['_id'].'], write in systemv message queue', [
            'category' => get_class($this),
        ]);

        msg_send($this->queue, WorkerManager::TYPE_JOB, [
            '_id' => $job['_id'],
            'options' => $job['options'],
        ]);

        return $this;
    }

    /**
     * Catch signals and cleanup.
     *
     * Enables asynchronous signal dispatch and routes SIGTERM/SIGINT to
     * cleanup() and SIGCHLD to exitWorkerManager().
     *
     * @return self
     */
    protected function catchSignal(): self
    {
        pcntl_async_signals(true);
        pcntl_signal(SIGTERM, [$this, 'cleanup']);
        pcntl_signal(SIGINT, [$this, 'cleanup']);
        pcntl_signal(SIGCHLD, [$this, 'exitWorkerManager']);

        return $this;
    }
}
271