Async::progress()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 9
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 1
eloc 4
nc 1
nop 1
dl 0
loc 9
rs 10
c 2
b 0
f 0
1
<?php
2
3
namespace Tleckie\Async;
4
5
/**
6
 * Class Async
7
 *
8
 * @package Tleckie\Async
9
 * @author  Teodoro Leckie Westberg <[email protected]>
10
 */
11
class Async
12
{
13
    /** @var TaskInterface[] */
14
    protected array $pendingQueue = [];
15
16
    /** @var TaskInterface[] */
17
    protected array $progressQueue = [];
18
19
    /** @var TaskInterface[] */
20
    protected array $finishedQueue = [];
21
22
    /** @var TaskInterface[] */
23
    protected array $failedQueue = [];
24
25
    /** @var TaskFactoryInterface */
26
    protected TaskFactoryInterface $taskFactory;
27
28
    /** @var Encoder */
29
    protected Encoder $encoder;
30
31
    /** @var mixed[] */
32
    protected array $results = [];
33
34
    /** @var int */
35
    protected int $sleep;
36
37
    /**
38
     * Async constructor.
39
     *
40
     * @param TaskFactoryInterface|null $taskFactory
41
     * @param Encoder|null              $encoder
42
     * @param int|null                  $sleep
43
     */
44
    public function __construct(
45
        ?TaskFactoryInterface $taskFactory = null,
46
        ?Encoder $encoder = null,
47
        ?int $sleep = 5000
48
    ) {
49
        $this->taskFactory = $taskFactory ?? new TaskFactory();
50
        $this->encoder = $encoder ?? new Encoder();
51
        $this->sleep = $sleep;
52
53
        $this->listener();
54
    }
55
56
    protected function listener(): void
57
    {
58
        pcntl_async_signals(true);
59
        pcntl_signal(SIGCHLD, function ($signo, $status) {
60
            while (true) {
61
                $pid = pcntl_waitpid(-1, $processState, WNOHANG | WUNTRACED);
62
                if ($pid <= 0) {
63
                    break;
64
                }
65
66
                $process = $this->progressQueue[$pid] ?? null;
67
68
                if (!$process || 0 === $status['status']) {
69
                    $this->finished($process);
70
                    continue;
71
                }
72
73
                $this->failed($process);
74
            }
75
        });
76
    }
77
78
    /**
79
     * @param TaskInterface $task
80
     * @return TaskInterface
81
     */
82
    protected function finished(TaskInterface $task): TaskInterface
83
    {
84
        unset($this->progressQueue[$task->pid()]);
85
86
        $this->notify();
87
88
        $this->results[] = $task->handle()->output();
89
90
        $this->finishedQueue[$task->pid()] = $task;
91
92
        return $task;
93
    }
94
95
    protected function notify(): void
96
    {
97
        $process = array_shift($this->pendingQueue);
98
99
        if (!$process) {
100
            return;
101
        }
102
103
        $this->progress($process);
104
    }
105
106
    /**
107
     * @param TaskInterface $task
108
     * @return TaskInterface
109
     */
110
    protected function progress(TaskInterface $task): TaskInterface
111
    {
112
        $task->start();
113
114
        unset($this->pendingQueue[$task->id()]);
115
116
        $this->progressQueue[$task->pid()] = $task;
117
118
        return $task;
119
    }
120
121
    /**
122
     * @param TaskInterface $task
123
     * @return TaskInterface
124
     */
125
    protected function failed(TaskInterface $task): TaskInterface
126
    {
127
        unset($this->progressQueue[$task->pid()]);
128
129
        $this->notify();
130
131
        $this->failedQueue[$task->pid()] = $task->error();
132
133
        return $task;
134
    }
135
136
    /**
137
     * @param callable $process
138
     * @return TaskInterface
139
     */
140
    public function add(callable $process): TaskInterface
141
    {
142
        $task = $this->taskFactory->createTask($process, $this->encoder);
143
144
        return $this->pending($task);
145
    }
146
147
    /**
148
     * @param TaskInterface $task
149
     * @return TaskInterface
150
     */
151
    protected function pending(TaskInterface $task): TaskInterface
152
    {
153
        $this->pendingQueue[$task->id()] = $task;
154
155
        $this->notify();
156
157
        return $task;
158
    }
159
160
    /**
161
     * @return array
162
     */
163
    public function wait(): array
164
    {
165
        while (true) {
166
            if (!count($this->progressQueue)) {
167
                break;
168
            }
169
            usleep($this->sleep);
170
        }
171
172
        return $this->results;
173
    }
174
}
175