Completed
Push — master ( 5da224...d5db9d )
by Greg
02:47
created

ParallelExec::waitInterval()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 5
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 3
nc 1
nop 1
1
<?php
2
namespace Robo\Task\Base;
3
4
use Robo\Contract\CommandInterface;
5
use Robo\Contract\PrintedInterface;
6
use Robo\Result;
7
use Robo\Task\BaseTask;
8
use Symfony\Component\Process\Exception\ProcessTimedOutException;
9
use Symfony\Component\Process\Process;
10
11
/**
12
 * Class ParallelExecTask
13
 *
14
 * ``` php
15
 * <?php
16
 * $this->taskParallelExec()
17
 *   ->process('php ~/demos/script.php hey')
18
 *   ->process('php ~/demos/script.php hoy')
19
 *   ->process('php ~/demos/script.php gou')
20
 *   ->run();
21
 * ?>
22
 * ```
23
 */
24
class ParallelExec extends BaseTask implements CommandInterface, PrintedInterface
25
{
26
    use \Robo\Common\CommandReceiver;
27
28
    /**
29
     * @var Process[]
30
     */
31
    protected $processes = [];
32
33
    /**
34
     * @var null|int
35
     */
36
    protected $timeout = null;
37
38
    /**
39
     * @var null|int
40
     */
41
    protected $idleTimeout = null;
42
43
    /**
44
     * @var null|int
45
     */
46
    protected $waitInterval = 0;
47
48
    /**
49
     * @var bool
50
     */
51
    protected $isPrinted = false;
52
53
    /**
54
     * {@inheritdoc}
55
     */
56
    public function getPrinted()
57
    {
58
        return $this->isPrinted;
59
    }
60
61
    /**
62
     * @param bool $isPrinted
63
     *
64
     * @return $this
65
     */
66
    public function printed($isPrinted = true)
67
    {
68
        $this->isPrinted = $isPrinted;
69
        return $this;
70
    }
71
72
    /**
73
     * @param string|\Robo\Contract\CommandInterface $command
74
     *
75
     * @return $this
76
     */
77
    public function process($command)
78
    {
79
        $this->processes[] = new Process($this->receiveCommand($command));
80
        return $this;
81
    }
82
83
    /**
84
     * Stops process if it runs longer then `$timeout` (seconds).
85
     *
86
     * @param int $timeout
87
     *
88
     * @return $this
89
     */
90
    public function timeout($timeout)
91
    {
92
        $this->timeout = $timeout;
93
        return $this;
94
    }
95
96
    /**
97
     * Stops process if it does not output for time longer then `$timeout` (seconds).
98
     *
99
     * @param int $idleTimeout
100
     *
101
     * @return $this
102
     */
103
    public function idleTimeout($idleTimeout)
104
    {
105
        $this->idleTimeout = $idleTimeout;
106
        return $this;
107
    }
108
109
    /**
110
     * Parallel processing will wait `$waitInterval` seconds after launching each process and before
111
     * the next one.
112
     *
113
     * @param int $waitInterval
114
     *
115
     * @return $this
116
     */
117
    public function waitInterval($waitInterval)
118
    {
119
        $this->waitInterval = $waitInterval;
120
        return $this;
121
    }
122
123
    /**
124
     * {@inheritdoc}
125
     */
126
    public function getCommand()
127
    {
128
        return implode(' && ', $this->processes);
129
    }
130
131
    /**
132
     * @return int
133
     */
134
    public function progressIndicatorSteps()
135
    {
136
        return count($this->processes);
137
    }
138
139
    /**
140
     * {@inheritdoc}
141
     */
142
    public function run()
143
    {
144
        $this->startProgressIndicator();
145
        $running = [];
146
        $queue = $this->processes;
147
        $nextTime = time();
148
        while (true) {
149
            if (($nextTime <= time()) && !empty($queue)) {
150
                $process = array_shift($queue);
151
                $process->setIdleTimeout($this->idleTimeout);
152
                $process->setTimeout($this->timeout);
153
                $process->start();
154
                $this->printTaskInfo($process->getCommandLine());
155
                $running[] = $process;
156
                $nextTime = time() + $this->waitInterval;
157
            }
158
            foreach ($running as $k => $process) {
159
                try {
160
                    $process->checkTimeout();
161
                } catch (ProcessTimedOutException $e) {
162
                    $this->printTaskWarning("Process timed out for {command}", ['command' => $process->getCommandLine(), '_style' => ['command' => 'fg=white;bg=magenta']]);
163
                }
164
                if (!$process->isRunning()) {
165
                    $this->advanceProgressIndicator();
166
                    if ($this->isPrinted) {
167
                        $this->printTaskInfo("Output for {command}:\n\n{output}", ['command' => $process->getCommandLine(), 'output' => $process->getOutput(), '_style' => ['command' => 'fg=white;bg=magenta']]);
168
                        $errorOutput = $process->getErrorOutput();
169
                        if ($errorOutput) {
170
                            $this->printTaskError(rtrim($errorOutput));
171
                        }
172
                    }
173
                    unset($running[$k]);
174
                }
175
            }
176
            if (empty($running) && empty($queue)) {
177
                break;
178
            }
179
            usleep(1000);
180
        }
181
        $this->stopProgressIndicator();
182
183
        $errorMessage = '';
184
        $exitCode = 0;
185
        foreach ($this->processes as $p) {
186
            if ($p->getExitCode() === 0) {
187
                continue;
188
            }
189
            $errorMessage .= "'" . $p->getCommandLine() . "' exited with code ". $p->getExitCode()." \n";
190
            $exitCode = max($exitCode, $p->getExitCode());
191
        }
192
        if (!$errorMessage) {
193
            $this->printTaskSuccess('{process-count} processes finished running', ['process-count' => count($this->processes)]);
194
        }
195
196
        return new Result($this, $exitCode, $errorMessage, ['time' => $this->getExecutionTime()]);
197
    }
198
}
199