Completed
Push — master ( 5a501b...3982a6 )
by Gaetano
08:04
created

ProcessManager::runParallel()   B

Complexity

Conditions 9
Paths 8

Size

Total Lines 40

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 90

Importance

Changes 0
Metric Value
dl 0
loc 40
ccs 0
cts 31
cp 0
rs 7.7244
c 0
b 0
f 0
cc 9
nc 8
nop 4
crap 90
1
<?php
2
3
namespace Kaliop\eZMigrationBundle\Core\Helper;
4
5
use Symfony\Component\Process\Process;
6
7
/**
 * This ProcessManager is a simple wrapper to enable parallel processing using Symfony Process component.
 * Original source code from https://github.com/jagandecapri/symfony-parallel-process/blob/master/src/ProcessManager.php (MIT lic.)
 */
class ProcessManager
{
    /**
     * Runs all the given processes, keeping at most $maxParallel of them running at the same time.
     * Blocks until every process has been started and is no longer running.
     *
     * @param Process[] $processes
     * @param int $maxParallel max number of processes running concurrently. The absolute value is used, and it is
     *                         clamped between 1 and the number of processes
     * @param int $poll microseconds to sleep between two checks of the running processes
     * @param Callable $callback takes 3 args: $type, $buffer, $process
     * @throws \InvalidArgumentException when $processes is empty or contains anything which is not a Process
     */
    public function runParallel(array $processes, $maxParallel, $poll = 1000, $callback = null)
    {
        $this->validateProcesses($processes);

        // do not modify the array received as argument: work on a local copy
        $processesQueue = $processes;

        // Clamp the concurrency limit to [1, number of processes]. The lower bound matters: with a limit of 0 no
        // process would ever be started, the queue would never drain and the polling loop below would never end.
        $maxParallel = (int)max(1, min(abs($maxParallel), count($processesQueue)));

        // take the first batch of processes off the queue and start them all
        /** @var Process[] $currentProcesses */
        $currentProcesses = array_splice($processesQueue, 0, $maxParallel);
        foreach ($currentProcesses as $process) {
            $this->startProcess($process, $callback);
        }

        do {
            // wait for the given time before checking again
            usleep($poll);

            // A by-value foreach works on a snapshot of the array, so removing and appending elements while
            // iterating is safe; processes appended here get checked at the next poll.
            foreach ($currentProcesses as $index => $process) {
                if ($process->isRunning()) {
                    continue;
                }

                // the process is done: free its slot...
                unset($currentProcesses[$index]);

                // ...and immediately hand the slot over to the next queued process, if any
                if (count($processesQueue) > 0) {
                    $nextProcess = array_shift($processesQueue);
                    $this->startProcess($nextProcess, $callback);
                    $currentProcesses[] = $nextProcess;
                }
            }
            // continue looping while there are processes being executed or waiting for execution
        } while (count($processesQueue) > 0 || count($currentProcesses) > 0);
    }

    /**
     * Starts a single process, forwarding its output to $callback (when one is given) along with the process itself.
     *
     * @param Process $process
     * @param Callable|null $callback takes 3 args: $type, $buffer, $process
     */
    private function startProcess($process, $callback)
    {
        $process->start(function ($type, $buffer) use ($callback, $process) {
            if ($callback) {
                $callback($type, $buffer, $process);
            }
        });
    }

    /**
     * Checks that there is at least one element to run and that every element is a Symfony Process.
     *
     * @param Process[] $processes
     * @throws \InvalidArgumentException
     */
    protected function validateProcesses(array $processes)
    {
        if (empty($processes)) {
            throw new \InvalidArgumentException('Can not run in parallel 0 commands');
        }
        foreach ($processes as $process) {
            if (!($process instanceof Process)) {
                throw new \InvalidArgumentException('Process in array need to be instance of Symfony Process');
            }
        }
    }
}
75