GitHub Access Token became invalid

It seems that the GitHub access token used for retrieving details about this repository from GitHub has become invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.
Completed
Push — master ( 2809db...84b5a3 )
by Brent
01:29
created

Pool::run()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nc 1
nop 1
dl 0
loc 4
rs 10
c 0
b 0
f 0
1
<?php
2
3
namespace Spatie\Async;
4
5
use Exception;
6
7
/**
 * Keeps track of a set of processes: queues them, starts them through the
 * Runtime while respecting a concurrency limit, and records which ones are
 * in progress, finished or failed.
 *
 * Tasks can be added as well; pending tasks are bundled (tasksPerProcess at a
 * time) into a CallableProcess that executes them one after another.
 */
class Pool implements \ArrayAccess
{
    /** @var \Spatie\Async\Runtime */
    protected $runtime;

    /** @var int Upper bound on the number of entries in $inProgress before notify() stops starting processes. */
    protected $concurrency = 20;

    /** @var int Number of pending tasks that makes notify() bundle them into a process. */
    protected $tasksPerProcess = 1;

    /** @var \Spatie\Async\Task[] Tasks that have not been bundled into a process yet. */
    protected $tasks = [];

    /** @var \Spatie\Async\Process[] Processes waiting to be started, keyed by internal id. */
    protected $queue = [];

    /** @var \Spatie\Async\Process[] Started processes, keyed by pid. */
    protected $inProgress = [];

    /** @var \Spatie\Async\Process[] Processes reported as successful, keyed by pid. */
    protected $finished = [];

    /** @var \Spatie\Async\Process[] Processes reported as failed, keyed by pid. */
    protected $failed = [];

    public function __construct()
    {
        $this->runtime = new Runtime();
    }

    /**
     * @return static
     */
    public static function create()
    {
        return new static();
    }

    /**
     * Set how many processes may be in progress at the same time.
     *
     * @param int $concurrency
     *
     * @return $this
     */
    public function concurrency(int $concurrency): self
    {
        $this->concurrency = $concurrency;

        return $this;
    }

    /**
     * Set how many queued tasks are bundled into a single process.
     *
     * @param int $tasksPerProcess
     *
     * @return $this
     */
    public function tasksPerProcess(int $tasksPerProcess): self
    {
        $this->tasksPerProcess = $tasksPerProcess;

        return $this;
    }

    /**
     * Forward the maximum execution time to the runtime.
     *
     * @param int $maximumExecutionTime
     *
     * @return $this
     */
    public function maximumExecutionTime(int $maximumExecutionTime): self
    {
        $this->runtime->maximumExecutionTime($maximumExecutionTime);

        return $this;
    }

    /**
     * Re-evaluate the pool state: bundle pending tasks when a full batch is
     * available and, if the concurrency limit allows it, start the next
     * queued process.
     */
    public function notify(): void
    {
        if (count($this->tasks) >= $this->tasksPerProcess) {
            $this->scheduleTasks($this->tasksPerProcess);
        }

        if (count($this->inProgress) >= $this->concurrency) {
            return;
        }

        // array_shift() yields null once the queue is empty.
        $process = array_shift($this->queue);

        if ($process === null) {
            return;
        }

        $this->inProgress($this->run($process));
    }

    /**
     * Add work to the pool.
     *
     * A Task is stored for later bundling and null is returned. Anything that
     * is not a Process is wrapped in a CallableProcess. The resulting process
     * gets an internal id and is queued.
     *
     * @param \Spatie\Async\Task|\Spatie\Async\Process|mixed $process
     *
     * @return \Spatie\Async\Process|null
     */
    public function add($process): ?Process
    {
        if ($process instanceof Task) {
            $this->queueTask($process);

            return null;
        }

        if (! $process instanceof Process) {
            $process = new CallableProcess($process);
        }

        // uniqid() expects a string prefix; getmypid() returns an int.
        $process->setInternalId(uniqid((string) getmypid()));

        $this->queue($process);

        return $process;
    }

    /**
     * Start the given process through the runtime.
     *
     * @param \Spatie\Async\Process $process
     *
     * @return \Spatie\Async\Process
     */
    public function run(Process $process): Process
    {
        return $this->runtime->start($process);
    }

