Completed
Branch dev (11269e)
by Raffael
04:12
created

Queue::queueJob()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 13
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 4.125

Importance

Changes 0
Metric Value
eloc 7
dl 0
loc 13
c 0
b 0
f 0
ccs 4
cts 8
cp 0.5
rs 10
cc 3
nc 3
nop 1
crap 4.125

1 Method

Rating   Name   Duplication   Size   Complexity  
A Queue::catchSignal() 0 8 1
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * TaskScheduler
7
 *
8
 * @author      Raffael Sahli <[email protected]>
9
 * @copyright   Copryright (c) 2017-2018 gyselroth GmbH (https://gyselroth.com)
10
 * @license     MIT https://opensource.org/licenses/MIT
11
 */
12
13
namespace TaskScheduler;
14
15
use InvalidArgumentException;
16
use MongoDB\Database;
17
use Psr\Container\ContainerInterface;
18
use Psr\Log\LoggerInterface;
19
20
class Queue extends AbstractQueue
21
{
22
    /**
23
     * Process identifier.
24
     */
25
    public const MAIN_PROCESS = 'main';
26
27
    /**
28
     * Queue options.
29
     */
30
    public const OPTION_PM = 'pm';
31
    public const OPTION_MAX_CHILDREN = 'max_children';
32
    public const OPTION_MIN_CHILDREN = 'min_children';
33
34
    /**
35
     * Process handling.
36
     */
37
    public const PM_DYNAMIC = 'dynamic';
38
    public const PM_STATIC = 'static';
39
    public const PM_ONDEMAND = 'ondemand';
40
41
    /**
42
     * Collection name.
43
     *
44
     * @var string
45
     */
46
    protected $collection_name = 'queue';
47
48
    /**
49
     * Process management.
50
     *
51
     * @var string
52
     */
53
    protected $pm = self::PM_DYNAMIC;
54
55
    /**
56
     * Max children.
57
     *
58
     * @var int
59
     */
60
    protected $max_children = 2;
61
62
    /**
63
     * Min children.
64
     *
65
     * @var int
66
     */
67
    protected $min_children = 1;
68
69
    /**
70
     * Forks.
71
     *
72
     * @var array
73
     */
74
    protected $forks = [];
75
76
    /**
77
     * Worker factory.
78
     *
79
     * @var WorkerFactoryInterface
80
     */
81
    protected $factory;
82
83
    /**
84
     * Init queue.
85
     *
86
     * @param Scheduler              $scheduler
87
     * @param Database               $db
88
     * @param WorkerFactoryInterface $factory
89
     * @param LoggerInterface        $logger
90
     * @param ContainerInterface     $container
91
     * @param array                  $options
92
     */
93 17
    public function __construct(Scheduler $scheduler, Database $db, WorkerFactoryInterface $factory, LoggerInterface $logger, ?ContainerInterface $container = null, array $config = [])
94
    {
95 17
        $this->scheduler = $scheduler;
96 17
        $this->db = $db;
97 17
        $this->logger = $logger;
98 17
        $this->container = $container;
99 17
        $this->collection_name = $scheduler->getCollection();
100 17
        $this->setOptions($config);
101 17
        $this->process = self::MAIN_PROCESS;
102 17
        $this->factory = $factory;
103 17
    }
104
105
    /**
106
     * Set options.
107
     *
108
     * @param array $config
109
     *
110
     * @return Queue
111
     */
112 17
    public function setOptions(array $config = []): self
113
    {
114 17
        foreach ($config as $option => $value) {
115
            switch ($option) {
116 9
                case self::OPTION_MAX_CHILDREN:
117 8
                case self::OPTION_MIN_CHILDREN:
118 6
                    if (!is_int($value)) {
119 2
                        throw new InvalidArgumentException($option.' needs to be an integer');
120
                    }
121
122 4
                    $this->{$option} = $value;
123
124 4
                break;
125 4
                case self::OPTION_PM:
126 3
                    if (!defined('self::PM_'.strtoupper($value))) {
127 1
                        throw new InvalidArgumentException($value.' is not a valid process handling type (static, dynamic, ondemand)');
128
                    }
129
130 2
                    $this->{$option} = $value;
131
132 2
                break;
133
                default:
134 6
                    throw new InvalidArgumentException('invalid option '.$option.' given');
135
            }
136
        }
137
138 17
        if ($this->min_children > $this->max_children) {
139 1
            throw new InvalidArgumentException('option min_children must not be greater than option max_children');
140
        }
141
142 17
        return $this;
143
    }
144
145
    /**
146
     * Startup (blocking process).
147
     */
148
    public function process(): void
149
    {
150
        try {
151
            $this->startInitialWorkers();
152
            $this->main();
153
        } catch (\Exception $e) {
154
            $this->cleanup(SIGTERM);
155
        }
156
    }
157
158
    /**
159
     * Cleanup and exit.
160
     *
161
     * @param int $sig
162
     */
163
    public function cleanup(int $sig)
164
    {
165
        $this->handleSignal($sig);
166
        exit();
0 ignored issues
show
Best Practice introduced by
Using exit here is not recommended.

In general, usage of exit should be done with care and only when running in a scripting context like a CLI script.

Loading history...
167
    }
168
169
    /**
170
     * Wait for child and terminate.
171
     *
172
     * @param int   $sig
173
     * @param array $pid
174
     *
175
     * @return Queue
176
     */
177 2
    public function exitChild(int $sig, array $pid): self
178
    {
179 2
        pcntl_waitpid($pid['pid'], $status, WNOHANG | WUNTRACED);
180
181 2
        if (isset($this->forks[$pid['pid']])) {
182
            unset($this->forks[$pid['pid']]);
183
        }
184
185 2
        return $this;
186
    }
187
188
    /**
189
     * Start initial workers.
190
     */
191 5
    protected function startInitialWorkers()
192
    {
193 5
        $this->logger->debug('start initial ['.$this->min_children.'] child processes', [
194 5
            'category' => get_class($this),
195 5
            'pm' => $this->process,
196
        ]);
197
198 5
        if (self::PM_DYNAMIC === $this->pm || self::PM_STATIC === $this->pm) {
199 4
            for ($i = 0; $i < $this->min_children; ++$i) {
200 4
                $this->startWorker();
201
            }
202
        }
203 5
    }
204
205
    /**
206
     * Start worker.
207
     *
208
     * @see https://github.com/mongodb/mongo-php-driver/issues/828
209
     * @see https://github.com/mongodb/mongo-php-driver/issues/174
210
     *
211
     * @param array $job
212
     *
213
     * @return int
214
     */
215 4
    protected function startWorker(?array $job = null)
0 ignored issues
show
Unused Code introduced by
The parameter $job is not used and could be removed. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-unused  annotation

215
    protected function startWorker(/** @scrutinizer ignore-unused */ ?array $job = null)

This check looks for parameters that have been defined for a function or method, but which are not used in the method body.

Loading history...
216
    {
217 4
        $pid = pcntl_fork();
218 4
        $this->forks[] = $pid;
219
220 4
        if (-1 === $pid) {
221
            throw new Exception\Runtime('failed to start new worker');
222
        }
223 4
        if (!$pid) {
224
            $worker = $this->factory->build()->start();
0 ignored issues
show
Bug introduced by
Are you sure the assignment to $worker is correct as $this->factory->build()->start() targeting TaskScheduler\Worker::start() seems to always return null.

This check looks for function or method calls that always return null and whose return value is assigned to a variable.

class A
{
    function getObject()
    {
        return null;
    }

}

$a = new A();
$object = $a->getObject();

The method getObject() can return nothing but null, so it makes no sense to assign that value to a variable.

The reason is most likely that a function or method is imcomplete or has been reduced for debug purposes.

Loading history...
Unused Code introduced by
The assignment to $worker is dead and can be removed.
Loading history...
225
            exit();
0 ignored issues
show
Best Practice introduced by
Using exit here is not recommended.

In general, usage of exit should be done with care and only when running in a scripting context like a CLI script.

Loading history...
226
        }
227
228 4
        $this->logger->debug('start worker process ['.$pid.']', [
229 4
            'category' => get_class($this),
230 4
            'pm' => $this->process,
231
        ]);
232
233 4
        return $pid;
234
    }
235
236
    /**
237
     * Get forks (array of pid's).
238
     *
239
     * @return int[]
240
     */
241 5
    protected function getForks(): array
242
    {
243 5
        return $this->forks;
244
    }
245
246
    /**
247
     * Fork handling, blocking process.
248
     */
249
    protected function main()
250
    {
251
        $cursor = $this->getCursor();
252
        $this->catchSignal();
253
254
        while (true) {
255
            if (null === $cursor->current()) {
256
                if ($cursor->getInnerIterator()->isDead()) {
0 ignored issues
show
Bug introduced by
The method isDead() does not exist on Iterator. It seems like you code against a sub-type of Iterator such as Helmich\MongoMock\MockCursor or Nette\Iterators\CachingIterator. ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-call  annotation

256
                if ($cursor->getInnerIterator()->/** @scrutinizer ignore-call */ isDead()) {
Loading history...
257
                    $this->logger->error('job queue cursor is dead, is it a capped collection?', [
258
                        'category' => get_class($this),
259
                        'pm' => $this->process,
260
                    ]);
261
262
                    $this->createQueue();
263
264
                    return $this->main();
0 ignored issues
show
Bug introduced by
Are you sure the usage of $this->main() targeting TaskScheduler\Queue::main() seems to always return null.

This check looks for function or method calls that always return null and whose return value is used.

class A
{
    function getObject()
    {
        return null;
    }

}

$a = new A();
if ($a->getObject()) {

The method getObject() can return nothing but null, so it makes no sense to use the return value.

The reason is most likely that a function or method is imcomplete or has been reduced for debug purposes.

Loading history...
265
                }
266
267
                $this->retrieveNextJob($cursor);
268
269
                continue;
270
            }
271
272
            $job = $cursor->current();
273
            $this->retrieveNextJob($cursor);
274
275
            if ($this->manager->getProcessCount() < $this->max_children && self::PM_STATIC !== $this->pm) {
0 ignored issues
show
Bug Best Practice introduced by
The property manager does not exist on TaskScheduler\Queue. Did you maybe forget to declare it?
Loading history...
276
                $this->logger->debug('max_children ['.$this->max_children.'] processes not reached ['.$this->manager->getProcessCount().'], start new worker', [
277
                    'category' => get_class($this),
278
                    'pm' => $this->process,
279
                ]);
280
281
                $this->startWorker();
282
            } elseif (isset($job[Scheduler::OPTION_IGNORE_MAX_CHILDREN]) && true === $job[Scheduler::OPTION_IGNORE_MAX_CHILDREN]) {
283
                $this->logger->debug('job ['.$job['_id'].'] deployed with ignore_max_children, start new worker', [
284
                    'category' => get_class($this),
285
                    'pm' => $this->process,
286
                ]);
287
288
                $this->startWorker($job);
289
            } else {
290
                $this->logger->debug('max children ['.$this->max_children.'] reached for job ['.$job['_id'].'], do not start new worker', [
291
                    'category' => get_class($this),
292
                    'pm' => $this->process,
293
                ]);
294
            }
295
        }
296
    }
297
298
    /**
299
     * Catch signals and cleanup.
300
     *
301
     * @return Queue
302
     */
303 5
    protected function catchSignal(): self
304
    {
305 5
        pcntl_async_signals(true);
306 5
        pcntl_signal(SIGTERM, [$this, 'cleanup']);
307 5
        pcntl_signal(SIGINT, [$this, 'cleanup']);
308 5
        pcntl_signal(SIGCHLD, [$this, 'exitChild']);
309
310 5
        return $this;
311
    }
312
313
    /**
314
     * Cleanup.
315
     *
316
     * @param int $sig
317
     */
318 1
    protected function handleSignal(int $sig): void
319
    {
320 1
        $this->logger->debug('received signal ['.$sig.']', [
321 1
            'category' => get_class($this),
322 1
            'pm' => $this->process,
323
        ]);
324
325 1
        foreach ($this->getForks() as $key => $pid) {
326 1
            $this->logger->debug('forward signal ['.$sig.'] to child process ['.$pid.']', [
327 1
                'category' => get_class($this),
328 1
                'pm' => $this->process,
329
            ]);
330
331 1
            posix_kill($pid, $sig);
332
        }
333 1
    }
334
}
335