ProcessManager   F
last analyzed

Complexity

Total Complexity 65

Size/Duplication

Total Lines 550
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 8

Importance

Changes 0
Metric Value
dl 0
loc 550
c 0
b 0
f 0
wmc 65
lcom 1
cbo 8
rs 3.2

28 Methods

Rating   Name   Duplication   Size   Complexity  
A addTask() 0 6 1
C tick() 0 54 11
A stopSiblingTasks() 0 14 4
A canRunTask() 0 22 4
A getProfileForProcesses() 0 22 1
A getStatsForProcesses() 0 32 5
A getShell() 0 8 2
A setShell() 0 6 1
A newShell() 0 4 1
A newProfile() 0 4 1
A getRules() 0 8 2
A setRules() 0 6 1
A newRules() 0 4 1
A setBinary() 0 6 1
A getBinary() 0 8 2
A setWorker() 0 6 1
A getWorker() 0 8 2
A getStdOut() 0 8 2
A getLogPath() 0 4 1
A setLogPath() 0 6 1
A getStdErr() 0 8 2
A getTaskString() 0 4 1
B canRemoveTask() 0 30 7
A isTaskExpired() 0 12 4
A isTaskCancelled() 0 8 2
A killTask() 0 12 2
A addRule() 0 6 1
A removeRule() 0 6 1

How to fix   Complexity   

Complex Class

