Total Complexity | 44 |
Total Lines | 480 |
Duplicated Lines | 0 % |
Changes | 0 |
Complex classes like Async often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
While breaking up the class, it is a good idea to analyze how other classes use Async, and based on these observations, apply Extract Interface, too.
1 | <?php |
||
24 | class Async |
||
25 | { |
||
26 | /** |
||
27 | * Job status. |
||
28 | */ |
||
29 | const STATUS_WAITING = 0; |
||
30 | const STATUS_POSTPONED = 1; |
||
31 | const STATUS_PROCESSING = 2; |
||
32 | const STATUS_DONE = 3; |
||
33 | const STATUS_FAILED = 4; |
||
34 | |||
35 | /** |
||
36 | * Job options. |
||
37 | */ |
||
38 | const OPTION_AT = 'at'; |
||
39 | const OPTION_INTERVAL = 'interval'; |
||
40 | const OPTION_RETRY = 'retry'; |
||
41 | const OPTION_RETRY_INTERVAL = 'retry_interval'; |
||
42 | |||
43 | /** |
||
44 | * Database. |
||
45 | * |
||
46 | * @var Database |
||
47 | */ |
||
48 | protected $db; |
||
49 | |||
50 | /** |
||
51 | * LoggerInterface. |
||
52 | * |
||
53 | * @var LoggerInterface |
||
54 | */ |
||
55 | protected $logger; |
||
56 | |||
57 | /** |
||
58 | * Local queue. |
||
59 | * |
||
60 | * @var array |
||
61 | */ |
||
62 | protected $queue = []; |
||
63 | |||
64 | /** |
||
65 | * Node name. |
||
66 | * |
||
67 | * @var string |
||
68 | */ |
||
69 | protected $node_name; |
||
70 | |||
71 | /** |
||
72 | * Collection name. |
||
73 | * |
||
74 | * @var string |
||
75 | */ |
||
76 | protected $collection_name = 'queue'; |
||
77 | |||
78 | /** |
||
79 | * Container. |
||
80 | * |
||
81 | * @var ContainerInterface |
||
82 | */ |
||
83 | protected $container; |
||
84 | |||
85 | /** |
||
86 | * Default at (Seconds from now). |
||
87 | * |
||
88 | * @var int |
||
89 | */ |
||
90 | protected $default_at = 0; |
||
91 | |||
92 | /** |
||
93 | * Default interval (seconds). |
||
94 | * |
||
95 | * @var int |
||
96 | */ |
||
97 | protected $default_interval = -1; |
||
98 | |||
99 | /** |
||
100 | * Default retry. |
||
101 | * |
||
102 | * @var int |
||
103 | */ |
||
104 | protected $default_retry = 0; |
||
105 | |||
106 | /** |
||
107 | * Default retry interval (seconds). |
||
108 | * |
||
109 | * @var int |
||
110 | */ |
||
111 | protected $default_retry_interval = 300; |
||
112 | |||
/**
 * Create the queue handler and store its collaborators.
 *
 * @param Database                $db        database that holds the job collection
 * @param LoggerInterface         $logger    logger used by this class
 * @param null|ContainerInterface $container optional container, stored as given
 * @param null|iterable           $config    optional settings, handed to setOptions()
 */
public function __construct(Database $db, LoggerInterface $logger, ?ContainerInterface $container = null, ?Iterable $config = null)
{
    // The host name is what this instance writes into the "node" field of jobs.
    $this->node_name = gethostname();
    $this->container = $container;
    $this->logger = $logger;
    $this->db = $db;

    // Options are applied once every collaborator has been assigned.
    $this->setOptions($config);
}
||
129 | |||
130 | /** |
||
131 | * Set options. |
||
132 | * |
||
133 | * @param iterable $config |
||
134 | * |
||
135 | * @return Async |
||
136 | */ |
||
137 | public function setOptions(? Iterable $config = null): self |
||
163 | } |
||
164 | |||
/**
 * Validate given job options.
 *
 * Every key must be one of the OPTION_* names and every value must be an
 * integer. Keys are matched strictly, so a numeric key is never mistaken
 * for a known option name.
 *
 * @param array $options
 *
 * @throws Exception if an option is unknown or its value is not an integer
 *
 * @return Async
 */
public function validateOptions(array $options): self
{
    $known = [
        self::OPTION_AT,
        self::OPTION_RETRY,
        self::OPTION_RETRY_INTERVAL,
        self::OPTION_INTERVAL,
    ];

    foreach ($options as $option => $value) {
        // Strict lookup: switch/case compares loosely, and before PHP 8 a
        // numeric key such as 0 is loosely equal to any non-numeric string.
        if (!in_array($option, $known, true)) {
            throw new Exception('invalid option '.$option.' given');
        }

        if (!is_int($value)) {
            throw new Exception('option '.$option.' must be an integer');
        }
    }

    return $this;
}
||
192 | |||
/**
 * Add job to queue.
 *
 * Options that are not given fall back to the configured defaults; the
 * merged options are validated and a single document with status
 * STATUS_WAITING is inserted into the job collection.
 *
 * @param string $class   job class name stored with the job
 * @param mixed  $data    payload stored with the job
 * @param array  $options OPTION_* overrides
 *
 * @return bool whether the insert was acknowledged
 */
