GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to re-new the access token on this website.
Completed
Pull Request — master (#106)
by
unknown
01:03
created

Pool::putInProgress()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
nc 3
nop 1
dl 0
loc 16
rs 9.7333
c 0
b 0
f 0
1
<?php
2
3
namespace Spatie\Async;
4
5
use ArrayAccess;
6
use InvalidArgumentException;
7
use Spatie\Async\Process\ParallelProcess;
8
use Spatie\Async\Process\Runnable;
9
use Spatie\Async\Process\SynchronousProcess;
10
use Spatie\Async\Runtime\ParentRuntime;
11
12
class Pool implements ArrayAccess
13
{
14
    public static $forceSynchronous = false;
15
16
    protected $concurrency = 20;
17
    protected $tasksPerProcess = 1;
18
    protected $timeout = 300;
19
    protected $sleepTime = 50000;
20
21
    /** @var \Spatie\Async\Process\Runnable[] */
22
    protected $queue = [];
23
24
    /** @var \Spatie\Async\Process\Runnable[] */
25
    protected $inProgress = [];
26
27
    /** @var \Spatie\Async\Process\Runnable[] */
28
    protected $finished = [];
29
30
    /** @var \Spatie\Async\Process\Runnable[] */
31
    protected $failed = [];
32
33
    /** @var \Spatie\Async\Process\Runnable[] */
34
    protected $timeouts = [];
35
36
    protected $results = [];
37
38
    protected $status;
39
40
    protected $stopped = false;
41
42
    public function __construct()
43
    {
44
        if (static::isSupported()) {
45
            $this->registerListener();
46
        }
47
48
        $this->status = new PoolStatus($this);
49
    }
50
51
    /**
52
     * @return static
53
     */
54
    public static function create()
55
    {
56
        return new static();
57
    }
58
59
    public static function isSupported(): bool
60
    {
61
        return
62
            function_exists('pcntl_async_signals')
63
            && function_exists('posix_kill')
64
            && ! self::$forceSynchronous;
65
    }
66
67
    public function concurrency(int $concurrency): self
68
    {
69
        $this->concurrency = $concurrency;
70
71
        return $this;
72
    }
73
74
    public function timeout(int $timeout): self
75
    {
76
        $this->timeout = $timeout;
77
78
        return $this;
79
    }
80
81
    public function autoload(string $autoloader): self
82
    {
83
        ParentRuntime::init($autoloader);
84
85
        return $this;
86
    }
87
88
    public function sleepTime(int $sleepTime): self
89
    {
90
        $this->sleepTime = $sleepTime;
91
92
        return $this;
93
    }
94
95
    public function notify()
96
    {
97
        if (count($this->inProgress) >= $this->concurrency) {
98
            return;
99
        }
100
101
        $process = array_shift($this->queue);
102
103
        if (! $process) {
104
            return;
105
        }
106
107
        $this->putInProgress($process);
108
    }
109
110
    /**
111
     * @param \Spatie\Async\Process\Runnable|callable $process
112
     * @param int|null $outputLength
113
     *
114
     * @return \Spatie\Async\Process\Runnable
115
     */
116
    public function add($process, ?int $outputLength = null): Runnable
117
    {
118
        if (! is_callable($process) && ! $process instanceof Runnable) {
119
            throw new InvalidArgumentException('The process passed to Pool::add should be callable.');
120
        }
121
122
        if (! $process instanceof Runnable) {
123
            $process = ParentRuntime::createProcess($process, $outputLength);
124
        }
125
126
        $this->putInQueue($process);
127
128
        return $process;
129
    }
130
131
    public function wait(?callable $intermediateCallback = null): array
132
    {
133
        while ($this->inProgress) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $this->inProgress of type Spatie\Async\Process\Runnable[] is implicitly converted to a boolean; are you sure this is intended? If so, consider using ! empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
134
            foreach ($this->inProgress as $process) {
135
                if ($process->getCurrentExecutionTime() > $this->timeout) {
136
                    $this->markAsTimedOut($process);
137
                }
138
139
                if ($process instanceof SynchronousProcess) {
140
                    $this->markAsFinished($process);
141
                }
142
            }
143
144
            if (! $this->inProgress) {
0 ignored issues
show
Bug Best Practice introduced by
The expression $this->inProgress of type Spatie\Async\Process\Runnable[] is implicitly converted to a boolean; are you sure this is intended? If so, consider using empty($expr) instead to make it clear that you intend to check for an array without elements.

This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.

Consider making the comparison explicit by using empty(..) or ! empty(...) instead.

Loading history...
145
                break;
146
            }
147
148
            if ($intermediateCallback) {
149
                call_user_func_array($intermediateCallback, [$this]);
150
            }
151
152
            usleep($this->sleepTime);
153
        }
154
155
        return $this->results;
156
    }
157
158
    public function putInQueue(Runnable $process)
159
    {
160
        $this->queue[$process->getId()] = $process;
161
162
        $this->notify();
163
    }
164
165
    public function putInProgress(Runnable $process)
166
    {
167
        if ($this->stopped) {
168
            return;
169
        }
170
171
        if ($process instanceof ParallelProcess) {
172
            $process->getProcess()->setTimeout($this->timeout);
173
        }
174
175
        $process->start();
176
177
        unset($this->queue[$process->getId()]);
178
179
        $this->inProgress[$process->getPid()] = $process;
180
    }
181
182
    public function markAsFinished(Runnable $process)
183
    {
184
        unset($this->inProgress[$process->getPid()]);
185
186
        $this->notify();
187
188
        $this->results[] = $process->triggerSuccess();
189
190
        $this->finished[$process->getPid()] = $process;
191
    }
192
193
    public function markAsTimedOut(Runnable $process)
194
    {
195
        unset($this->inProgress[$process->getPid()]);
196
197
        $this->notify();
198
199
        $process->triggerTimeout();
200
201
        $this->timeouts[$process->getPid()] = $process;
202
    }
203
204
    public function markAsFailed(Runnable $process)
205
    {
206
        unset($this->inProgress[$process->getPid()]);
207
208
        $this->notify();
209
210
        $process->triggerError();
211
212
        $this->failed[$process->getPid()] = $process;
213
    }
214
215
    public function offsetExists($offset)
216
    {
217
        // TODO
218
219
        return false;
220
    }
221
222
    public function offsetGet($offset)
223
    {
224
        // TODO
225
    }
226
227
    public function offsetSet($offset, $value)
228
    {
229
        $this->add($value);
230
    }
231
232
    public function offsetUnset($offset)
233
    {
234
        // TODO
235
    }
236
237
    /**
238
     * @return \Spatie\Async\Process\Runnable[]
239
     */
240
    public function getQueue(): array
241
    {
242
        return $this->queue;
243
    }
244
245
    /**
246
     * @return \Spatie\Async\Process\Runnable[]
247
     */
248
    public function getInProgress(): array
249
    {
250
        return $this->inProgress;
251
    }
252
253
    /**
254
     * @return \Spatie\Async\Process\Runnable[]
255
     */
256
    public function getFinished(): array
257
    {
258
        return $this->finished;
259
    }
260
261
    /**
262
     * @return \Spatie\Async\Process\Runnable[]
263
     */
264
    public function getFailed(): array
265
    {
266
        return $this->failed;
267
    }
268
269
    /**
270
     * @return \Spatie\Async\Process\Runnable[]
271
     */
272
    public function getTimeouts(): array
273
    {
274
        return $this->timeouts;
275
    }
276
277
    public function status(): PoolStatus
278
    {
279
        return $this->status;
280
    }
281
282
    protected function registerListener()
283
    {
284
        pcntl_async_signals(true);
285
286
        pcntl_signal(SIGCHLD, function ($signo, $status) {
287
            while (true) {
288
                $pid = pcntl_waitpid(-1, $processState, WNOHANG | WUNTRACED);
289
290
                if ($pid <= 0) {
291
                    break;
292
                }
293
294
                $process = $this->inProgress[$pid] ?? null;
295
296
                if (! $process) {
297
                    continue;
298
                }
299
300
                if ($status['status'] === 0) {
301
                    $this->markAsFinished($process);
302
303
                    continue;
304
                }
305
306
                $this->markAsFailed($process);
307
            }
308
        });
309
    }
310
311
    public function stop()
312
    {
313
        $this->stopped = true;
314
    }
315
}
316