Complex classes like ProcessManager often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes. You can also have a look at the cohesion graph to spot any un-connected, or weakly-connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use ProcessManager, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace AsyncPHP\Doorman\Manager;
4
5
use AsyncPHP\Doorman\Cancellable;
6
use AsyncPHP\Doorman\Expires;
7
use AsyncPHP\Doorman\Manager;
8
use AsyncPHP\Doorman\Process;
9
use AsyncPHP\Doorman\Profile;
10
use AsyncPHP\Doorman\Profile\InMemoryProfile;
11
use AsyncPHP\Doorman\Rule;
12
use AsyncPHP\Doorman\Rules;
13
use AsyncPHP\Doorman\Rules\InMemoryRules;
14
use AsyncPHP\Doorman\Shell;
15
use AsyncPHP\Doorman\Shell\BashShell;
16
use AsyncPHP\Doorman\Task;
17
use SplObjectStorage;
18
19
class ProcessManager implements Manager
20
{
21
    /**
22
     * @var Task[]
23
     */
24
    protected $waiting = array();
25
26
    /**
27
     * @var Task[]
28
     */
29
    protected $running = array();
30
31
    /**
32
     * @var null|SplObjectStorage
33
     */
34
    protected $timings = null;
35
36
    /**
37
     * @var null|string
38
     */
39
    protected $logPath;
40
41
    /**
42
     * @var null|Rules
43
     */
44
    protected $rules;
45
46
    /**
47
     * @var null|Shell
48
     */
49
    protected $shell;
50
51
    /**
52
     * @var null|string
53
     */
54
    protected $binary;
55
56
    /**
57
     * @var null|string
58
     */
59
    protected $worker;
60
61
    /**
62
     * @inheritdoc
63
     *
64
     * @param Task $task
65
     *
66
     * @return $this
67
     */
68
    public function addTask(Task $task)
69
    {
70
        $this->waiting[] = $task;
71
72
        return $this;
73
    }
74
75
    /**
76
     * @inheritdoc
77
     *
78
     * @return bool
79
     */
80
    public function tick()
81
    {
82
        if (!$this->timings instanceof SplObjectStorage) {
83
            $this->timings = new SplObjectStorage();
84
        }
85
86
        $waiting = array();
87
        $running = array();
88
89
        foreach ($this->waiting as $task) {
90
            if ($this->isTaskCancelled($task)) {
91
                continue;
92
            }
93
            
94
            if (!$this->canRunTask($task)) {
95
                $waiting[] = $task;
96
                continue;
97
            }
98
99
            if ($task->stopsSiblings()) {
100
                $this->stopSiblingTasks($task);
101
            }
102
103
            $binary = $this->getBinary();
104
            $worker = $this->getWorker();
105
            $stdout = $this->getStdOut();
106
            $stderr = $this->getStdErr();
107
108
            if ($task instanceof Expires) {
109
                $this->timings[$task] = time();
110
            }
111
112
            $output = $this->getShell()->exec("{$binary} {$worker} %s {$stdout} {$stderr} & echo $!", array(
113
                $this->getTaskString($task),
114
            ));
115
116
            if ($task instanceof Process) {
117
                $task->setId($output[0]);
118
            }
119
120
            $this->running[] = $task;
121
        }
122
123
        foreach ($this->running as $task) {
124
            if (!$this->canRemoveTask($task)) {
125
                $running[] = $task;
126
            }
127
        }
128
129
        $this->waiting = $waiting;
130
        $this->running = $running;
131
132
        return !empty($waiting) || !empty($running);
133
    }
134
135
    /**
136
     * Stops sibling processes of a task.
137
     *
138
     * @param Task $task
139
     *
140
     * @return $this
141
     */
142
    protected function stopSiblingTasks(Task $task)
143
    {
144
        $handler = $task->getHandler();
145
146
        foreach ($this->running as $task) {
147
            if ($task->getHandler() === $handler && $task instanceof Process) {
148
                $this->getShell()->exec("kill -9 %s", array(
149
                    $task->getId(),
150
                ));
151
            }
152
        }
153
154
        return $this;
155
    }
156
157
    /**
158
     * Checks whether a new task can be run.
159
     *
160
     * @param Task $task
161
     *
162
     * @return bool
163
     */
164
    protected function canRunTask(Task $task)
165
    {
166
        if(!$task->canRunTask()) {
167
            return false;
168
        }
169
170
        if ($task->ignoresRules()) {
171
            return true;
172
        }
173
174
        $processes = array_filter($this->running, function (Task $task) {
175
            return $task instanceof Process;
176
        });
177
178
        if (count($processes) < 1) {
179
            return true;
180
        }
181
182
        $profile = $this->getProfileForProcesses($task, $processes);
183
184
        return $this->getRules()->canRunTask($task, $profile);
185
    }
186
187
    /**
188
     * Gets the load profile related to a task.
189
     *
190
     * @param Task  $task
191
     * @param array $processes
192
     *
193
     * @return Profile
194
     */
195
    protected function getProfileForProcesses(Task $task, array $processes)
196
    {
197
        $stats = $this->getStatsForProcesses($processes);
198
199
        $siblingProcesses = array_filter($processes, function (Task $next) use ($task) {
200
            return $next->getHandler() === $task->getHandler();
201
        });
202
203
        $siblingStats = $this->getStatsForProcesses($siblingProcesses);
204
205
        $profile = $this->newProfile();
206
207
        $profile->setProcesses($processes);
208
        $profile->setProcessorLoad(min(100, array_sum(array_column($stats, 1))));
209
        $profile->setMemoryLoad(min(100, array_sum(array_column($stats, 2))));
210
211
        $profile->setSiblingProcesses($siblingProcesses);
212
        $profile->setSiblingProcessorLoad(min(100, array_sum(array_column($siblingStats, 1))));
213
        $profile->setSiblingMemoryLoad(min(100, array_sum(array_column($siblingStats, 2))));
214
215
        return $profile;
216
    }
217
218
    /**
219
     * Gets processor and memory stats for a list of processes.
220
     *
221
     * @param Process[] $processes
222
     *
223
     * @return array
224
     */
225
    protected function getStatsForProcesses(array $processes)
226
    {
227
        $stats = array();
228
229
        foreach ($processes as $process) {
230
            $output = $this->getShell()->exec("ps -o pid,%%cpu,%%mem,state,start -p %s | sed 1d", array(
231
                $process->getId(),
232
            ));
233
234
            if (count($output) < 1) {
235
                continue;
236
            }
237
238
            $last = $output[count($output) - 1];
239
240
            if (trim($last) === "") {
241
                continue;
242
            }
243
244
            $parts = preg_split("/\s+/", trim($last));
245
246
            $pid = intval($parts[0]);
247
248
            if ("{$pid}" !== $parts[0]) {
249
                continue;
250
            }
251
252
            $stats[] = $parts;
253
        }
254
255
        return $stats;
256
    }
257
258
    /**
259
     * Gets or creates a Shell instance.
260
     *
261
     * @return Shell
262
     */
263
    public function getShell()
264
    {
265
        if ($this->shell === null) {
266
            $this->shell = $this->newShell();
267
        }
268
269
        return $this->shell;
270
    }
271
272
    /**
273
     * @param Shell $shell
274
     *
275
     * @return $this
276
     */
277
    public function setShell(Shell $shell)
278
    {
279
        $this->shell = $shell;
280
281
        return $this;
282
    }
283
284
    /**
285
     * Creates a new Shell instance.
286
     *
287
     * @return Shell
288
     */
289
    protected function newShell()
290
    {
291
        return new BashShell();
292
    }
293
294
    /**
295
     * Creates a new Profile instance.
296
     *
297
     * @return Profile
298
     */
299
    protected function newProfile()
300
    {
301
        return new InMemoryProfile();
302
    }
303
304
    /**
305
     * Gets or creates a new Rules instance.
306
     *
307
     * @return Rules
308
     */
309
    public function getRules()
310
    {
311
        if ($this->rules === null) {
312
            $this->rules = $this->newRules();
313
        }
314
315
        return $this->rules;
316
    }
317
318
    /**
319
     * @param Rules $rules
320
     *
321
     * @return $this
322
     */
323
    public function setRules(Rules $rules)
324
    {
325
        $this->rules = $rules;
326
327
        return $this;
328
    }
329
330
    /**
331
     * Creates a new Rules instance.
332
     *
333
     * @return Rules
334
     */
335
    protected function newRules()
336
    {
337
        return new InMemoryRules();
338
    }
339
340
    /**
341
     * @param string $binary
342
     *
343
     * @return $this
344
     */
345
    public function setBinary($binary)
346
    {
347
        $this->binary = $binary;
348
349
        return $this;
350
    }
351
352
    /**
353
     * Gets the path of the PHP runtime.
354
     *
355
     * @return string
356
     */
357
    public function getBinary()
358
    {
359
        if ($this->binary === null) {
360
            $this->binary = PHP_BINDIR."/php";
361
        }
362
363
        return $this->binary;
364
    }
365
366
    /**
367
     * @param string $worker
368
     *
369
     * @return $this
370
     */
371
    public function setWorker($worker)
372
    {
373
        $this->worker = $worker;
374
375
        return $this;
376
    }
377
378
    /**
379
     * Gets the path of the worker script.
380
     *
381
     * @return string
382
     */
383
    public function getWorker()
384
    {
385
        if ($this->worker === null) {
386
            $this->worker = realpath(__DIR__."/../../bin/worker.php");
387
        }
388
389
        return $this->worker;
390
    }
391
392
    /**
393
     * Gets the path to write stdout to.
394
     *
395
     * @return string
396
     */
397
    protected function getStdOut()
398
    {
399
        if ($this->getLogPath() !== null) {
400
            return ">> ".$this->getLogPath()."/stdout.log";
401
        }
402
403
        return "> /dev/null";
404
    }
405
406
    /**
407
     * @return null|string
408
     */
409
    public function getLogPath()
410
    {
411
        return $this->logPath;
412
    }
413
414
    /**
415
     * @param string $logPath
416
     *
417
     * @return $this
418
     */
419
    public function setLogPath($logPath)
420
    {
421
        $this->logPath = $logPath;
422
423
        return $this;
424
    }
425
426
    /**
427
     * Gets the path to write stderr to.
428
     *
429
     * @return string
430
     */
431
    protected function getStdErr()
432
    {
433
        if ($this->getLogPath() !== null) {
434
            return "2>> ".$this->getLogPath()."/stderr.log";
435
        }
436
437
        return "2> /dev/null";
438
    }
439
440
    /**
441
     * Gets a string representation of a task, to pass to the worker script.
442
     *
443
     * @param Task $task
444
     *
445
     * @return string
446
     */
447
    protected function getTaskString(Task $task)
448
    {
449
        return base64_encode(serialize($task));
450
    }
451
452
    /**
453
     * Checks whether a task can be removed from the list of running processes.
454
     *
455
     * @param Task $task
456
     *
457
     * @return bool
458
     */
459
    protected function canRemoveTask(Task $task)
460
    {
461
        if (!$task instanceof Process) {
462
            return true;
463
        }
464
        
465
        if ($this->isTaskExpired($task) || $this->isTaskCancelled($task)) {
466
            $this->killTask($task);
467
            return true;
468
        }
469
470
        $processes = array_filter($this->running, function (Task $task) {
471
            return $task instanceof Process;
472
        });
473
474
        if (count($processes) < 1) {
475
            return true;
476
        }
477
478
        $found = false;
479
        $stats = $this->getStatsForProcesses($processes);
480
481
        foreach ($stats as $stat) {
482
            if ($stat[0] === $task->getId()) {
483
                $found = true;
484
            }
485
        }
486
487
        return !$found;
488
    }
489
490
    /**
491
     * Check if the given task is expired
492
     *
493
     * @param Task $task
494
     * @return boolean
495
     */
496
    protected function isTaskExpired(Task $task) {
497
        if ($task instanceof Expires) {
498
            $expiresIn = $task->getExpiresIn();
499
            $startedAt = $this->timings[$task];
500
501
            if($expiresIn > 0 && (time() - $startedAt) >= $expiresIn) {
502
                return $task->shouldExpire($startedAt);
503
            }
504
        }
505
506
        return false;
507
    }
508
509
    /**
510
     * Check if the given task is cancelled.
511
     * 
512
     * @param Task $task
513
     *
514
     * @return bool
515
     */
516
    protected function isTaskCancelled(Task $task)
517
    {
518
        if ($task instanceof Cancellable) {
519
            return $task->isCancelled();
520
        }
521
522
        return false;
523
    }
524
525
    /**
526
     * Revoke any background processes attached to this task.
527
     *
528
     * @param Task $task
529
     *
530
     * @return bool
531
     */
532
    protected function killTask(Task $task)
533
    {
534
        if ($task instanceof Process) {
535
            $this->getShell()->exec("kill -9 %s", array(
536
                $task->getId(),
537
            ));
538
539
            return true;
540
        }
541
542
        return false;
543
    }
544
545
    /**
546
     * @param Rule $rule
547
     *
548
     * @return $this
549
     */
550
    public function addRule(Rule $rule)
551
    {
552
        $this->getRules()->addRule($rule);
553
554
        return $this;
555
    }
556
557
    /**
558
     * @param Rule $rule
559
     *
560
     * @return $this
561
     */
562
    public function removeRule(Rule $rule)
563
    {
564
        $this->getRules()->removeRule($rule);
565
566
        return $this;
567
    }
568
}
569