public function addJob(string $class, $data, array $options = []): bool
{
    $options = array_merge([
        self::OPTION_AT => $this->default_at,
        self::OPTION_INTERVAL => $this->default_interval,
        self::OPTION_RETRY => $this->default_retry,
        self::OPTION_RETRY_INTERVAL => $this->default_retry_interval,
    ], $options);
    $this->validateOptions($options);

    // A positive "at" is multiplied by 1000 before being wrapped in a
    // UTCDateTime; zero or negative means the job carries no schedule.
    $at = $options[self::OPTION_AT] > 0
        ? new UTCDateTime($options[self::OPTION_AT] * 1000)
        : null;

    $document = [
        'class' => $class,
        'status' => self::STATUS_WAITING,
        'timestamp' => new UTCDateTime(),
        'at' => $at,
        'retry' => $options[self::OPTION_RETRY],
        'retry_interval' => $options[self::OPTION_RETRY_INTERVAL],
        'interval' => $options[self::OPTION_INTERVAL],
        'node' => $this->node_name,
        'data' => $data,
    ];

    $collection = $this->db->{$this->collection_name};
    $result = $collection->insertOne($document);

    $context = [
        'category' => get_class($this),
        'params' => $options,
        'data' => $data,
    ];
    $this->logger->debug('queue job ['.$result->getInsertedId().'] added to ['.$class.']', $context);

    return $result->isAcknowledged();
}
||
240 | |||
/**
 * Only add job if not in queue yet.
 *
 * A job counts as queued when a document with the same class and data
 * exists in the job collection with status STATUS_WAITING or
 * STATUS_POSTPONED. In that case nothing is inserted.
 *
 * @param string $class   job class name
 * @param mixed  $data    payload; part of the duplicate check
 * @param array  $options OPTION_* overrides, only used when the job is added
 *
 * @return bool true if an equal job already exists, otherwise the result of addJob()
 */
public function addJobOnce(string $class, $data, array $options = []): bool
{
    $filter = [
        'class' => $class,
        'data' => $data,
        '$or' => [
            ['status' => self::STATUS_WAITING],
            ['status' => self::STATUS_POSTPONED],
        ],
    ];

    // Search the configured collection, i.e. the same one addJob() writes
    // to, instead of a hard-coded collection name.
    $result = $this->db->{$this->collection_name}->findOne($filter);

    if (null === $result) {
        return $this->addJob($class, $data, $options);
    }

    $this->logger->debug('queue job ['.$result['_id'].'] of type ['.$class.'] already exists', [
        'category' => get_class($this),
        'data' => $data,
    ]);

    return true;
}
||
273 | |||
/**
 * Execute job queue as endless loop.
 *
 * Each iteration first runs postponed jobs from the local queue that are
 * due, then handles the next job from the cursor. The loop has no exit
 * condition, so this method only ends if something throws.
 *
 * @return bool
 */
public function startDaemon(): bool
{
    $cursor = $this->getCursor();

    while (true) {
        $this->processLocalQueue();

        if (null === $cursor->current()) {
            if ($cursor->getInnerIterator()->isDead()) {
                $this->logger->error('job queue cursor is dead, is it a capped collection?', [
                    'category' => get_class($this),
                ]);

                // Reopen the cursor inside the loop. Restarting through a
                // recursive call would add a stack frame for every dead
                // cursor over the lifetime of the daemon.
                $cursor = $this->getCursor();

                continue;
            }

            // Nothing available right now; advance and poll again.
            $cursor->next();

            continue;
        }

        $job = $cursor->current();
        $cursor->next();
        $this->processJob($job);
    }
}
||
305 | |||
306 | /** |
||
307 | * Execute job queue. |
||
308 | * |
||
309 | * @return bool |
||
310 | */ |
||
311 | public function startOnce(): bool |
||
333 | } |
||
334 | } |
||
335 | |||
/**
 * Get cursor.
 *
 * Opens a find() on the job collection and returns it wrapped in an
 * already rewound IteratorIterator. Matching jobs are: waiting jobs,
 * postponed jobs of this node, and postponed jobs whose "at" is greater
 * than or equal to the current time.
 *
 * @param bool $tailable request a tailable cursor without cursor timeout
 *
 * @return IteratorIterator
 */
protected function getCursor(bool $tailable = true): IteratorIterator
{
    $find_options = $tailable
        ? ['cursorType' => Find::TAILABLE, 'noCursorTimeout' => true]
        : [];

    $waiting = ['status' => self::STATUS_WAITING];
    $postponed_on_this_node = [
        'status' => self::STATUS_POSTPONED,
        'node' => $this->node_name,
    ];
    // NOTE(review): '$gte' selects postponed jobs that are not due yet;
    // confirm this is intended rather than '$lte'.
    $postponed_by_date = [
        'status' => self::STATUS_POSTPONED,
        'at' => ['$gte' => new UTCDateTime()],
    ];

    $collection = $this->db->{$this->collection_name};
    $result = $collection->find([
        '$or' => [$waiting, $postponed_on_this_node, $postponed_by_date],
    ], $find_options);

    $iterator = new IteratorIterator($result);
    $iterator->rewind();

    return $iterator;
}
||
366 | |||
/**
 * Update job status.
 *
 * Sets status, node (this node's name) and timestamp (now) on the job
 * document with the given id.
 *
 * @param ObjectId $id     id of the job document
 * @param int      $status one of the STATUS_* constants
 *
 * @return bool whether the update was acknowledged
 */
