Completed
Push — master ( 91d598...968d8f )
by Christopher
02:26
created

ProcessManager::getLogPath()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 0
crap 1
1
<?php
2
3
namespace AsyncPHP\Doorman\Manager;
4
5
use AsyncPHP\Doorman\Cancellable;
6
use AsyncPHP\Doorman\Expires;
7
use AsyncPHP\Doorman\Manager;
8
use AsyncPHP\Doorman\Process;
9
use AsyncPHP\Doorman\Profile;
10
use AsyncPHP\Doorman\Profile\InMemoryProfile;
11
use AsyncPHP\Doorman\Rule;
12
use AsyncPHP\Doorman\Rules;
13
use AsyncPHP\Doorman\Rules\InMemoryRules;
14
use AsyncPHP\Doorman\Shell;
15
use AsyncPHP\Doorman\Shell\BashShell;
16
use AsyncPHP\Doorman\Task;
17
use SplObjectStorage;
18
19
/**
20
 * Class ProcessManager
21
 *
22
 * @package AsyncPHP\Doorman\Manager
23
 */
24
class ProcessManager implements Manager
25
{
26
    /**
27
     * @var Task[]
28
     */
29
    private $waiting = [];
30
31
    /**
32
     * @var Task[]
33
     */
34
    private $running = [];
35
36
    /**
37
     * @var null|SplObjectStorage
38
     */
39
    private $timings = null;
40
41
    /**
42
     * @var null|string
43
     */
44
    private $logPath;
45
46
    /**
47
     * @var null|Rules
48
     */
49
    private $rules;
50
51
    /**
52
     * @var null|Shell
53
     */
54
    protected $shell;
55
56
    /**
57
     * @var null|string
58
     */
59
    private $binary;
60
61
    /**
62
     * @var null|string
63
     */
64
    private $worker;
65
    
66
    /**
     * Returns the tasks that have been added but not started yet.
     *
     * @return Task[]
     */
    public function getWaiting()
    {
        return $this->waiting;
    }
75
    
76
    /**
     * Returns the tasks currently tracked as running.
     *
     * The list is rebuilt on every call to tick().
     *
     * @return Task[]
     */
    public function getRunning()
    {
        return $this->running;
    }
85
86
    /**
     * @inheritdoc
     *
     * Appends the task to the end of the waiting queue; it is only considered
     * for execution on the next call to tick().
     *
     * @param Task $task
     *
     * @return $this
     */
    public function addTask(Task $task)
    {
        array_push($this->waiting, $task);

        return $this;
    }
99
100
    /**
     * @inheritdoc
     *
     * Starts every waiting task that is currently allowed to run, then prunes
     * the tasks that can be removed (finished, expired or cancelled) from the
     * running list.
     *
     * @return bool True while at least one task is still waiting or running.
     */
    public function tick()
    {
        if (!$this->timings instanceof SplObjectStorage) {
            $this->timings = new SplObjectStorage();
        }

        $waiting = [];
        $running = [];

        foreach ($this->waiting as $task) {
            // Cancelled tasks are dropped from the queue without being started.
            if ($this->isTaskCancelled($task)) {
                continue;
            }

            // Not allowed to run yet: keep it queued for the next tick.
            if (!$this->canRunTask($task)) {
                $waiting[] = $task;
                continue;
            }

            if ($task->stopsSiblings()) {
                $this->stopSiblingTasks($task);
            }

            $binary = $this->getBinary();
            $worker = $this->getWorker();
            $stdout = $this->getStdOut();
            $stderr = $this->getStdErr();

            if ($task instanceof Expires) {
                // Remember the start time so isTaskExpired() can measure the run time.
                $this->timings[$task] = time();
            }

            $command = $this->getCommand($binary, $worker, $stdout, $stderr);
            $output = $this->getShell()->exec($command, [
                $this->getTaskString($task),
            ]);

            // The command echoes the PID of the background job. Only record it
            // when the shell actually returned a first line, instead of reading
            // an undefined offset when the command produced no output.
            if ($task instanceof Process && isset($output[0])) {
                $task->setId($output[0]);
            }

            $this->running[] = $task;
        }

        foreach ($this->running as $task) {
            if (!$this->canRemoveTask($task)) {
                $running[] = $task;
                continue;
            }

            // Forget the start time of removed tasks; otherwise the storage keeps
            // a reference to every task ever started for the manager's lifetime.
            unset($this->timings[$task]);
        }

        $this->waiting = $waiting;
        $this->running = $running;

        return !empty($waiting) || !empty($running);
    }
160
161
    /**
     * Builds the shell command template used to launch a worker.
     *
     * The returned string keeps a literal "%s" placeholder where the encoded
     * task is inserted later, runs the worker in the background and echoes the
     * PID of that background job. Subclasses may override this method to
     * customise the command.
     *
     * @param string $binary Path of the PHP runtime.
     * @param string $worker Path of the worker script.
     * @param string $stdout Shell redirection for standard output.
     * @param string $stderr Shell redirection for standard error.
     *
     * @return string
     */
    protected function getCommand($binary, $worker, $stdout, $stderr)
    {
        return $binary . " " . $worker . " %s " . $stdout . " " . $stderr . ' & echo $!';
    }
175
176
    /**
     * Sends "kill -9" to every running process that shares a handler with the
     * given task.
     *
     * The killed tasks are not removed from the running list here; that is left
     * to the pruning step in tick().
     *
     * @param Task $task Task whose handler identifies the siblings to stop.
     *
     * @return $this
     */
    private function stopSiblingTasks(Task $task)
    {
        $handler = $task->getHandler();

        // The redirections are the same for every sibling, so build the command once.
        $command = "kill -9 %s " . $this->getStdOut() . " " . $this->getStdErr() . " &";

        foreach ($this->running as $sibling) {
            if ($sibling->getHandler() !== $handler) {
                continue;
            }

            // Only tasks that expose a process id can be killed.
            if ($sibling instanceof Process) {
                $this->getShell()->exec($command, [
                    $sibling->getId(),
                ]);
            }
        }

        return $this;
    }
200
201
    /**
     * Decides whether a waiting task may be started now.
     *
     * A task is refused when it reports that it cannot run. Otherwise it is
     * accepted straight away when it ignores rules or when no process is
     * running; in all other cases the configured rules decide, based on the
     * current load profile.
     *
     * @param Task $task
     *
     * @return bool
     */
    private function canRunTask(Task $task)
    {
        if (!$task->canRunTask()) {
            return false;
        }

        if ($task->ignoresRules()) {
            return true;
        }

        // Collect the running tasks that are real processes, keeping their keys.
        $processes = [];

        foreach ($this->running as $key => $candidate) {
            if ($candidate instanceof Process) {
                $processes[$key] = $candidate;
            }
        }

        if (count($processes) < 1) {
            return true;
        }

        $profile = $this->getProfileForProcesses($task, $processes);

        return $this->getRules()->canRunTask($task, $profile);
    }
230
231
    /**
     * Builds the load profile against which the rules evaluate a task.
     *
     * The profile holds the given processes and their summed processor and
     * memory load, plus the same figures restricted to "sibling" processes,
     * i.e. those sharing the task's handler. Each load is capped at 100.
     *
     * @param Task  $task
     * @param array $processes Running processes to profile.
     *
     * @return Profile
     */
    private function getProfileForProcesses(Task $task, array $processes)
    {
        // Sums one column of the stats rows and caps the result at 100.
        $cappedLoad = function (array $rows, $column) {
            return min(100, array_sum(array_column($rows, $column)));
        };

        $stats = $this->getStatsForProcesses($processes);

        // Siblings are the processes handled by the same handler as the task.
        $siblingProcesses = [];

        foreach ($processes as $key => $next) {
            if ($next->getHandler() === $task->getHandler()) {
                $siblingProcesses[$key] = $next;
            }
        }

        $siblingStats = $this->getStatsForProcesses($siblingProcesses);

        $profile = $this->newProfile();

        // Columns 1 and 2 of a stats row are %cpu and %mem (see getStatsForProcesses()).
        $profile->setProcesses($processes);
        $profile->setProcessorLoad($cappedLoad($stats, 1));
        $profile->setMemoryLoad($cappedLoad($stats, 2));

        $profile->setSiblingProcesses($siblingProcesses);
        $profile->setSiblingProcessorLoad($cappedLoad($siblingStats, 1));
        $profile->setSiblingMemoryLoad($cappedLoad($siblingStats, 2));

        return $profile;
    }
261
262
    /**
     * Queries "ps" for each process and returns one stats row per process found.
     *
     * Each row is the whitespace-split last line of the "ps" output, in the
     * requested column order: pid, %cpu, %mem, state, start. Processes for
     * which "ps" prints no usable line are left out of the result.
     *
     * @param Process[] $processes
     *
     * @return array
     */
    private function getStatsForProcesses(array $processes)
    {
        $stats = [];

        foreach ($processes as $process) {
            // "sed 1d" strips the header line that ps prints.
            $lines = $this->getShell()->exec("ps -o pid,%%cpu,%%mem,state,start -p %s | sed 1d", [
                $process->getId(),
            ]);

            if (count($lines) < 1) {
                continue;
            }

            $row = trim($lines[count($lines) - 1]);

            if ($row === "") {
                continue;
            }

            $columns = preg_split("/\s+/", $row);

            // Keep the row only when its first column is a plain integer pid.
            if ((string) intval($columns[0]) === $columns[0]) {
                $stats[] = $columns;
            }
        }

        return $stats;
    }
301
302
    /**
     * Returns the Shell used to run commands, creating the default one on
     * first use.
     *
     * @return Shell
     */
    public function getShell()
    {
        if ($this->shell !== null) {
            return $this->shell;
        }

        return $this->shell = $this->newShell();
    }
315
316
    /**
     * Replaces the Shell used to run commands.
     *
     * @param Shell $shell
     *
     * @return $this
     */
    public function setShell(Shell $shell)
    {
        $this->shell = $shell;

        return $this;
    }
327
328
    /**
     * Creates the default Shell implementation (a BashShell).
     *
     * @return Shell
     */
    private function newShell()
    {
        $shell = new BashShell();

        return $shell;
    }
337
338
    /**
     * Creates an empty Profile (an InMemoryProfile) to be filled with load data.
     *
     * @return Profile
     */
    private function newProfile()
    {
        $profile = new InMemoryProfile();

        return $profile;
    }
347
348
    /**
     * Returns the Rules collection, creating the default one on first use.
     *
     * @return Rules
     */
    public function getRules()
    {
        if ($this->rules !== null) {
            return $this->rules;
        }

        return $this->rules = $this->newRules();
    }
361
362
    /**
     * Replaces the Rules collection consulted before starting tasks.
     *
     * @param Rules $rules
     *
     * @return $this
     */
    public function setRules(Rules $rules)
    {
        $this->rules = $rules;

        return $this;
    }
373
374
    /**
     * Creates the default Rules implementation (an InMemoryRules).
     *
     * @return Rules
     */
    private function newRules()
    {
        $rules = new InMemoryRules();

        return $rules;
    }
383
384
    /**
     * Sets the path of the PHP runtime used to launch workers.
     *
     * @param string $binary
     *
     * @return $this
     */
    public function setBinary($binary)
    {
        $this->binary = $binary;

        return $this;
    }
395
396
    /**
     * Returns the path of the PHP runtime used to launch workers.
     *
     * When no path has been set, it defaults to "php" inside PHP_BINDIR and
     * that default is remembered for later calls.
     *
     * @return string
     */
    public function getBinary()
    {
        if ($this->binary !== null) {
            return $this->binary;
        }

        return $this->binary = PHP_BINDIR . "/php";
    }
409
410
    /**
     * Sets the path of the worker script that executes tasks.
     *
     * @param string $worker
     *
     * @return $this
     */
    public function setWorker($worker)
    {
        $this->worker = $worker;

        return $this;
    }
421
422
    /**
     * Gets the path of the worker script.
     *
     * When no path has been set, it defaults to bin/worker.php two directories
     * above this file, and that default is remembered for later calls.
     *
     * @return string
     */
    public function getWorker()
    {
        if ($this->worker === null) {
            $default = __DIR__ . "/../../bin/worker.php";
            $resolved = realpath($default);

            // realpath() returns false when the script cannot be resolved. Fall
            // back to the unresolved path so this method always returns a string
            // and a failing command names the missing file instead of silently
            // running with an empty worker argument.
            $this->worker = ($resolved !== false) ? $resolved : $default;
        }

        return $this->worker;
    }
435
436
    /**
     * Returns the shell redirection for standard output.
     *
     * Output is appended to "stdout.log" inside the log path when one is set,
     * and discarded to /dev/null otherwise.
     *
     * @return string
     */
    private function getStdOut()
    {
        if ($this->getLogPath() === null) {
            return "> /dev/null";
        }

        return ">> {$this->getLogPath()}/stdout.log";
    }
449
450
    /**
     * Returns the directory in which worker output is logged, or null when no
     * log path has been set.
     *
     * @return null|string
     */
    public function getLogPath()
    {
        return $this->logPath;
    }
457
458
    /**
     * Sets the directory in which worker output is logged.
     *
     * @param string $logPath
     *
     * @return $this
     */
    public function setLogPath($logPath)
    {
        $this->logPath = $logPath;

        return $this;
    }
469
470
    /**
     * Returns the shell redirection for standard error.
     *
     * Errors are appended to "stderr.log" inside the log path when one is set,
     * and discarded to /dev/null otherwise.
     *
     * @return string
     */
    private function getStdErr()
    {
        if ($this->getLogPath() === null) {
            return "2> /dev/null";
        }

        return "2>> {$this->getLogPath()}/stderr.log";
    }
483
484
    /**
     * Encodes a task as a string that can be passed to the worker script.
     *
     * The task is serialized and the result is base64-encoded.
     *
     * @param Task $task
     *
     * @return string
     */
    private function getTaskString(Task $task)
    {
        $serialized = serialize($task);

        return base64_encode($serialized);
    }
495
496
    /**
     * Checks whether a task can be removed from the list of running processes.
     *
     * Tasks that are not processes are always removable. Expired or cancelled
     * processes are killed and then removed. Any other process is removable
     * once "ps" no longer reports its id.
     *
     * @param Task $task
     *
     * @return bool
     */
    private function canRemoveTask(Task $task)
    {
        if (!$task instanceof Process) {
            return true;
        }

        if ($this->isTaskExpired($task) || $this->isTaskCancelled($task)) {
            $this->killTask($task);

            return true;
        }

        $processes = array_filter($this->running, function (Task $task) {
            return $task instanceof Process;
        });

        if (count($processes) < 1) {
            return true;
        }

        // The pid column from "ps" is always a string, while a Process may
        // store its id as an integer. Compare both as strings so a running
        // process is not mistaken for a finished one because of its type.
        $id = (string) $task->getId();

        foreach ($this->getStatsForProcesses($processes) as $stat) {
            if ((string) $stat[0] === $id) {
                // Still alive: keep it in the running list.
                return false;
            }
        }

        return true;
    }
533
534
    /**
     * Checks whether a task has outlived its allowed run time.
     *
     * Only tasks implementing Expires can expire. Once the time since the
     * recorded start reaches a positive expiry period, the task itself makes
     * the final decision through shouldExpire().
     *
     * @param Task $task
     *
     * @return boolean
     */
    private function isTaskExpired(Task $task)
    {
        if (!$task instanceof Expires) {
            return false;
        }

        $expiresIn = $task->getExpiresIn();
        $startedAt = $this->timings[$task];

        // An expiry period that is not positive means the task never expires.
        $isDue = $expiresIn > 0 && (time() - $startedAt) >= $expiresIn;

        return $isDue ? $task->shouldExpire($startedAt) : false;
    }
554
555
    /**
     * Checks whether a task has been cancelled.
     *
     * Tasks that do not implement Cancellable are never considered cancelled.
     *
     * @param Task $task
     *
     * @return bool
     */
    private function isTaskCancelled(Task $task)
    {
        return $task instanceof Cancellable ? $task->isCancelled() : false;
    }
570
571
    /**
     * Sends "kill -9" to the background process attached to a task.
     *
     * @param Task $task
     *
     * @return bool True when a kill command was issued, false when the task is
     *              not a Process.
     */
    private function killTask(Task $task)
    {
        if (!$task instanceof Process) {
            return false;
        }

        $command = "kill -9 %s " . $this->getStdOut() . " " . $this->getStdErr() . " &";

        $this->getShell()->exec($command, [
            $task->getId(),
        ]);

        return true;
    }
593
594
    /**
     * Adds a rule to the Rules collection consulted before starting tasks.
     *
     * @param Rule $rule
     *
     * @return $this
     */
    public function addRule(Rule $rule)
    {
        $rules = $this->getRules();
        $rules->addRule($rule);

        return $this;
    }
605
606
    /**
     * Removes a rule from the Rules collection consulted before starting tasks.
     *
     * @param Rule $rule
     *
     * @return $this
     */
    public function removeRule(Rule $rule)
    {
        $rules = $this->getRules();
        $rules->removeRule($rule);

        return $this;
    }
617
618
    /**
     * Kills every task still listed as running when the manager is destroyed.
     */
    public function __destruct()
    {
        foreach ($this->running as $remaining) {
            $this->killTask($remaining);
        }
    }
624
}
625