    /**
     * Block until no process is in progress anymore.
     *
     * All remaining tasks are bundled first. The in-progress processes are
     * then polled with a non-blocking pcntl_waitpid() call, sleeping 100 ms
     * between rounds.
     *
     * @throws \Exception When pcntl_waitpid() returns neither the pid of the process nor 0.
     */
    public function wait(): void
    {
        $this->scheduleTasks();

        while (count($this->inProgress)) {
            foreach ($this->inProgress as $process) {
                $pid = $process->pid();

                $processStatus = pcntl_waitpid($pid, $status, WNOHANG | WUNTRACED);

                // Loose comparison on purpose: the return type of pid() is not visible from this class.
                if ($processStatus == $pid) {
                    if ($this->runtime->handleFinishedProcess($process)) {
                        $this->finished($process);
                    } else {
                        $this->failed($process);
                    }

                    continue;
                }

                if ($processStatus === 0) {
                    // No state change reported yet; the runtime decides whether the process still counts as running.
                    if (! $this->runtime->handleRunningProcess($process, $status)) {
                        $this->failed($process);
                    }

                    continue;
                }

                throw new Exception("Could not reliably manage process {$pid}");
            }

            // Skip the sleep when the last round emptied the pool.
            if (! count($this->inProgress)) {
                break;
            }

            usleep(100000);
        }
    }

    /**
     * Store a task for later bundling and re-evaluate the pool.
     *
     * @param \Spatie\Async\Task $task
     */
    public function queueTask(Task $task): void
    {
        $this->tasks[] = $task;

        $this->notify();
    }

    /**
     * Put a process on the queue (keyed by its internal id) and re-evaluate the pool.
     *
     * @param \Spatie\Async\Process $process
     */
    public function queue(Process $process): void
    {
        $this->queue[$process->internalId()] = $process;

        $this->notify();
    }

    /**
     * Move a process from the queue to the in-progress list (keyed by pid).
     *
     * @param \Spatie\Async\Process $process
     */
    public function inProgress(Process $process): void
    {
        unset($this->queue[$process->internalId()]);

        $this->inProgress[$process->pid()] = $process;
    }

    /**
     * Move a process from the in-progress list to the finished list and re-evaluate the pool.
     *
     * @param \Spatie\Async\Process $process
     */
    public function finished(Process $process): void
    {
        unset($this->inProgress[$process->pid()]);

        $this->finished[$process->pid()] = $process;

        $this->notify();
    }

    /**
     * Move a process from the in-progress list to the failed list and re-evaluate the pool.
     *
     * @param \Spatie\Async\Process $process
     */
    public function failed(Process $process): void
    {
        unset($this->inProgress[$process->pid()]);

        $this->failed[$process->pid()] = $process;

        $this->notify();
    }

    /**
     * Not implemented yet: always reports the offset as missing.
     *
     * The attribute silences the PHP 8.1+ return-type deprecation without
     * changing the signature; on older PHP versions the line is a comment.
     *
     * @param mixed $offset
     *
     * @return bool
     */
    #[\ReturnTypeWillChange]
    public function offsetExists($offset)
    {
        // TODO

        return false;
    }

    /**
     * Not implemented yet: always returns null.
     *
     * @param mixed $offset
     *
     * @return mixed
     */
    #[\ReturnTypeWillChange]
    public function offsetGet($offset)
    {
        // TODO

        return null;
    }

    /**
     * Array-style adding, e.g. `$pool[] = $process;`. The offset is ignored.
     *
     * @param mixed $offset
     * @param mixed $value Anything accepted by add().
     */
    #[\ReturnTypeWillChange]
    public function offsetSet($offset, $value)
    {
        $this->add($value);
    }

    /**
     * Not implemented yet: does nothing.
     *
     * @param mixed $offset
     */
    #[\ReturnTypeWillChange]
    public function offsetUnset($offset)
    {
        // TODO
    }

    /**
     * Take up to $amount pending tasks (all of them when $amount is null) and
     * add one CallableProcess that executes them in order.
     *
     * @param int|null $amount
     */
    protected function scheduleTasks(?int $amount = null): void
    {
        $amount = $amount ?? count($this->tasks);

        $tasksToRun = array_splice($this->tasks, 0, $amount);

        if (! count($tasksToRun)) {
            return;
        }

        $this->add(new CallableProcess(function () use ($tasksToRun) {
            /** @var \Spatie\Async\Task $task */
            foreach ($tasksToRun as $task) {
                $task->execute();
            }
        }));
    }

    /**
     * @return \Spatie\Async\Process[]
     */
    public function getFinished(): array
    {
        return $this->finished;
    }

    /**
     * @return \Spatie\Async\Process[]
     */
    public function getFailed(): array
    {
        return $this->failed;
    }
}
234