AsyncCall::setProcessLimit()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 3
c 0
b 0
f 0
nc 2
nop 1
dl 0
loc 6
rs 10
1
<?php
2
declare(strict_types=1);
3
4
namespace Async;
5
6
use SuperClosure\Serializer;
7
use Symfony\Component\Console\Exception\InvalidArgumentException;
8
9
class AsyncCall
10
{
11
    private const CONSOLE_LOCATION = __DIR__ . '/../../bin/console';
12
13
    /**
14
     * @var bool
15
     */
16
    private static $shutdownFunctionRegistered = false;
17
    /**
18
     * @var AsyncProcess[]
19
     */
20
    private static $processList = [];
21
    /**
22
     * @var Serializer
23
     */
24
    private static $serializer;
25
    /**
26
     * @var int
27
     */
28
    private static $processesLimit = 0;
29
    private static $processAmount = 0;
30
31
    public static function setProcessLimit(int $processesLimit): void
32
    {
33
        if ($processesLimit < 0) {
34
            throw new InvalidArgumentException('Processes limit Must be positive integer');
35
        }
36
        self::$processesLimit = $processesLimit;
37
    }
38
39
    public static function run(
40
        callable $job,
41
        callable $callback = null,
42
        callable $onError = null,
43
        float $timeout = null,
44
        float $idleTimeout = null
45
    ): void {
46
        self::registerShutdownFunction();
47
48
        if (!self::$serializer) {
49
            self::$serializer = new Serializer();
50
        }
51
52
        // we got process limit so wait for them to finish
53
        if (0 !== self::$processesLimit && self::$processesLimit >= self::$processAmount) {
54
            self::waitForProcessesToFinish(self::$processesLimit);
55
        }
56
57
        $process = new AsyncProcess(
58
            [
59
                self::CONSOLE_LOCATION,
60
                AsyncChildCommand::COMMAND_NAME,
61
                base64_encode(self::$serializer->serialize($job))
62
            ]
63
        );
64
        $process->setTimeout($timeout);
65
        $process->setIdleTimeout($idleTimeout);
66
        $process->startJob($callback, $onError);
67
68
        self::$processList[] = $process;
69
        self::$processAmount++;
70
    }
71
72
    private static function registerShutdownFunction(): void
73
    {
74
        if (!self::$shutdownFunctionRegistered) {
75
            register_shutdown_function(
76
                static function () {
77
                    self::waitForProcessesToFinish();
78
                }
79
            );
80
            self::$shutdownFunctionRegistered = true;
81
        }
82
    }
83
84
    private static function waitForProcessesToFinish(int $maxProcessToWait = 0): void
85
    {
86
        for (; ;) {
87
            if (0 === self::$processAmount || $maxProcessToWait > self::$processAmount) {
88
                break;
89
            }
90
91
            foreach (self::$processList as $i => $process) {
92
                if (
93
                    $process->getStatus() === AsyncProcess::STATUS_TERMINATED ||
94
                    (!$process->hasCallbackSet() && !$process->hasOnErrorSet())
95
                ) {
96
                    unset(self::$processList[$i]);
97
                    self::$processAmount--;
98
99
                    continue;
100
                }
101
            }
102
        }
103
    }
104
}