Completed
Push — master ( 822943...dd24c4 )
by Christopher
03:54 queued 01:47
created

ProcessManager::tick()   C

Complexity

Conditions 11
Paths 132

Size

Total Lines 54
Code Lines 30

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 35
CRAP Score 11.0594

Importance

Changes 7
Bugs 0 Features 1
Metric Value
c 7
b 0
f 1
dl 0
loc 54
ccs 35
cts 38
cp 0.9211
rs 6.0909
cc 11
eloc 30
nc 132
nop 0
crap 11.0594

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace AsyncPHP\Doorman\Manager;
4
5
use AsyncPHP\Doorman\Cancellable;
6
use AsyncPHP\Doorman\Expires;
7
use AsyncPHP\Doorman\Manager;
8
use AsyncPHP\Doorman\Process;
9
use AsyncPHP\Doorman\Profile;
10
use AsyncPHP\Doorman\Profile\InMemoryProfile;
11
use AsyncPHP\Doorman\Rule;
12
use AsyncPHP\Doorman\Rules;
13
use AsyncPHP\Doorman\Rules\InMemoryRules;
14
use AsyncPHP\Doorman\Shell;
15
use AsyncPHP\Doorman\Shell\BashShell;
16
use AsyncPHP\Doorman\Task;
17
use SplObjectStorage;
18
19
final class ProcessManager implements Manager
20
{
21
    /**
22
     * @var Task[]
23
     */
24
    private $waiting = [];
25
26
    /**
27
     * @var Task[]
28
     */
29
    private $running = [];
30
31
    /**
32
     * @var null|SplObjectStorage
33
     */
34
    private $timings = null;
35
36
    /**
37
     * @var null|string
38
     */
39
    private $logPath;
40
41
    /**
42
     * @var null|Rules
43
     */
44
    private $rules;
45
46
    /**
47
     * @var null|Shell
48
     */
49
    protected $shell;
50
51
    /**
52
     * @var null|string
53
     */
54
    private $binary;
55
56
    /**
57
     * @var null|string
58
     */
59
    private $worker;
60
    
61
    /**
62
     * Get a list of the waiting tasks.
63
     * 
64
     * @return array
65
     */
66
    public function getWaiting()
67
    {
68
        return $this->waiting;
69
    }
70
    
71
    /**
72
     * Get a list of the running tasks.
73
     * 
74
     * @return array
75
     */
76
    public function getRunning()
77
    {
78
        return $this->running;
79
    }
80
81
    /**
82
     * @inheritdoc
83
     *
84
     * @param Task $task
85
     *
86
     * @return $this
87
     */
88 1
    public function addTask(Task $task)
89
    {
90 1
        $this->waiting[] = $task;
91
92 1
        return $this;
93
    }
94
95
    /**
96
     * @inheritdoc
97
     *
98
     * @return bool
99
     */
100 1
    public function tick()
101
    {
102 1
        if (!$this->timings instanceof SplObjectStorage) {
103 1
            $this->timings = new SplObjectStorage();
104 1
        }
105
106 1
        $waiting = [];
107 1
        $running = [];
108
109 1
        foreach ($this->waiting as $task) {
110 1
            if ($this->isTaskCancelled($task)) {
111
                continue;
112
            }
113
114 1
            if (!$this->canRunTask($task)) {
115 1
                $waiting[] = $task;
116 1
                continue;
117
            }
118
119 1
            if ($task->stopsSiblings()) {
120
                $this->stopSiblingTasks($task);
121
            }
122
123 1
            $binary = $this->getBinary();
124 1
            $worker = $this->getWorker();
125 1
            $stdout = $this->getStdOut();
126 1
            $stderr = $this->getStdErr();
127
128 1
            if ($task instanceof Expires) {
129 1
                $this->timings[$task] = time();
130 1
            }
131
132 1
            $output = $this->getShell()->exec("{$binary} {$worker} %s {$stdout} {$stderr} & echo $!", [
133 1
                $this->getTaskString($task),
134 1
            ]);
135
136 1
            if ($task instanceof Process) {
137 1
                $task->setId($output[0]);
138 1
            }
139
140 1
            $this->running[] = $task;
141 1
        }
142
143 1
        foreach ($this->running as $task) {
144 1
            if (!$this->canRemoveTask($task)) {
145 1
                $running[] = $task;
146 1
            }
147 1
        }
148
149 1
        $this->waiting = $waiting;
150 1
        $this->running = $running;
151
152 1
        return !empty($waiting) || !empty($running);
153
    }
154
155
    /**
156
     * Stops sibling processes of a task.
157
     *
158
     * @param Task $task
159
     *
160
     * @return $this
161
     */
162
    private function stopSiblingTasks(Task $task)
163
    {
164
        $handler = $task->getHandler();
165
        
166
        $stdout = $this->getStdOut();
167
        $stderr = $this->getStdErr();
168
169
        foreach ($this->running as $task) {
170
            if ($task->getHandler() === $handler && $task instanceof Process) {
171
                $this->getShell()->exec("kill -9 %s {$stdout} {$stderr} &", [
172
                    $task->getId(),
173
                ]);
174
            }
175
        }
176
177
        return $this;
178
    }
179
180
    /**
181
     * Checks whether a new task can be run.
182
     *
183
     * @param Task $task
184
     *
185
     * @return bool
186
     */
187 1
    private function canRunTask(Task $task)
188
    {
189 1
        if (!$task->canRunTask()) {
190
            return false;
191
        }
192
193 1
        if ($task->ignoresRules()) {
194
            return true;
195
        }
196
197
        $processes = array_filter($this->running, function (Task $task) {
198 1
            return $task instanceof Process;
199 1
        });
200
201 1
        if (count($processes) < 1) {
202 1
            return true;
203
        }
204
205 1
        $profile = $this->getProfileForProcesses($task, $processes);
206
207 1
        return $this->getRules()->canRunTask($task, $profile);
208
    }
209
210
    /**
211
     * Gets the load profile related to a task.
212
     *
213
     * @param Task $task
214
     * @param array $processes
215
     *
216
     * @return Profile
217
     */
218 1
    private function getProfileForProcesses(Task $task, array $processes)
219
    {
220 1
        $stats = $this->getStatsForProcesses($processes);
221
222
        $siblingProcesses = array_filter($processes, function (Task $next) use ($task) {
223 1
            return $next->getHandler() === $task->getHandler();
224 1
        });
225
226 1
        $siblingStats = $this->getStatsForProcesses($siblingProcesses);
227
228 1
        $profile = $this->newProfile();
229
230 1
        $profile->setProcesses($processes);
231 1
        $profile->setProcessorLoad(min(100, array_sum(array_column($stats, 1))));
232 1
        $profile->setMemoryLoad(min(100, array_sum(array_column($stats, 2))));
233
234 1
        $profile->setSiblingProcesses($siblingProcesses);
235 1
        $profile->setSiblingProcessorLoad(min(100, array_sum(array_column($siblingStats, 1))));
236 1
        $profile->setSiblingMemoryLoad(min(100, array_sum(array_column($siblingStats, 2))));
237
238 1
        return $profile;
239
    }
240
241
    /**
242
     * Gets processor and memory stats for a list of processes.
243
     *
244
     * @param Process[] $processes
245
     *
246
     * @return array
247
     */
248 1
    private function getStatsForProcesses(array $processes)
249
    {
250 1
        $stats = [];
251
252 1
        foreach ($processes as $process) {
253 1
            $output = $this->getShell()->exec("ps -o pid,%%cpu,%%mem,state,start -p %s | sed 1d", [
254 1
                $process->getId(),
255 1
            ]);
256
257 1
            if (count($output) < 1) {
258 1
                continue;
259
            }
260
261 1
            $last = $output[count($output) - 1];
262
263 1
            if (trim($last) === "") {
264
                continue;
265
            }
266
267 1
            $parts = preg_split("/\s+/", trim($last));
268
269 1
            $pid = intval($parts[0]);
270
271 1
            if ("{$pid}" !== $parts[0]) {
272
                continue;
273
            }
274
275 1
            $stats[] = $parts;
276 1
        }
277
278 1
        return $stats;
279
    }
280
281
    /**
282
     * Gets or creates a Shell instance.
283
     *
284
     * @return Shell
285
     */
286 2
    public function getShell()
287
    {
288 2
        if ($this->shell === null) {
289 2
            $this->shell = $this->newShell();
290 2
        }
291
292 2
        return $this->shell;
293
    }
294
295
    /**
296
     * @param Shell $shell
297
     *
298
     * @return $this
299
     */
300 1
    public function setShell(Shell $shell)
301
    {
302 1
        $this->shell = $shell;
303
304 1
        return $this;
305
    }
306
307
    /**
308
     * Creates a new Shell instance.
309
     *
310
     * @return Shell
311
     */
312 2
    private function newShell()
313
    {
314 2
        return new BashShell();
315
    }
316
317
    /**
318
     * Creates a new Profile instance.
319
     *
320
     * @return Profile
321
     */
322 1
    private function newProfile()
323
    {
324 1
        return new InMemoryProfile();
325
    }
326
327
    /**
328
     * Gets or creates a new Rules instance.
329
     *
330
     * @return Rules
331
     */
332 2
    public function getRules()
333
    {
334 2
        if ($this->rules === null) {
335 2
            $this->rules = $this->newRules();
336 2
        }
337
338 2
        return $this->rules;
339
    }
340
341
    /**
342
     * @param Rules $rules
343
     *
344
     * @return $this
345
     */
346 1
    public function setRules(Rules $rules)
347
    {
348 1
        $this->rules = $rules;
349
350 1
        return $this;
351
    }
352
353
    /**
354
     * Creates a new Rules instance.
355
     *
356
     * @return Rules
357
     */
358 2
    private function newRules()
359
    {
360 2
        return new InMemoryRules();
361
    }
362
363
    /**
364
     * @param string $binary
365
     *
366
     * @return $this
367
     */
368
    public function setBinary($binary)
369
    {
370
        $this->binary = $binary;
371
372
        return $this;
373
    }
374
375
    /**
376
     * Gets the path of the PHP runtime.
377
     *
378
     * @return string
379
     */
380 1
    public function getBinary()
381
    {
382 1
        if ($this->binary === null) {
383 1
            $this->binary = PHP_BINDIR . "/php";
384 1
        }
385
386 1
        return $this->binary;
387
    }
388
389
    /**
390
     * @param string $worker
391
     *
392
     * @return $this
393
     */
394
    public function setWorker($worker)
395
    {
396
        $this->worker = $worker;
397
398
        return $this;
399
    }
400
401
    /**
402
     * Gets the path of the worker script.
403
     *
404
     * @return string
405
     */
406 1
    public function getWorker()
407
    {
408 1
        if ($this->worker === null) {
409 1
            $this->worker = realpath(__DIR__ . "/../../bin/worker.php");
410 1
        }
411
412 1
        return $this->worker;
413
    }
414
415
    /**
416
     * Gets the path to write stdout to.
417
     *
418
     * @return string
419
     */
420 1
    private function getStdOut()
421
    {
422 1
        if ($this->getLogPath() !== null) {
423
            return ">> " . $this->getLogPath() . "/stdout.log";
424
        }
425
426 1
        return "> /dev/null";
427
    }
428
429
    /**
430
     * @return null|string
431
     */
432 2
    public function getLogPath()
433
    {
434 2
        return $this->logPath;
435
    }
436
437
    /**
438
     * @param string $logPath
439
     *
440
     * @return $this
441
     */
442 1
    public function setLogPath($logPath)
443
    {
444 1
        $this->logPath = $logPath;
445
446 1
        return $this;
447
    }
448
449
    /**
450
     * Gets the path to write stderr to.
451
     *
452
     * @return string
453
     */
454 1
    private function getStdErr()
455
    {
456 1
        if ($this->getLogPath() !== null) {
457
            return "2>> " . $this->getLogPath() . "/stderr.log";
458
        }
459
460 1
        return "2> /dev/null";
461
    }
462
463
    /**
464
     * Gets a string representation of a task, to pass to the worker script.
465
     *
466
     * @param Task $task
467
     *
468
     * @return string
469
     */
470 1
    private function getTaskString(Task $task)
471
    {
472 1
        return base64_encode(serialize($task));
473
    }
474
475
    /**
476
     * Checks whether a task can be removed from the list of running processes.
477
     *
478
     * @param Task $task
479
     *
480
     * @return bool
481
     */
482 1
    private function canRemoveTask(Task $task)
483
    {
484 1
        if (!$task instanceof Process) {
485
            return true;
486
        }
487
488 1
        if ($this->isTaskExpired($task) || $this->isTaskCancelled($task)) {
489
            $this->killTask($task);
490
            return true;
491
        }
492
493 1
        $processes = array_filter($this->running, function (Task $task) {
494 1
            return $task instanceof Process;
495 1
        });
496
497 1
        if (count($processes) < 1) {
498
            return true;
499
        }
500
501 1
        $found = false;
502 1
        $stats = $this->getStatsForProcesses($processes);
503
504 1
        foreach ($stats as $stat) {
505 1
            if ($stat[0] === $task->getId()) {
506 1
                $found = true;
507 1
            }
508 1
        }
509
510 1
        return !$found;
511
    }
512
513
    /**
514
     * Check if the given task is expired
515
     *
516
     * @param Task $task
517
     *
518
     * @return boolean
519
     */
520 1
    private function isTaskExpired(Task $task)
521
    {
522 1
        if ($task instanceof Expires) {
523 1
            $expiresIn = $task->getExpiresIn();
524 1
            $startedAt = $this->timings[$task];
525
526 1
            if ($expiresIn > 0 && (time() - $startedAt) >= $expiresIn) {
527
                return $task->shouldExpire($startedAt);
528
            }
529 1
        }
530
531 1
        return false;
532
    }
533
534
    /**
535
     * Check if the given task is cancelled.
536
     *
537
     * @param Task $task
538
     *
539
     * @return bool
540
     */
541 1
    private function isTaskCancelled(Task $task)
542
    {
543 1
        if ($task instanceof Cancellable) {
544
            return $task->isCancelled();
545
        }
546
547 1
        return false;
548
    }
549
550
    /**
551
     * Revoke any background processes attached to this task.
552
     *
553
     * @param Task $task
554
     *
555
     * @return bool
556
     */
557
    private function killTask(Task $task)
558
    {
559
        if ($task instanceof Process) {
560
            $stdout = $this->getStdOut();
561
            $stderr = $this->getStdErr();
562
            
563
            $this->getShell()->exec("kill -9 %s {$stdout} {$stderr} &", [
564
                $task->getId(),
565
            ]);
566
567
            return true;
568
        }
569
570
        return false;
571
    }
572
573
    /**
574
     * @param Rule $rule
575
     *
576
     * @return $this
577
     */
578 1
    public function addRule(Rule $rule)
579
    {
580 1
        $this->getRules()->addRule($rule);
581
582 1
        return $this;
583
    }
584
585
    /**
586
     * @param Rule $rule
587
     *
588
     * @return $this
589
     */
590 1
    public function removeRule(Rule $rule)
591
    {
592 1
        $this->getRules()->removeRule($rule);
593
594 1
        return $this;
595
    }
596
597
    public function __destruct()
598
    {
599
        foreach ($this->running as $task) {
600
            $this->killTask($task);
601
        }
602
    }
603
}
604