Completed
Push — master ( 579d36...29411d )
by Christopher
02:35
created

ProcessManager::getWaiting()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 4
ccs 0
cts 2
cp 0
rs 10
cc 1
eloc 2
nc 1
nop 0
crap 2
1
<?php
2
3
namespace AsyncPHP\Doorman\Manager;
4
5
use AsyncPHP\Doorman\Cancellable;
6
use AsyncPHP\Doorman\Expires;
7
use AsyncPHP\Doorman\Manager;
8
use AsyncPHP\Doorman\Process;
9
use AsyncPHP\Doorman\Profile;
10
use AsyncPHP\Doorman\Profile\InMemoryProfile;
11
use AsyncPHP\Doorman\Rule;
12
use AsyncPHP\Doorman\Rules;
13
use AsyncPHP\Doorman\Rules\InMemoryRules;
14
use AsyncPHP\Doorman\Shell;
15
use AsyncPHP\Doorman\Shell\BashShell;
16
use AsyncPHP\Doorman\Task;
17
use SplObjectStorage;
18
19
final class ProcessManager implements Manager
20
{
21
    /**
22
     * @var Task[]
23
     */
24
    private $waiting = [];
25
26
    /**
27
     * @var Task[]
28
     */
29
    private $running = [];
30
31
    /**
32
     * @var null|SplObjectStorage
33
     */
34
    private $timings = null;
35
36
    /**
37
     * @var null|string
38
     */
39
    private $logPath;
40
41
    /**
42
     * @var null|Rules
43
     */
44
    private $rules;
45
46
    /**
47
     * @var null|Shell
48
     */
49
    protected $shell;
50
51
    /**
52
     * @var null|string
53
     */
54
    private $binary;
55
56
    /**
57
     * @var null|string
58
     */
59
    private $worker;
60
    
61
    /**
62
     * Get a list of the waiting tasks.
63
     * 
64
     * @return array
65
     */
66
    public function getWaiting()
67
    {
68
        return $this->waiting;
69
    }
70
    
71
    /**
72
     * Get a list of the running tasks.
73
     * 
74
     * @return array
75
     */
76
    public function getRunning()
77
    {
78
        return $this->running;
79
    }
80
81
    /**
82
     * @inheritdoc
83
     *
84
     * @param Task $task
85
     *
86
     * @return $this
87
     */
88 1
    public function addTask(Task $task)
89
    {
90 1
        $this->waiting[] = $task;
91
92 1
        return $this;
93
    }
94
95
    /**
96
     * @inheritdoc
97
     *
98
     * @return bool
99
     */
100 1
    public function tick()
101
    {
102 1
        if (!$this->timings instanceof SplObjectStorage) {
103 1
            $this->timings = new SplObjectStorage();
104
        }
105
106 1
        $waiting = [];
107 1
        $running = [];
108
109 1
        foreach ($this->waiting as $task) {
110 1
            if ($this->isTaskCancelled($task)) {
111
                continue;
112
            }
113
114 1
            if (!$this->canRunTask($task)) {
115 1
                $waiting[] = $task;
116 1
                continue;
117
            }
118
119 1
            if ($task->stopsSiblings()) {
120
                $this->stopSiblingTasks($task);
121
            }
122
123 1
            $binary = $this->getBinary();
124 1
            $worker = $this->getWorker();
125 1
            $stdout = $this->getStdOut();
126 1
            $stderr = $this->getStdErr();
127
128 1
            if ($task instanceof Expires) {
129 1
                $this->timings[$task] = time();
130
            }
131
132 1
            $output = $this->getShell()->exec("{$binary} {$worker} %s {$stdout} {$stderr} & echo $!", [
133 1
                $this->getTaskString($task),
134
            ]);
135
136 1
            if ($task instanceof Process) {
137 1
                $task->setId($output[0]);
138
            }
139
140 1
            $this->running[] = $task;
141
        }
142
143 1
        foreach ($this->running as $task) {
144 1
            if (!$this->canRemoveTask($task)) {
145 1
                $running[] = $task;
146
            }
147
        }
148
149 1
        $this->waiting = $waiting;
150 1
        $this->running = $running;
151
152 1
        return !empty($waiting) || !empty($running);
153
    }
154
155
    /**
156
     * Stops sibling processes of a task.
157
     *
158
     * @param Task $task
159
     *
160
     * @return $this
161
     */
162
    private function stopSiblingTasks(Task $task)
163
    {
164
        $handler = $task->getHandler();
165
166
        foreach ($this->running as $task) {
167
            if ($task->getHandler() === $handler && $task instanceof Process) {
168
                $this->getShell()->exec("kill -9 %s", [
169
                    $task->getId(),
170
                ]);
171
            }
172
        }
173
174
        return $this;
175
    }
176
177
    /**
178
     * Checks whether a new task can be run.
179
     *
180
     * @param Task $task
181
     *
182
     * @return bool
183
     */
184 1
    private function canRunTask(Task $task)
185
    {
186 1
        if (!$task->canRunTask()) {
187
            return false;
188
        }
189
190 1
        if ($task->ignoresRules()) {
191
            return true;
192
        }
193
194
        $processes = array_filter($this->running, function (Task $task) {
195 1
            return $task instanceof Process;
196 1
        });
197
198 1
        if (count($processes) < 1) {
199 1
            return true;
200
        }
201
202 1
        $profile = $this->getProfileForProcesses($task, $processes);
203
204 1
        return $this->getRules()->canRunTask($task, $profile);
205
    }
206
207
    /**
208
     * Gets the load profile related to a task.
209
     *
210
     * @param Task $task
211
     * @param array $processes
212
     *
213
     * @return Profile
214
     */
215 1
    private function getProfileForProcesses(Task $task, array $processes)
216
    {
217 1
        $stats = $this->getStatsForProcesses($processes);
218
219
        $siblingProcesses = array_filter($processes, function (Task $next) use ($task) {
220 1
            return $next->getHandler() === $task->getHandler();
221 1
        });
222
223 1
        $siblingStats = $this->getStatsForProcesses($siblingProcesses);
224
225 1
        $profile = $this->newProfile();
226
227 1
        $profile->setProcesses($processes);
228 1
        $profile->setProcessorLoad(min(100, array_sum(array_column($stats, 1))));
229 1
        $profile->setMemoryLoad(min(100, array_sum(array_column($stats, 2))));
230
231 1
        $profile->setSiblingProcesses($siblingProcesses);
232 1
        $profile->setSiblingProcessorLoad(min(100, array_sum(array_column($siblingStats, 1))));
233 1
        $profile->setSiblingMemoryLoad(min(100, array_sum(array_column($siblingStats, 2))));
234
235 1
        return $profile;
236
    }
237
238
    /**
239
     * Gets processor and memory stats for a list of processes.
240
     *
241
     * @param Process[] $processes
242
     *
243
     * @return array
244
     */
245 1
    private function getStatsForProcesses(array $processes)
246
    {
247 1
        $stats = [];
248
249 1
        foreach ($processes as $process) {
250 1
            $output = $this->getShell()->exec("ps -o pid,%%cpu,%%mem,state,start -p %s | sed 1d", [
251 1
                $process->getId(),
252
            ]);
253
254 1
            if (count($output) < 1) {
255 1
                continue;
256
            }
257
258 1
            $last = $output[count($output) - 1];
259
260 1
            if (trim($last) === "") {
261
                continue;
262
            }
263
264 1
            $parts = preg_split("/\s+/", trim($last));
265
266 1
            $pid = intval($parts[0]);
267
268 1
            if ("{$pid}" !== $parts[0]) {
269
                continue;
270
            }
271
272 1
            $stats[] = $parts;
273
        }
274
275 1
        return $stats;
276
    }
277
278
    /**
279
     * Gets or creates a Shell instance.
280
     *
281
     * @return Shell
282
     */
283 2
    public function getShell()
284
    {
285 2
        if ($this->shell === null) {
286 2
            $this->shell = $this->newShell();
287
        }
288
289 2
        return $this->shell;
290
    }
291
292
    /**
293
     * @param Shell $shell
294
     *
295
     * @return $this
296
     */
297 1
    public function setShell(Shell $shell)
298
    {
299 1
        $this->shell = $shell;
300
301 1
        return $this;
302
    }
303
304
    /**
305
     * Creates a new Shell instance.
306
     *
307
     * @return Shell
308
     */
309 2
    private function newShell()
310
    {
311 2
        return new BashShell();
312
    }
313
314
    /**
315
     * Creates a new Profile instance.
316
     *
317
     * @return Profile
318
     */
319 1
    private function newProfile()
320
    {
321 1
        return new InMemoryProfile();
322
    }
323
324
    /**
325
     * Gets or creates a new Rules instance.
326
     *
327
     * @return Rules
328
     */
329 2
    public function getRules()
330
    {
331 2
        if ($this->rules === null) {
332 2
            $this->rules = $this->newRules();
333
        }
334
335 2
        return $this->rules;
336
    }
337
338
    /**
339
     * @param Rules $rules
340
     *
341
     * @return $this
342
     */
343 1
    public function setRules(Rules $rules)
344
    {
345 1
        $this->rules = $rules;
346
347 1
        return $this;
348
    }
349
350
    /**
351
     * Creates a new Rules instance.
352
     *
353
     * @return Rules
354
     */
355 2
    private function newRules()
356
    {
357 2
        return new InMemoryRules();
358
    }
359
360
    /**
361
     * @param string $binary
362
     *
363
     * @return $this
364
     */
365
    public function setBinary($binary)
366
    {
367
        $this->binary = $binary;
368
369
        return $this;
370
    }
371
372
    /**
373
     * Gets the path of the PHP runtime.
374
     *
375
     * @return string
376
     */
377 1
    public function getBinary()
378
    {
379 1
        if ($this->binary === null) {
380 1
            $this->binary = PHP_BINDIR . "/php";
381
        }
382
383 1
        return $this->binary;
384
    }
385
386
    /**
387
     * @param string $worker
388
     *
389
     * @return $this
390
     */
391
    public function setWorker($worker)
392
    {
393
        $this->worker = $worker;
394
395
        return $this;
396
    }
397
398
    /**
399
     * Gets the path of the worker script.
400
     *
401
     * @return string
402
     */
403 1
    public function getWorker()
404
    {
405 1
        if ($this->worker === null) {
406 1
            $this->worker = realpath(__DIR__ . "/../../bin/worker.php");
407
        }
408
409 1
        return $this->worker;
410
    }
411
412
    /**
413
     * Gets the path to write stdout to.
414
     *
415
     * @return string
416
     */
417 1
    private function getStdOut()
418
    {
419 1
        if ($this->getLogPath() !== null) {
420
            return ">> " . $this->getLogPath() . "/stdout.log";
421
        }
422
423 1
        return "> /dev/null";
424
    }
425
426
    /**
427
     * @return null|string
428
     */
429 2
    public function getLogPath()
430
    {
431 2
        return $this->logPath;
432
    }
433
434
    /**
435
     * @param string $logPath
436
     *
437
     * @return $this
438
     */
439 1
    public function setLogPath($logPath)
440
    {
441 1
        $this->logPath = $logPath;
442
443 1
        return $this;
444
    }
445
446
    /**
447
     * Gets the path to write stderr to.
448
     *
449
     * @return string
450
     */
451 1
    private function getStdErr()
452
    {
453 1
        if ($this->getLogPath() !== null) {
454
            return "2>> " . $this->getLogPath() . "/stderr.log";
455
        }
456
457 1
        return "2> /dev/null";
458
    }
459
460
    /**
461
     * Gets a string representation of a task, to pass to the worker script.
462
     *
463
     * @param Task $task
464
     *
465
     * @return string
466
     */
467 1
    private function getTaskString(Task $task)
468
    {
469 1
        return base64_encode(serialize($task));
470
    }
471
472
    /**
473
     * Checks whether a task can be removed from the list of running processes.
474
     *
475
     * @param Task $task
476
     *
477
     * @return bool
478
     */
479 1
    private function canRemoveTask(Task $task)
480
    {
481 1
        if (!$task instanceof Process) {
482
            return true;
483
        }
484
485 1
        if ($this->isTaskExpired($task) || $this->isTaskCancelled($task)) {
486
            $this->killTask($task);
487
            return true;
488
        }
489
490 1
        $processes = array_filter($this->running, function (Task $task) {
491 1
            return $task instanceof Process;
492 1
        });
493
494 1
        if (count($processes) < 1) {
495
            return true;
496
        }
497
498 1
        $found = false;
499 1
        $stats = $this->getStatsForProcesses($processes);
500
501 1
        foreach ($stats as $stat) {
502 1
            if ($stat[0] === $task->getId()) {
503 1
                $found = true;
504
            }
505
        }
506
507 1
        return !$found;
508
    }
509
510
    /**
511
     * Check if the given task is expired
512
     *
513
     * @param Task $task
514
     *
515
     * @return boolean
516
     */
517 1
    private function isTaskExpired(Task $task)
518
    {
519 1
        if ($task instanceof Expires) {
520 1
            $expiresIn = $task->getExpiresIn();
521 1
            $startedAt = $this->timings[$task];
522
523 1
            if ($expiresIn > 0 && (time() - $startedAt) >= $expiresIn) {
524
                return $task->shouldExpire($startedAt);
525
            }
526
        }
527
528 1
        return false;
529
    }
530
531
    /**
532
     * Check if the given task is cancelled.
533
     *
534
     * @param Task $task
535
     *
536
     * @return bool
537
     */
538 1
    private function isTaskCancelled(Task $task)
539
    {
540 1
        if ($task instanceof Cancellable) {
541
            return $task->isCancelled();
542
        }
543
544 1
        return false;
545
    }
546
547
    /**
548
     * Revoke any background processes attached to this task.
549
     *
550
     * @param Task $task
551
     *
552
     * @return bool
553
     */
554
    private function killTask(Task $task)
555
    {
556
        if ($task instanceof Process) {
557
            $this->getShell()->exec("kill -9 %s", [
558
                $task->getId(),
559
            ]);
560
561
            return true;
562
        }
563
564
        return false;
565
    }
566
567
    /**
568
     * @param Rule $rule
569
     *
570
     * @return $this
571
     */
572 1
    public function addRule(Rule $rule)
573
    {
574 1
        $this->getRules()->addRule($rule);
575
576 1
        return $this;
577
    }
578
579
    /**
580
     * @param Rule $rule
581
     *
582
     * @return $this
583
     */
584 1
    public function removeRule(Rule $rule)
585
    {
586 1
        $this->getRules()->removeRule($rule);
587
588 1
        return $this;
589
    }
590
591
    public function __destruct()
592
    {
593
        foreach ($this->running as $task) {
594
            $this->killTask($task);
595
        }
596
    }
597
}
598