protected function updateJob(ObjectId $id, int $status): bool
{
    // An _id filter matches at most one document and a single-document
    // write is atomic, so updateOne() needs no '$isolated' flag. That
    // operator was removed in MongoDB 4.0 and makes the query fail there.
    $result = $this->db->{$this->collection_name}->updateOne(['_id' => $id], ['$set' => [
        'status' => $status,
        'node' => $this->node_name,
        'timestamp' => new UTCDateTime(),
    ]]);

    $this->logger->debug('job ['.$id.'] updated to status ['.$status.']', [
        'category' => get_class($this),
    ]);

    return $result->isAcknowledged();
}
||
389 | |||
/**
 * Check local queue for postponed jobs.
 *
 * Every locally queued job whose "at" is not later than the current time
 * is removed from the local queue and handed to processJob() with its
 * "at" cleared.
 *
 * @return bool always true
 */
protected function processLocalQueue()
{
    $now = new UTCDateTime();

    foreach ($this->queue as $position => $postponed) {
        // Not due yet: leave it in the local queue.
        if (!($postponed['at'] <= $now)) {
            continue;
        }

        $this->logger->info('postponed job ['.$postponed['_id'].'] ['.$postponed['class'].'] can now be executed', [
            'category' => get_class($this),
        ]);

        unset($this->queue[$position]);

        // With "at" cleared processJob() executes the job instead of
        // postponing it again.
        $postponed['at'] = null;
        $this->processJob($postponed);
    }

    return true;
}
||
413 | |||
/**
 * Process job.
 *
 * A job that carries a UTCDateTime "at" is marked STATUS_POSTPONED and
 * parked in the local queue. Any other job is marked STATUS_PROCESSING
 * and executed. If execution throws, the job is marked STATUS_FAILED and,
 * when retries are left, re-added once with a decremented retry counter.
 * Otherwise a job with a non-negative interval is re-added for its next
 * run.
 *
 * @param array $job job document
 *
 * @return bool always true
 */
protected function processJob(array $job): bool
{
    if ($job['at'] instanceof UTCDateTime) {
        $this->updateJob($job['_id'], self::STATUS_POSTPONED);
        $this->queue[] = $job;

        $this->logger->debug('execution of job ['.$job['_id'].'] ['.$job['class'].'] is postponed at ['.$job['at'].']', [
            'category' => get_class($this),
        ]);

        return true;
    }

    $this->updateJob($job['_id'], self::STATUS_PROCESSING);

    $this->logger->debug('execute job ['.$job['_id'].'] ['.$job['class'].']', [
        'category' => get_class($this),
        'params' => $job['data'],
    ]);

    try {
        $this->executeJob($job);
    } catch (\Exception $e) {
        $this->logger->error('failed execute job ['.$job['_id'].']', [
            'category' => get_class($this),
            'exception' => $e,
        ]);

        $this->updateJob($job['_id'], self::STATUS_FAILED);

        if ($job['retry'] > 0) {
            // Log the interval the message talks about, not the retry count.
            $this->logger->debug('failed job ['.$job['_id'].'] has a retry interval of ['.$job['retry_interval'].']', [
                'category' => get_class($this),
            ]);

            $this->addJob($job['class'], $job['data'], [
                self::OPTION_AT => time() + $job['retry_interval'],
                self::OPTION_INTERVAL => $job['interval'],
                self::OPTION_RETRY => $job['retry'] - 1,
                self::OPTION_RETRY_INTERVAL => $job['retry_interval'],
            ]);

            // The retry copy already carries the interval. Falling through
            // would queue a second copy and multiply periodic jobs on
            // every failure.
            return true;
        }
    }

    // Periodic job: queue the next run.
    if ($job['interval'] >= 0) {
        $this->addJob($job['class'], $job['data'], [
            self::OPTION_AT => time() + $job['interval'],
            self::OPTION_INTERVAL => $job['interval'],
            self::OPTION_RETRY => $job['retry'],
            self::OPTION_RETRY_INTERVAL => $job['retry_interval'],
        ]);
    }

    return true;
}
||
476 | |||
477 | /** |
||
478 | * Execute job. |
||
479 | * |
||
480 | * @param array $job |
||
481 | * |
||
482 | * @return bool |
||
483 | */ |
||
484 | protected function executeJob(array $job): bool |
||
504 | } |
||
505 | } |
||
506 |
This check compares calls to functions or methods with their respective definitions. If the call has fewer arguments than are defined, it raises an issue.
If a function is defined several times with a different number of parameters, the check may pick up the wrong definition and report false positives. One codebase where this has been known to happen is WordPress. Please note the @ignore annotation hint above.