ParallelExec   A
last analyzed

Complexity

Total Complexity 22

Size/Duplication

Total Lines 176
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 4

Importance

Changes 0
Metric Value
wmc 22
lcom 1
cbo 4
dl 0
loc 176
rs 10
c 0
b 0
f 0

9 Methods

Rating   Name   Duplication   Size   Complexity  
A waitInterval() 0 5 1
A getPrinted() 0 4 1
A printed() 0 5 1
A process() 0 6 1
A timeout() 0 5 1
A idleTimeout() 0 5 1
A getCommand() 0 4 1
A progressIndicatorSteps() 0 4 1
C run() 0 56 14
1
<?php
2
3
namespace Robo\Task\Base;
4
5
use Robo\Contract\CommandInterface;
6
use Robo\Contract\PrintedInterface;
7
use Robo\Result;
8
use Robo\Task\BaseTask;
9
use Symfony\Component\Process\Exception\ProcessTimedOutException;
10
use Symfony\Component\Process\Process;
11
12
/**
 * Runs several shell commands concurrently and waits until all of them have finished.
 *
 * Each command added with process() is started without waiting for the previous one
 * to end (optionally staggered by waitInterval()); run() polls them until none is
 * left running and reports a combined result.
 *
 * ``` php
 * <?php
 * $this->taskParallelExec()
 *   ->process('php ~/demos/script.php hey')
 *   ->process('php ~/demos/script.php hoy')
 *   ->process('php ~/demos/script.php gou')
 *   ->run();
 * ?>
 * ```
 */
class ParallelExec extends BaseTask implements CommandInterface, PrintedInterface
{
    use \Robo\Common\CommandReceiver;

    /**
     * Processes queued by process(), in the order they were added.
     *
     * @var Process[]
     */
    protected $processes = [];

    /**
     * Maximum run time per process in seconds; null leaves it unset.
     *
     * @var null|int
     */
    protected $timeout = null;

    /**
     * Maximum time per process without output in seconds; null leaves it unset.
     *
     * @var null|int
     */
    protected $idleTimeout = null;

    /**
     * Seconds to wait between launching two consecutive processes.
     *
     * @var int
     */
    protected $waitInterval = 0;

    /**
     * Whether the output of each finished process is printed.
     *
     * @var bool
     */
    protected $isPrinted = false;

    /**
     * {@inheritdoc}
     */
    public function getPrinted()
    {
        return $this->isPrinted;
    }

    /**
     * Enables or disables printing the output of each process once it finishes.
     *
     * @param bool $isPrinted
     *
     * @return $this
     */
    public function printed($isPrinted = true)
    {
        $this->isPrinted = $isPrinted;
        return $this;
    }

    /**
     * Queues a command to be executed by run().
     *
     * @param string|\Robo\Contract\CommandInterface $command
     *
     * @return $this
     */
    public function process($command)
    {
        // The working directory is supplied explicitly (noted as needed for Symfony 4).
        // getcwd() returns false on failure; pass null in that case instead of a
        // boolean so that Process falls back to its own default.
        $workingDirectory = getcwd() ?: null;
        $this->processes[] = Process::fromShellCommandline($this->receiveCommand($command), $workingDirectory);
        return $this;
    }

    /**
     * Stops process if it runs longer than `$timeout` (seconds).
     *
     * @param int $timeout
     *
     * @return $this
     */
    public function timeout($timeout)
    {
        $this->timeout = $timeout;
        return $this;
    }

    /**
     * Stops process if it does not output for time longer than `$idleTimeout` (seconds).
     *
     * @param int $idleTimeout
     *
     * @return $this
     */
    public function idleTimeout($idleTimeout)
    {
        $this->idleTimeout = $idleTimeout;
        return $this;
    }

    /**
     * Parallel processing will wait `$waitInterval` seconds after launching each process and before
     * the next one.
     *
     * @param int $waitInterval
     *
     * @return $this
     */
    public function waitInterval($waitInterval)
    {
        $this->waitInterval = $waitInterval;
        return $this;
    }

    /**
     * {@inheritdoc}
     *
     * Returns the command lines of all queued processes joined with ' && '.
     */
    public function getCommand()
    {
        // Join the command line strings rather than the Process objects themselves:
        // imploding the objects would require them to be convertible to string.
        $commandLines = array_map(function (Process $process) {
            return $process->getCommandLine();
        }, $this->processes);

        return implode(' && ', $commandLines);
    }

    /**
     * One progress step per queued process.
     *
     * @return int
     */
    public function progressIndicatorSteps()
    {
        return count($this->processes);
    }

    /**
     * {@inheritdoc}
     *
     * Starts the queued processes (at most one per loop pass, spaced by the wait
     * interval), polls the running ones until all have ended, and returns a Result
     * whose exit code is the highest exit code seen, or non-zero if any process failed.
     */
    public function run()
    {
        $this->startProgressIndicator();
        $running = [];
        $queue = $this->processes;
        $nextTime = time();
        while (true) {
            // Launch the next queued process once the wait interval has elapsed.
            if (($nextTime <= time()) && !empty($queue)) {
                $process = array_shift($queue);
                $process->setIdleTimeout($this->idleTimeout);
                $process->setTimeout($this->timeout);
                $process->start();
                $this->printTaskInfo($process->getCommandLine());
                $running[] = $process;
                $nextTime = time() + $this->waitInterval;
            }
            foreach ($running as $k => $process) {
                try {
                    $process->checkTimeout();
                } catch (ProcessTimedOutException $e) {
                    // A timeout is reported as a warning; the process is then picked
                    // up as "not running" below like any other finished process.
                    $this->printTaskWarning("Process timed out for {command}", ['command' => $process->getCommandLine(), '_style' => ['command' => 'fg=white;bg=magenta']]);
                }
                if (!$process->isRunning()) {
                    $this->advanceProgressIndicator();
                    if ($this->isPrinted) {
                        $this->printTaskInfo("Output for {command}:\n\n{output}", ['command' => $process->getCommandLine(), 'output' => $process->getOutput(), '_style' => ['command' => 'fg=white;bg=magenta']]);
                        $errorOutput = $process->getErrorOutput();
                        if ($errorOutput) {
                            $this->printTaskError(rtrim($errorOutput));
                        }
                    }
                    unset($running[$k]);
                }
            }
            if (empty($running) && empty($queue)) {
                break;
            }
            // Short pause between polling passes.
            usleep(1000);
        }
        $this->stopProgressIndicator();

        $errorMessage = '';
        $exitCode = 0;
        foreach ($this->processes as $p) {
            $processExitCode = $p->getExitCode();
            if ($processExitCode === 0) {
                continue;
            }
            $errorMessage .= "'" . $p->getCommandLine() . "' exited with code " . $processExitCode . " \n";
            $exitCode = max($exitCode, (int) $processExitCode);
        }
        if ($errorMessage === '') {
            $this->printTaskSuccess('{process-count} processes finished running', ['process-count' => count($this->processes)]);
        } elseif ($exitCode === 0) {
            // A failed process whose exit code is negative or unavailable would
            // otherwise be hidden by max() and the task would report success.
            $exitCode = 1;
        }

        return new Result($this, $exitCode, $errorMessage, ['time' => $this->getExecutionTime()]);
    }
}
201