Complex classes like Pool often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields or methods that share the same prefixes or suffixes. You can also look at the cohesion graph to spot any unconnected or weakly connected components.
Once you have determined which fields belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
While breaking up the class, it is a good idea to analyze how other classes use Pool, and based on these observations, apply Extract Interface, too.
1 | <?php |
||
12 | class Pool implements ArrayAccess |
||
13 | { |
||
14 | public static $forceSynchronous = false; |
||
15 | |||
16 | protected $concurrency = 20; |
||
17 | protected $tasksPerProcess = 1; |
||
18 | protected $timeout = 300; |
||
19 | protected $sleepTime = 50000; |
||
20 | |||
21 | /** @var \Spatie\Async\Process\Runnable[] */ |
||
22 | protected $queue = []; |
||
23 | |||
24 | /** @var \Spatie\Async\Process\Runnable[] */ |
||
25 | protected $inProgress = []; |
||
26 | |||
27 | /** @var \Spatie\Async\Process\Runnable[] */ |
||
28 | protected $finished = []; |
||
29 | |||
30 | /** @var \Spatie\Async\Process\Runnable[] */ |
||
31 | protected $failed = []; |
||
32 | |||
33 | /** @var \Spatie\Async\Process\Runnable[] */ |
||
34 | protected $timeouts = []; |
||
35 | |||
36 | protected $results = []; |
||
37 | |||
38 | protected $status; |
||
39 | |||
40 | protected $stopped = false; |
||
41 | |||
/**
 * Set up a new pool.
 *
 * The listener is registered only when static::isSupported() reports
 * true; the PoolStatus object bound to this pool is created afterwards
 * in either case.
 */
public function __construct()
{
    $asyncAvailable = static::isSupported();

    if ($asyncAvailable) {
        $this->registerListener();
    }

    $this->status = new PoolStatus($this);
}
||
50 | |||
/**
 * Named constructor: builds an instance of the class it is called on.
 *
 * @return static
 */
public static function create()
{
    $pool = new static();

    return $pool;
}
||
58 | |||
/**
 * Report whether the pool's asynchronous requirements are met.
 *
 * @return bool false when $forceSynchronous is set, or when either
 *              pcntl_async_signals() or posix_kill() does not exist
 */
public static function isSupported(): bool
{
    if (self::$forceSynchronous) {
        return false;
    }

    return function_exists('pcntl_async_signals')
        && function_exists('posix_kill');
}
||
66 | |||
/**
 * Set the limit on simultaneously running processes.
 *
 * notify() refuses to start another process once the number of
 * in-progress processes has reached this value.
 *
 * @return self this pool, for chaining
 */
public function concurrency(int $concurrency): self
{
    $this->concurrency = $concurrency;

    return $this;
}
||
73 | |||
/**
 * Set the timeout applied to processes of this pool.
 *
 * wait() compares it against each process' current execution time and
 * putInProgress() forwards it to ParallelProcess instances.
 * NOTE(review): the unit is presumably seconds — confirm.
 *
 * @return self this pool, for chaining
 */
public function timeout(int $timeout): self
{
    $this->timeout = $timeout;

    return $this;
}
||
80 | |||
/**
 * Hand the given autoloader to ParentRuntime::init().
 *
 * No pool state is changed by this call.
 *
 * @return self this pool, for chaining
 */
public function autoload(string $autoloader): self
{
    ParentRuntime::init($autoloader);

    return $this;
}
||
87 | |||
/**
 * Set the pause between two polling passes of wait().
 *
 * The value is passed straight to usleep(), so it is in microseconds.
 *
 * @return self this pool, for chaining
 */
public function sleepTime(int $sleepTime): self
{
    $this->sleepTime = $sleepTime;

    return $this;
}
||
94 | |||
/**
 * Start the next queued process when there is room for it.
 *
 * Nothing happens when the number of in-progress processes has already
 * reached the concurrency limit, or when the queue is empty.
 */
public function notify()
{
    if (count($this->inProgress) >= $this->concurrency) {
        return;
    }

    if (count($this->queue) === 0) {
        return;
    }

    // Take the head of the queue without array_shift(): array_shift()
    // renumbers integer keys, which would throw away the getId() keys
    // assigned by putInQueue() and make the unset-by-id in
    // putInProgress() hit an unrelated entry (or nothing at all).
    reset($this->queue);
    $headKey = key($this->queue);
    $process = $this->queue[$headKey];
    unset($this->queue[$headKey]);

    if (! $process) {
        return;
    }

    $this->putInProgress($process);
}
||
109 | |||
/**
 * Queue a task on the pool.
 *
 * A Runnable is queued as given; any other callable is first turned
 * into a process through ParentRuntime::createProcess().
 *
 * @param \Spatie\Async\Process\Runnable|callable $process
 * @param int|null $outputLength forwarded to ParentRuntime::createProcess()
 *                               when $process is a plain callable
 *
 * @return \Spatie\Async\Process\Runnable the process that was queued
 *
 * @throws InvalidArgumentException when $process is neither a Runnable
 *                                  nor callable
 */
public function add($process, ?int $outputLength = null): Runnable
{
    if ($process instanceof Runnable) {
        $runnable = $process;
    } elseif (is_callable($process)) {
        $runnable = ParentRuntime::createProcess($process, $outputLength);
    } else {
        throw new InvalidArgumentException('The process passed to Pool::add should be callable.');
    }

    $this->putInQueue($runnable);

    return $runnable;
}
||
130 | |||
/**
 * Block until no process is in progress any more, then return the results.
 *
 * On every pass, each in-progress process whose current execution time
 * exceeds the pool timeout is marked as timed out, and every
 * SynchronousProcess is marked as finished. Between passes the method
 * sleeps for $sleepTime microseconds.
 *
 * @param callable|null $intermediateCallback called with this pool once
 *                                            per pass, as long as
 *                                            processes remain in progress
 *
 * @return array the value of the pool's $results property
 */
public function wait(?callable $intermediateCallback = null): array
{
    // Explicit emptiness checks instead of relying on array-to-bool
    // conversion.
    while (count($this->inProgress) > 0) {
        // NOTE(review): the loop only ends if the markAs*() methods (or
        // the registered listener) remove entries from $inProgress —
        // their bodies are not visible here; confirm.
        foreach ($this->inProgress as $process) {
            if ($process->getCurrentExecutionTime() > $this->timeout) {
                $this->markAsTimedOut($process);
            }

            if ($process instanceof SynchronousProcess) {
                $this->markAsFinished($process);
            }
        }

        // Skip the callback and the sleep when this pass drained the pool.
        if (count($this->inProgress) === 0) {
            break;
        }

        if ($intermediateCallback !== null) {
            $intermediateCallback($this);
        }

        usleep($this->sleepTime);
    }

    return $this->results;
}
||
157 | |||
/**
 * Store a process in the queue under its id, then call notify() so it
 * can be started right away if the pool has capacity.
 */
public function putInQueue(Runnable $process)
{
    $id = $process->getId();
    $this->queue[$id] = $process;

    $this->notify();
}
||
164 | |||
/**
 * Start a process and track it as in progress.
 *
 * Returns without doing anything once the pool has been stopped.
 * Otherwise the pool timeout is applied to ParallelProcess instances,
 * the process is started, its queue entry (keyed by id) is removed and
 * it is registered in $inProgress under its pid.
 */
public function putInProgress(Runnable $process)
{
    if ($this->stopped) {
        return;
    }

    if ($process instanceof ParallelProcess) {
        $inner = $process->getProcess();
        $inner->setTimeout($this->timeout);
    }

    // The pid is read only after start().
    $process->start();

    $id = $process->getId();
    unset($this->queue[$id]);

    $pid = $process->getPid();
    $this->inProgress[$pid] = $process;
}
||
181 | |||
182 | public function markAsFinished(Runnable $process) |
||
192 | |||
193 | public function markAsTimedOut(Runnable $process) |
||
203 | |||
204 | public function markAsFailed(Runnable $process) |
||
214 | |||
215 | public function offsetExists($offset) |
||
221 | |||
222 | public function offsetGet($offset) |
||
226 | |||
227 | public function offsetSet($offset, $value) |
||
231 | |||
232 | public function offsetUnset($offset) |
||
236 | |||
237 | /** |
||
238 | * @return \Spatie\Async\Process\Runnable[] |
||
239 | */ |
||
240 | public function getQueue(): array |
||
244 | |||
245 | /** |
||
246 | * @return \Spatie\Async\Process\Runnable[] |
||
247 | */ |
||
248 | public function getInProgress(): array |
||
252 | |||
253 | /** |
||
254 | * @return \Spatie\Async\Process\Runnable[] |
||
255 | */ |
||
256 | public function getFinished(): array |
||
260 | |||
261 | /** |
||
262 | * @return \Spatie\Async\Process\Runnable[] |
||
263 | */ |
||
264 | public function getFailed(): array |
||
268 | |||
269 | /** |
||
270 | * @return \Spatie\Async\Process\Runnable[] |
||
271 | */ |
||
272 | public function getTimeouts(): array |
||
276 | |||
277 | public function status(): PoolStatus |
||
281 | |||
282 | protected function registerListener() |
||
310 | |||
/**
 * Flag the pool as stopped.
 *
 * From then on putInProgress() returns without starting the process it
 * is given.
 */
public function stop()
{
    $this->stopped = true;
}
||
315 | } |
||
316 |
This check marks implicit conversions of arrays to boolean values in a comparison. While in PHP an empty array is considered to be equal (but not identical) to false, this is not always apparent.
Consider making the comparison explicit by using `empty(...)` or `! empty(...)` instead.