Completed
Push — master ( d5979f...822943 )
by Christopher
02:02
created

ProcessManager::stopSiblingTasks()   A

Complexity

Conditions 4
Paths 3

Size

Total Lines 17
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 3
Bugs 0 Features 0
Metric Value
c 3
b 0
f 0
dl 0
loc 17
ccs 0
cts 12
cp 0
rs 9.2
cc 4
eloc 9
nc 3
nop 1
crap 20
1
<?php
0 ignored issues
show
Coding Style Compatibility introduced by
For compatibility and reusability of your code, PSR1 recommends that a file should introduce either new symbols (like classes, functions, etc.) or have side-effects (like outputting something, or including other files), but not both at the same time. The first symbol is defined on line 66 and the first side effect is on line 19.

The PSR-1: Basic Coding Standard recommends that a file should either introduce new symbols, that is classes, functions, constants or similar, or have side effects. Side effects are anything that executes logic, like for example printing output, changing ini settings or writing to a file.

The idea behind this recommendation is that merely auto-loading a class should not change the state of an application. It also promotes a cleaner style of programming and makes your code less prone to errors, because the logic is not spread out all over the place.

To learn more about the PSR-1, please see the PHP-FIG site on the PSR-1.

Loading history...
2
3
namespace AsyncPHP\Doorman\Manager;
4
5
use AsyncPHP\Doorman\Cancellable;
6
use AsyncPHP\Doorman\Expires;
7
use AsyncPHP\Doorman\Manager;
8
use AsyncPHP\Doorman\Process;
9
use AsyncPHP\Doorman\Profile;
10
use AsyncPHP\Doorman\Profile\InMemoryProfile;
11
use AsyncPHP\Doorman\Rule;
12
use AsyncPHP\Doorman\Rules;
13
use AsyncPHP\Doorman\Rules\InMemoryRules;
14
use AsyncPHP\Doorman\Shell;
15
use AsyncPHP\Doorman\Shell\BashShell;
16
use AsyncPHP\Doorman\Task;
17
use SplObjectStorage;
18
19
final class ProcessManager implements Manager
0 ignored issues
show
Bug introduced by
Possible parse error: class missing opening or closing brace
Loading history...
20
{
21
    /**
22
     * @var Task[]
23
     */
24
    private $waiting = [];
25
26
    /**
27
     * @var Task[]
28
     */
29
    private $running = [];
30
31
    /**
32
     * @var null|SplObjectStorage
33
     */
34
    private $timings = null;
35
36
    /**
37
     * @var null|string
38
     */
39
    private $logPath;
40
41
    /**
42
     * @var null|Rules
43
     */
44
    private $rules;
45
46
    /**
47
     * @var null|Shell
48
     */
49
    protected $shell;
50
51
    /**
52
     * @var null|string
53
     */
54
    private $binary;
55
56
    /**
57
     * @var null|string
58
     */
59
    private $worker;
60
    
61
    /**
     * Returns the tasks that have been queued but not yet started.
     *
     * Tasks are appended by addTask() and re-evaluated on every tick().
     *
     * @return Task[]
     */
    public function getWaiting()
    {
        return $this->waiting;
    }
70
    
71
    /**
     * Returns the tasks that were started and were still considered running
     * at the end of the most recent tick().
     *
     * @return Task[]
     */
    public function getRunning()
    {
        return $this->running;
    }
80
81
    /**
     * @inheritdoc
     *
     * The task is only queued here; it is started by a later call to tick().
     *
     * @param Task $task Task to append to the waiting list.
     *
     * @return $this
     */
    public function addTask(Task $task)
    {
        $queue = $this->waiting;
        $queue[] = $task;

        $this->waiting = $queue;

        return $this;
    }
94
95
    /**
     * @inheritdoc
     *
     * One scheduling pass: every waiting task that is not cancelled and is
     * allowed to run is launched as a background worker, then every running
     * task that can be removed is dropped from the running list.
     *
     * @return bool True while at least one task is still waiting or running.
     */
    public function tick()
    {
        if (!$this->timings instanceof SplObjectStorage) {
            $this->timings = new SplObjectStorage();
        }

        $waiting = [];
        $running = [];

        foreach ($this->waiting as $task) {
            // Cancelled before it was ever started: drop it from the queue.
            if ($this->isTaskCancelled($task)) {
                continue;
            }

            // Not allowed to run yet: keep it queued for the next tick.
            if (!$this->canRunTask($task)) {
                $waiting[] = $task;
                continue;
            }

            if ($task->stopsSiblings()) {
                $this->stopSiblingTasks($task);
            }

            $binary = $this->getBinary();
            $worker = $this->getWorker();
            $stdout = $this->getStdOut();
            $stderr = $this->getStdErr();

            // Remember the start time so isTaskExpired() can measure the run time.
            if ($task instanceof Expires) {
                $this->timings[$task] = time();
            }

            // The command is put in the background; "echo $!" prints the PID
            // of that background job, which becomes the first output line.
            $output = $this->getShell()->exec("{$binary} {$worker} %s {$stdout} {$stderr} & echo $!", [
                $this->getTaskString($task),
            ]);

            if ($task instanceof Process) {
                // The shell may return no output at all (e.g. the launch
                // failed); avoid reading an undefined offset in that case.
                $task->setId(isset($output[0]) ? $output[0] : null);
            }

            $this->running[] = $task;
        }

        foreach ($this->running as $task) {
            if (!$this->canRemoveTask($task)) {
                $running[] = $task;
                continue;
            }

            // The task is leaving the running list: release its start time so
            // the storage does not keep finished tasks alive indefinitely.
            // Unsetting an object that was never stored is a no-op.
            unset($this->timings[$task]);
        }

        $this->waiting = $waiting;
        $this->running = $running;

        return !empty($waiting) || !empty($running);
    }
154
155
    /**
     * Sends "kill -9" to every running Process task that shares the given
     * task's handler.
     *
     * The killed tasks are not removed from the running list here.
     *
     * @param Task $task Task whose handler identifies the siblings to stop.
     *
     * @return $this
     */
    private function stopSiblingTasks(Task $task)
    {
        $handler = $task->getHandler();

        $out = $this->getStdOut();
        $err = $this->getStdErr();

        foreach ($this->running as $sibling) {
            // Skip tasks of another handler, and tasks without a process id.
            if ($sibling->getHandler() !== $handler || !$sibling instanceof Process) {
                continue;
            }

            $this->getShell()->exec("kill -9 %s {$out} {$err} &", [
                $sibling->getId(),
            ]);
        }

        return $this;
    }
179
180
    /**
     * Decides whether a waiting task may be started now.
     *
     * The task's own canRunTask() veto is checked first. Tasks that ignore
     * rules, or that would be the only Process running, are always allowed.
     * Otherwise the decision is delegated to the rules, given a load profile
     * of the currently running processes.
     *
     * @param Task $task
     *
     * @return bool
     */
    private function canRunTask(Task $task)
    {
        if (!$task->canRunTask()) {
            return false;
        }

        if ($task->ignoresRules()) {
            return true;
        }

        // Keep the original keys, exactly as array_filter() would.
        $active = [];

        foreach ($this->running as $key => $candidate) {
            if ($candidate instanceof Process) {
                $active[$key] = $candidate;
            }
        }

        if (count($active) < 1) {
            return true;
        }

        $rules = $this->getRules();

        return $rules->canRunTask($task, $this->getProfileForProcesses($task, $active));
    }
209
210
    /**
     * Builds the load profile used to evaluate rules for a task.
     *
     * The profile carries the given processes and their summed processor and
     * memory load, plus the same figures restricted to "sibling" processes,
     * i.e. those whose handler equals the task's handler. Every sum is capped
     * at 100.
     *
     * @param Task  $task
     * @param array $processes Running processes to profile.
     *
     * @return Profile
     */
    private function getProfileForProcesses(Task $task, array $processes)
    {
        // Sums one column of the ps rows and caps the result at 100.
        $cappedSum = static function (array $rows, $column) {
            return min(100, array_sum(array_column($rows, $column)));
        };

        $allStats = $this->getStatsForProcesses($processes);

        $siblings = array_filter($processes, function (Task $candidate) use ($task) {
            return $candidate->getHandler() === $task->getHandler();
        });

        $siblingStats = $this->getStatsForProcesses($siblings);

        $profile = $this->newProfile();

        // Column 1 of a ps row is %cpu, column 2 is %mem (see getStatsForProcesses()).
        $profile->setProcesses($processes);
        $profile->setProcessorLoad($cappedSum($allStats, 1));
        $profile->setMemoryLoad($cappedSum($allStats, 2));

        $profile->setSiblingProcesses($siblings);
        $profile->setSiblingProcessorLoad($cappedSum($siblingStats, 1));
        $profile->setSiblingMemoryLoad($cappedSum($siblingStats, 2));

        return $profile;
    }
240
241
    /**
     * Queries "ps" for each process and returns the parsed rows.
     *
     * Each returned row is the whitespace-split last output line for one
     * process, in the column order requested from ps: pid, %cpu, %mem, state,
     * start. Processes for which ps prints nothing usable are left out.
     *
     * @param Process[] $processes
     *
     * @return array List of string[] rows, one per process that was found.
     */
    private function getStatsForProcesses(array $processes)
    {
        $rows = [];

        foreach ($processes as $process) {
            // "sed 1d" strips the header line that ps prints.
            $lines = $this->getShell()->exec("ps -o pid,%%cpu,%%mem,state,start -p %s | sed 1d", [
                $process->getId(),
            ]);

            $lineCount = count($lines);

            if ($lineCount < 1) {
                continue;
            }

            $row = trim($lines[$lineCount - 1]);

            if ($row === "") {
                continue;
            }

            $columns = preg_split("/\s+/", $row);

            // Only accept the row when its first column is a plain integer pid.
            if ((string) intval($columns[0]) !== $columns[0]) {
                continue;
            }

            $rows[] = $columns;
        }

        return $rows;
    }
280
281
    /**
     * Returns the shell used to run commands, creating the default one on
     * first use when none has been set.
     *
     * @return Shell
     */
    public function getShell()
    {
        if ($this->shell !== null) {
            return $this->shell;
        }

        return $this->shell = $this->newShell();
    }
294
295
    /**
     * Replaces the shell used to run commands.
     *
     * @param Shell $shell
     *
     * @return $this
     */
    public function setShell(Shell $shell)
    {
        $this->shell = $shell;

        return $this;
    }
306
307
    /**
     * Creates the default shell, a BashShell.
     *
     * @return Shell
     */
    private function newShell()
    {
        $shell = new BashShell();

        return $shell;
    }
316
317
    /**
     * Creates an empty profile, an InMemoryProfile.
     *
     * @return Profile
     */
    private function newProfile()
    {
        $profile = new InMemoryProfile();

        return $profile;
    }
326
327
    /**
     * Returns the rule set, creating the default one on first use when none
     * has been set.
     *
     * @return Rules
     */
    public function getRules()
    {
        if ($this->rules !== null) {
            return $this->rules;
        }

        return $this->rules = $this->newRules();
    }
340
341
    /**
     * Replaces the rule set consulted before starting tasks.
     *
     * @param Rules $rules
     *
     * @return $this
     */
    public function setRules(Rules $rules)
    {
        $this->rules = $rules;

        return $this;
    }
352
353
    /**
     * Creates the default rule set, an InMemoryRules.
     *
     * @return Rules
     */
    private function newRules()
    {
        $rules = new InMemoryRules();

        return $rules;
    }
362
363
    /**
     * Sets the path of the PHP runtime used to launch workers.
     *
     * @param string $binary
     *
     * @return $this
     */
    public function setBinary($binary)
    {
        $this->binary = $binary;

        return $this;
    }
374
375
    /**
     * Returns the path of the PHP runtime used to launch workers.
     *
     * Defaults to "php" inside PHP_BINDIR when no binary has been set; the
     * default is stored on first use.
     *
     * @return string
     */
    public function getBinary()
    {
        if ($this->binary !== null) {
            return $this->binary;
        }

        return $this->binary = PHP_BINDIR . "/php";
    }
388
389
    /**
     * Sets the path of the worker script that executes a task.
     *
     * @param string $worker
     *
     * @return $this
     */
    public function setWorker($worker)
    {
        $this->worker = $worker;

        return $this;
    }
400
401
    /**
     * Returns the path of the worker script that executes a task.
     *
     * Defaults to bin/worker.php, resolved two directories above this file;
     * the default is stored on first use. NOTE(review): realpath() yields
     * false when the path cannot be resolved, and that value is stored as-is.
     *
     * @return string
     */
    public function getWorker()
    {
        if ($this->worker !== null) {
            return $this->worker;
        }

        return $this->worker = realpath(__DIR__ . "/../../bin/worker.php");
    }
414
415
    /**
     * Returns the shell redirection for standard output.
     *
     * Output is appended to stdout.log inside the log path when one is set,
     * and discarded otherwise.
     *
     * @return string
     */
    private function getStdOut()
    {
        $logPath = $this->getLogPath();

        if ($logPath === null) {
            return "> /dev/null";
        }

        return ">> " . $logPath . "/stdout.log";
    }
428
429
    /**
     * Returns the directory that receives stdout.log and stderr.log, or null
     * when no log path has been set.
     *
     * @return null|string
     */
    public function getLogPath()
    {
        return $this->logPath;
    }
436
437
    /**
     * Sets the directory that receives stdout.log and stderr.log.
     *
     * @param string $logPath
     *
     * @return $this
     */
    public function setLogPath($logPath)
    {
        $this->logPath = $logPath;

        return $this;
    }
448
449
    /**
     * Returns the shell redirection for standard error.
     *
     * Errors are appended to stderr.log inside the log path when one is set,
     * and discarded otherwise.
     *
     * @return string
     */
    private function getStdErr()
    {
        $logPath = $this->getLogPath();

        if ($logPath === null) {
            return "2> /dev/null";
        }

        return "2>> " . $logPath . "/stderr.log";
    }
462
463
    /**
     * Encodes a task as the single argument passed to the worker script:
     * the serialized task, base64-encoded.
     *
     * @param Task $task
     *
     * @return string
     */
    private function getTaskString(Task $task)
    {
        $serialized = serialize($task);

        return base64_encode($serialized);
    }
474
475
    /**
     * Decides whether a task can be dropped from the running list.
     *
     * A task that is not a Process is always removable. An expired or
     * cancelled process is killed and then removed. Any other process is
     * removable once ps no longer reports a row with its id.
     *
     * @param Task $task
     *
     * @return bool
     */
    private function canRemoveTask(Task $task)
    {
        if (!$task instanceof Process) {
            return true;
        }

        if ($this->isTaskExpired($task) || $this->isTaskCancelled($task)) {
            $this->killTask($task);

            return true;
        }

        $processes = array_filter($this->running, function (Task $candidate) {
            return $candidate instanceof Process;
        });

        if (count($processes) < 1) {
            return true;
        }

        // Column 0 of every ps row is the pid, as a string.
        $livePids = array_column($this->getStatsForProcesses($processes), 0);

        return !in_array($task->getId(), $livePids, true);
    }
512
513
    /**
     * Checks whether a task has outlived its allowed run time.
     *
     * Only Expires tasks can expire. Once the time since the recorded start
     * reaches a positive getExpiresIn() value, the final decision is left to
     * the task's shouldExpire().
     *
     * @param Task $task
     *
     * @return bool
     */
    private function isTaskExpired(Task $task)
    {
        if (!$task instanceof Expires) {
            return false;
        }

        $expiresIn = $task->getExpiresIn();
        $startedAt = $this->timings[$task];

        // No positive limit, or the limit has not been reached yet.
        if ($expiresIn <= 0 || (time() - $startedAt) < $expiresIn) {
            return false;
        }

        return $task->shouldExpire($startedAt);
    }
533
534
    /**
     * Checks whether a task reports itself as cancelled.
     *
     * Tasks that are not Cancellable are never considered cancelled.
     *
     * @param Task $task
     *
     * @return bool
     */
    private function isTaskCancelled(Task $task)
    {
        return $task instanceof Cancellable ? $task->isCancelled() : false;
    }
549
550
    /**
     * Sends "kill -9" to the background process attached to a task.
     *
     * The kill command is itself run in the background and its result is not
     * inspected, so a true return value only means the command was issued.
     *
     * @param Task $task
     *
     * @return bool True when a kill command was issued, false when the task
     *              is not a Process and therefore has no process id.
     */
    private function killTask(Task $task)
    {
        if (!$task instanceof Process) {
            return false;
        }

        $stdout = $this->getStdOut();
        $stderr = $this->getStdErr();

        // The original statement was followed by a stray backtick, which
        // opened an unterminated shell-exec string and broke parsing of the
        // whole file.
        $this->getShell()->exec("kill -9 %s {$stdout} {$stderr} &", [
            $task->getId(),
        ]);

        return true;
    }
572
573
    /**
     * Adds a rule to the rule set consulted before starting tasks.
     *
     * @param Rule $rule
     *
     * @return $this
     */
    public function addRule(Rule $rule)
    {
        $rules = $this->getRules();
        $rules->addRule($rule);

        return $this;
    }
584
585
    /**
     * Removes a rule from the rule set consulted before starting tasks.
     *
     * @param Rule $rule
     *
     * @return $this
     */
    public function removeRule(Rule $rule)
    {
        $rules = $this->getRules();
        $rules->removeRule($rule);

        return $this;
    }
596
597
    /**
     * Kills the background process of every task still in the running list
     * when the manager is destroyed.
     */
    public function __destruct()
    {
        foreach ($this->running as $remaining) {
            $this->killTask($remaining);
        }
    }
603
}
604
0 ignored issues
show
Bug introduced by
This code did not parse for me. Apparently, there is an error somewhere around this line:

Syntax error, unexpected EOF, expecting '`'
Loading history...