Completed
Push — master ( 58737c...d5fccb )
by Christopher
03:35 queued 01:17
created

ProcessManager::__destruct()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 6
ccs 0
cts 4
cp 0
rs 9.4286
cc 2
eloc 3
nc 2
nop 0
crap 6
1
<?php
2
3
namespace AsyncPHP\Doorman\Manager;
4
5
use AsyncPHP\Doorman\Cancellable;
6
use AsyncPHP\Doorman\Expires;
7
use AsyncPHP\Doorman\Manager;
8
use AsyncPHP\Doorman\Process;
9
use AsyncPHP\Doorman\Profile;
10
use AsyncPHP\Doorman\Profile\InMemoryProfile;
11
use AsyncPHP\Doorman\Rule;
12
use AsyncPHP\Doorman\Rules;
13
use AsyncPHP\Doorman\Rules\InMemoryRules;
14
use AsyncPHP\Doorman\Shell;
15
use AsyncPHP\Doorman\Shell\BashShell;
16
use AsyncPHP\Doorman\Task;
17
use SplObjectStorage;
18
19
final class ProcessManager implements Manager
20
{
21
    /**
22
     * @var Task[]
23
     */
24
    private $waiting = [];
25
26
    /**
27
     * @var Task[]
28
     */
29
    private $running = [];
30
31
    /**
32
     * @var null|SplObjectStorage
33
     */
34
    private $timings = null;
35
36
    /**
37
     * @var null|string
38
     */
39
    private $logPath;
40
41
    /**
42
     * @var null|Rules
43
     */
44
    private $rules;
45
46
    /**
47
     * @var null|Shell
48
     */
49
    protected $shell;
50
51
    /**
52
     * @var null|string
53
     */
54
    private $binary;
55
56
    /**
57
     * @var null|string
58
     */
59
    private $worker;
60
61
    /**
62
     * @inheritdoc
63
     *
64
     * @param Task $task
65
     *
66
     * @return $this
67
     */
68 1
    public function addTask(Task $task)
69
    {
70 1
        $this->waiting[] = $task;
71
72 1
        return $this;
73
    }
74
75
    /**
76
     * @inheritdoc
77
     *
78
     * @return bool
79
     */
80 1
    public function tick()
81
    {
82 1
        if (!$this->timings instanceof SplObjectStorage) {
83 1
            $this->timings = new SplObjectStorage();
84
        }
85
86 1
        $waiting = [];
87 1
        $running = [];
88
89 1
        foreach ($this->waiting as $task) {
90 1
            if ($this->isTaskCancelled($task)) {
91
                continue;
92
            }
93
94 1
            if (!$this->canRunTask($task)) {
95 1
                $waiting[] = $task;
96 1
                continue;
97
            }
98
99 1
            if ($task->stopsSiblings()) {
100
                $this->stopSiblingTasks($task);
101
            }
102
103 1
            $binary = $this->getBinary();
104 1
            $worker = $this->getWorker();
105 1
            $stdout = $this->getStdOut();
106 1
            $stderr = $this->getStdErr();
107
108 1
            if ($task instanceof Expires) {
109 1
                $this->timings[$task] = time();
110
            }
111
112 1
            $output = $this->getShell()->exec("{$binary} {$worker} %s {$stdout} {$stderr} & echo $!", [
113 1
                $this->getTaskString($task),
114
            ]);
115
116 1
            if ($task instanceof Process) {
117 1
                $task->setId($output[0]);
118
            }
119
120 1
            $this->running[] = $task;
121
        }
122
123 1
        foreach ($this->running as $task) {
124 1
            if (!$this->canRemoveTask($task)) {
125 1
                $running[] = $task;
126
            }
127
        }
128
129 1
        $this->waiting = $waiting;
130 1
        $this->running = $running;
131
132 1
        return !empty($waiting) || !empty($running);
133
    }
134
135
    /**
136
     * Stops sibling processes of a task.
137
     *
138
     * @param Task $task
139
     *
140
     * @return $this
141
     */
142
    private function stopSiblingTasks(Task $task)
143
    {
144
        $handler = $task->getHandler();
145
146
        foreach ($this->running as $task) {
147
            if ($task->getHandler() === $handler && $task instanceof Process) {
148
                $this->getShell()->exec("kill -9 %s", [
149
                    $task->getId(),
150
                ]);
151
            }
152
        }
153
154
        return $this;
155
    }
156
157
    /**
158
     * Checks whether a new task can be run.
159
     *
160
     * @param Task $task
161
     *
162
     * @return bool
163
     */
164 1
    private function canRunTask(Task $task)
165
    {
166 1
        if (!$task->canRunTask()) {
167
            return false;
168
        }
169
170 1
        if ($task->ignoresRules()) {
171
            return true;
172
        }
173
174
        $processes = array_filter($this->running, function (Task $task) {
175 1
            return $task instanceof Process;
176 1
        });
177
178 1
        if (count($processes) < 1) {
179 1
            return true;
180
        }
181
182 1
        $profile = $this->getProfileForProcesses($task, $processes);
183
184 1
        return $this->getRules()->canRunTask($task, $profile);
185
    }
186
187
    /**
188
     * Gets the load profile related to a task.
189
     *
190
     * @param Task $task
191
     * @param array $processes
192
     *
193
     * @return Profile
194
     */
195 1
    private function getProfileForProcesses(Task $task, array $processes)
196
    {
197 1
        $stats = $this->getStatsForProcesses($processes);
198
199
        $siblingProcesses = array_filter($processes, function (Task $next) use ($task) {
200 1
            return $next->getHandler() === $task->getHandler();
201 1
        });
202
203 1
        $siblingStats = $this->getStatsForProcesses($siblingProcesses);
204
205 1
        $profile = $this->newProfile();
206
207 1
        $profile->setProcesses($processes);
208 1
        $profile->setProcessorLoad(min(100, array_sum(array_column($stats, 1))));
209 1
        $profile->setMemoryLoad(min(100, array_sum(array_column($stats, 2))));
210
211 1
        $profile->setSiblingProcesses($siblingProcesses);
212 1
        $profile->setSiblingProcessorLoad(min(100, array_sum(array_column($siblingStats, 1))));
213 1
        $profile->setSiblingMemoryLoad(min(100, array_sum(array_column($siblingStats, 2))));
214
215 1
        return $profile;
216
    }
217
218
    /**
219
     * Gets processor and memory stats for a list of processes.
220
     *
221
     * @param Process[] $processes
222
     *
223
     * @return array
224
     */
225 1
    private function getStatsForProcesses(array $processes)
226
    {
227 1
        $stats = [];
228
229 1
        foreach ($processes as $process) {
230 1
            $output = $this->getShell()->exec("ps -o pid,%%cpu,%%mem,state,start -p %s | sed 1d", [
231 1
                $process->getId(),
232
            ]);
233
234 1
            if (count($output) < 1) {
235 1
                continue;
236
            }
237
238 1
            $last = $output[count($output) - 1];
239
240 1
            if (trim($last) === "") {
241
                continue;
242
            }
243
244 1
            $parts = preg_split("/\s+/", trim($last));
245
246 1
            $pid = intval($parts[0]);
247
248 1
            if ("{$pid}" !== $parts[0]) {
249
                continue;
250
            }
251
252 1
            $stats[] = $parts;
253
        }
254
255 1
        return $stats;
256
    }
257
258
    /**
259
     * Gets or creates a Shell instance.
260
     *
261
     * @return Shell
262
     */
263 2
    public function getShell()
264
    {
265 2
        if ($this->shell === null) {
266 2
            $this->shell = $this->newShell();
267
        }
268
269 2
        return $this->shell;
270
    }
271
272
    /**
273
     * @param Shell $shell
274
     *
275
     * @return $this
276
     */
277 1
    public function setShell(Shell $shell)
278
    {
279 1
        $this->shell = $shell;
280
281 1
        return $this;
282
    }
283
284
    /**
285
     * Creates a new Shell instance.
286
     *
287
     * @return Shell
288
     */
289 2
    private function newShell()
290
    {
291 2
        return new BashShell();
292
    }
293
294
    /**
295
     * Creates a new Profile instance.
296
     *
297
     * @return Profile
298
     */
299 1
    private function newProfile()
300
    {
301 1
        return new InMemoryProfile();
302
    }
303
304
    /**
305
     * Gets or creates a new Rules instance.
306
     *
307
     * @return Rules
308
     */
309 2
    public function getRules()
310
    {
311 2
        if ($this->rules === null) {
312 2
            $this->rules = $this->newRules();
313
        }
314
315 2
        return $this->rules;
316
    }
317
318
    /**
319
     * @param Rules $rules
320
     *
321
     * @return $this
322
     */
323 1
    public function setRules(Rules $rules)
324
    {
325 1
        $this->rules = $rules;
326
327 1
        return $this;
328
    }
329
330
    /**
331
     * Creates a new Rules instance.
332
     *
333
     * @return Rules
334
     */
335 2
    private function newRules()
336
    {
337 2
        return new InMemoryRules();
338
    }
339
340
    /**
341
     * @param string $binary
342
     *
343
     * @return $this
344
     */
345
    public function setBinary($binary)
346
    {
347
        $this->binary = $binary;
348
349
        return $this;
350
    }
351
352
    /**
353
     * Gets the path of the PHP runtime.
354
     *
355
     * @return string
356
     */
357 1
    public function getBinary()
358
    {
359 1
        if ($this->binary === null) {
360 1
            $this->binary = PHP_BINDIR . "/php";
361
        }
362
363 1
        return $this->binary;
364
    }
365
366
    /**
367
     * @param string $worker
368
     *
369
     * @return $this
370
     */
371
    public function setWorker($worker)
372
    {
373
        $this->worker = $worker;
374
375
        return $this;
376
    }
377
378
    /**
379
     * Gets the path of the worker script.
380
     *
381
     * @return string
382
     */
383 1
    public function getWorker()
384
    {
385 1
        if ($this->worker === null) {
386 1
            $this->worker = realpath(__DIR__ . "/../../bin/worker.php");
387
        }
388
389 1
        return $this->worker;
390
    }
391
392
    /**
393
     * Gets the path to write stdout to.
394
     *
395
     * @return string
396
     */
397 1
    private function getStdOut()
398
    {
399 1
        if ($this->getLogPath() !== null) {
400
            return ">> " . $this->getLogPath() . "/stdout.log";
401
        }
402
403 1
        return "> /dev/null";
404
    }
405
406
    /**
407
     * @return null|string
408
     */
409 2
    public function getLogPath()
410
    {
411 2
        return $this->logPath;
412
    }
413
414
    /**
415
     * @param string $logPath
416
     *
417
     * @return $this
418
     */
419 1
    public function setLogPath($logPath)
420
    {
421 1
        $this->logPath = $logPath;
422
423 1
        return $this;
424
    }
425
426
    /**
427
     * Gets the path to write stderr to.
428
     *
429
     * @return string
430
     */
431 1
    private function getStdErr()
432
    {
433 1
        if ($this->getLogPath() !== null) {
434
            return "2>> " . $this->getLogPath() . "/stderr.log";
435
        }
436
437 1
        return "2> /dev/null";
438
    }
439
440
    /**
441
     * Gets a string representation of a task, to pass to the worker script.
442
     *
443
     * @param Task $task
444
     *
445
     * @return string
446
     */
447 1
    private function getTaskString(Task $task)
448
    {
449 1
        return base64_encode(serialize($task));
450
    }
451
452
    /**
453
     * Checks whether a task can be removed from the list of running processes.
454
     *
455
     * @param Task $task
456
     *
457
     * @return bool
458
     */
459 1
    private function canRemoveTask(Task $task)
460
    {
461 1
        if (!$task instanceof Process) {
462
            return true;
463
        }
464
465 1
        if ($this->isTaskExpired($task) || $this->isTaskCancelled($task)) {
466
            $this->killTask($task);
467
            return true;
468
        }
469
470 1
        $processes = array_filter($this->running, function (Task $task) {
471 1
            return $task instanceof Process;
472 1
        });
473
474 1
        if (count($processes) < 1) {
475
            return true;
476
        }
477
478 1
        $found = false;
479 1
        $stats = $this->getStatsForProcesses($processes);
480
481 1
        foreach ($stats as $stat) {
482 1
            if ($stat[0] === $task->getId()) {
483 1
                $found = true;
484
            }
485
        }
486
487 1
        return !$found;
488
    }
489
490
    /**
491
     * Check if the given task is expired
492
     *
493
     * @param Task $task
494
     *
495
     * @return boolean
496
     */
497 1
    private function isTaskExpired(Task $task)
498
    {
499 1
        if ($task instanceof Expires) {
500 1
            $expiresIn = $task->getExpiresIn();
501 1
            $startedAt = $this->timings[$task];
502
503 1
            if ($expiresIn > 0 && (time() - $startedAt) >= $expiresIn) {
504
                return $task->shouldExpire($startedAt);
505
            }
506
        }
507
508 1
        return false;
509
    }
510
511
    /**
512
     * Check if the given task is cancelled.
513
     *
514
     * @param Task $task
515
     *
516
     * @return bool
517
     */
518 1
    private function isTaskCancelled(Task $task)
519
    {
520 1
        if ($task instanceof Cancellable) {
521
            return $task->isCancelled();
522
        }
523
524 1
        return false;
525
    }
526
527
    /**
528
     * Revoke any background processes attached to this task.
529
     *
530
     * @param Task $task
531
     *
532
     * @return bool
533
     */
534
    private function killTask(Task $task)
535
    {
536
        if ($task instanceof Process) {
537
            $this->getShell()->exec("kill -9 %s", [
538
                $task->getId(),
539
            ]);
540
541
            return true;
542
        }
543
544
        return false;
545
    }
546
547
    /**
548
     * @param Rule $rule
549
     *
550
     * @return $this
551
     */
552 1
    public function addRule(Rule $rule)
553
    {
554 1
        $this->getRules()->addRule($rule);
555
556 1
        return $this;
557
    }
558
559
    /**
560
     * @param Rule $rule
561
     *
562
     * @return $this
563
     */
564 1
    public function removeRule(Rule $rule)
565
    {
566 1
        $this->getRules()->removeRule($rule);
567
568 1
        return $this;
569
    }
570
571
    public function __destruct()
572
    {
573
        foreach ($this->running as $task) {
574
            $this->killTask($task);
575
        }
576
    }
577
}
578