Passed
Push — master ( 60a4ae...29fcc8 )
by
unknown
46:42 queued 14:26
created

MultiProcessRunConsole::getSubscribedSignals()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 1
eloc 3
c 1
b 0
f 0
nc 1
nop 0
dl 0
loc 5
rs 10
1
<?php
2
3
/**
4
 * Copyright © 2016-present Spryker Systems GmbH. All rights reserved.
5
 * Use of this software requires acceptance of the Evaluation License Agreement. See LICENSE file.
6
 */
7
8
namespace Spryker\Zed\Console\Communication\Plugin\Console;
9
10
use Spryker\Zed\Kernel\Communication\Console\Console;
11
use Symfony\Component\Console\Command\SignalableCommandInterface;
12
use Symfony\Component\Console\Input\ArrayInput;
13
use Symfony\Component\Console\Input\InputArgument;
14
use Symfony\Component\Console\Input\InputInterface;
15
use Symfony\Component\Console\Input\InputOption;
16
use Symfony\Component\Console\Output\OutputInterface;
17
use Symfony\Component\Process\Process;
18
use const SIGQUIT;
19
use const SIGTERM;
20
21
/**
22
 * @method \Spryker\Zed\Console\Business\ConsoleFacadeInterface getFacade()
23
 * @method \Spryker\Zed\Console\Communication\ConsoleCommunicationFactory getFactory()
24
 */
25
class MultiProcessRunConsole extends Console implements SignalableCommandInterface
26
{
27
    /**
     * Name under which this command is registered.
     *
     * @var string
     */
    protected const COMMAND_NAME = 'multi-process:run';

    /**
     * Command description including a usage example.
     * The example uses the shortcuts declared in configure(): -s (separate process),
     * -m (minimal child duration) and -t (child timeout).
     *
     * @var string
     */
    protected const DESCRIPTION = 'Runs child console commands in a loop. Example of usage: vendor/bin/console multi-process:run "queue:worker:start publish" 120 -s -m 2 -t 30';

    /**
     * Name of the required argument holding the child command line.
     *
     * @var string
     */
    protected const ARGUMENT_CONSOLE_CHILD = 'child';

    /**
     * Name of the optional argument holding the total loop duration in seconds.
     *
     * @var string
     */
    protected const ARGUMENT_CONSOLE_TOTAL_TIMEOUT = 'total_timeout';

    /**
     * Default value of the total timeout argument, in seconds.
     *
     * @var int
     */
    protected const DEFAULT_TOTAL_TIMEOUT_IN_SECONDS = 600;

    /**
     * Default value of the child process timeout option, in seconds.
     *
     * @var int
     */
    protected const DEFAULT_CHILD_PROCESS_TIMEOUT = 60;

    /**
     * Default value of the minimal child duration option, in seconds (0 disables the wait).
     *
     * @var int
     */
    protected const DEFAULT_CHILD_MINIMUM_DURATION_TIME_SECONDS = 0;

    /**
     * Name of the option that switches child execution to a separate process.
     *
     * @var string
     */
    protected const OPTION_SEPARATE_THREAD = 'separate_thread';

    /**
     * Name of the option holding the minimal child execution time.
     *
     * @var string
     */
    protected const OPTION_CHILD_MINIMUM_DURATION_TIME_SECONDS = 'child_min_duration';

    /**
     * Name of the option holding the child process timeout.
     *
     * @var string
     */
    protected const OPTION_CHILD_PROCESS_TIMEOUT = 'child_timeout';

    /**
     * Prefix (console binary plus trailing space) prepended to the child command line
     * when it is started as a separate process.
     *
     * @var string
     */
    protected const RUNNER_COMMAND = APPLICATION_VENDOR_DIR . '/bin/console ';

    /**
     * Set by handleSignal(); checked by the run loop before each new child is started.
     *
     * @var bool
     */
    protected bool $shouldStop = false;
86
87
    /**
     * Declares the command name, description, arguments and options,
     * then delegates to the parent configuration.
     *
     * @return void
     */
    protected function configure(): void
    {
        $totalTimeoutHelp = sprintf(
            'Duration Total Time in seconds while child command can start again, default is %s. Set 0 to run process endless',
            static::DEFAULT_TOTAL_TIMEOUT_IN_SECONDS,
        );

        $this->setName(static::COMMAND_NAME);
        $this->setDescription(static::DESCRIPTION);

        // Positional arguments: the child command line, then the optional total duration.
        $this->addArgument(
            static::ARGUMENT_CONSOLE_CHILD,
            InputArgument::REQUIRED,
            'The full command you want to run as a child.',
        );
        $this->addArgument(
            static::ARGUMENT_CONSOLE_TOTAL_TIMEOUT,
            InputArgument::OPTIONAL,
            $totalTimeoutHelp,
            static::DEFAULT_TOTAL_TIMEOUT_IN_SECONDS,
        );

        // Options, each with a one-letter shortcut.
        $this->addOption(
            static::OPTION_SEPARATE_THREAD,
            's',
            InputOption::VALUE_OPTIONAL,
            'Run the command in the separate process to eliminate process cache collisions.',
            false,
        );
        $this->addOption(
            static::OPTION_CHILD_PROCESS_TIMEOUT,
            't',
            InputOption::VALUE_OPTIONAL,
            'Set maximum time of execution for child sub process, in seconds.',
            static::DEFAULT_CHILD_PROCESS_TIMEOUT,
        );
        $this->addOption(
            static::OPTION_CHILD_MINIMUM_DURATION_TIME_SECONDS,
            'm',
            InputOption::VALUE_OPTIONAL,
            'Minimal child sub process execution time, in seconds. Skip or set to 0 to disable this check.',
            static::DEFAULT_CHILD_MINIMUM_DURATION_TIME_SECONDS,
        );

        parent::configure();
    }
102
103
    /**
     * Stores the input/output on the instance and starts the child run loop.
     *
     * @param \Symfony\Component\Console\Input\InputInterface $input
     * @param \Symfony\Component\Console\Output\OutputInterface $output
     *
     * @return int Exit code produced by the run loop.
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $this->input = $input;
        $this->output = $output;

        // abs() is typed int|float; cast back so the int-typed runSubProcess() parameter
        // always receives an integer. A negative value is treated as its absolute value.
        $maxDuration = (int)abs((int)$input->getArgument(static::ARGUMENT_CONSOLE_TOTAL_TIMEOUT));

        return $this->runSubProcess($maxDuration);
    }
117
118
    /**
     * Starts the child command repeatedly until the time budget is used up,
     * a stop was requested, or a child run returns CODE_ERROR.
     *
     * The estimate tested before an iteration is the one computed at the start of the
     * previous iteration (elapsed time at that moment plus the duration of the child
     * that ran before it).
     *
     * @param int $maxDuration Time budget in seconds; 0 means no limit.
     *
     * @return int CODE_ERROR as soon as a child run returns it, CODE_SUCCESS otherwise.
     */
    protected function runSubProcess(int $maxDuration): int
    {
        $hasTimeLimit = $maxDuration > 0;
        $loopStartedAt = microtime(true);
        $previousChildDuration = 0;
        $estimatedElapsed = 0;

        while (true) {
            $budgetExhausted = $hasTimeLimit && $estimatedElapsed >= $maxDuration;
            if ($budgetExhausted || $this->shouldStop) {
                break;
            }

            $elapsed = microtime(true) - $loopStartedAt;
            $estimatedElapsed = $elapsed + $previousChildDuration;
            if ($this->output->isVerbose()) {
                $modeLabel = $hasTimeLimit ? '' : 'endless ';
                $this->info(sprintf('<fg=green>Starting %schild process. Timer: %s </>', $modeLabel, $elapsed));
            }

            $childStartedAt = microtime(true);
            if ($this->runProcess() === static::CODE_ERROR) {
                return static::CODE_ERROR;
            }

            $previousChildDuration = microtime(true) - $childStartedAt;
            if ($this->output->isVerbose()) {
                $this->info('<fg=magenta>Child process executed. Timer: ' . ($elapsed + $previousChildDuration) . '</>');
            }

            // Pads short child runs up to the configured minimal duration.
            $this->waitTillMinExecutionTime($previousChildDuration);
        }

        return static::CODE_SUCCESS;
    }
150
151
    /**
     * Builds a shell process that runs the console binary with the given child command line.
     *
     * @param string $command Child command line appended to RUNNER_COMMAND.
     *
     * @return \Symfony\Component\Process\Process Process with APPLICATION_ROOT_DIR as working directory.
     */
    protected function createProcess(string $command): Process
    {
        $shellCommandLine = static::RUNNER_COMMAND . $command;
        $timeout = $this->getSubprocessTimeout();

        return Process::fromShellCommandline($shellCommandLine, APPLICATION_ROOT_DIR, null, null, $timeout);
    }
168
169
    /**
     * Sleeps for the remainder of the minimal child execution time when the child
     * finished faster than that minimum.
     *
     * @param float $duration Duration of the last child run, in seconds.
     *
     * @return void
     */
    protected function waitTillMinExecutionTime(float $duration): void
    {
        $minExecutionTime = abs((int)$this->input->getOption(static::OPTION_CHILD_MINIMUM_DURATION_TIME_SECONDS));
        if ($duration >= $minExecutionTime) {
            return;
        }

        if ($this->output->isVerbose()) {
            // The style tag is closed explicitly, like every other styled message of this command.
            $this->info(
                sprintf('<fg=yellow>Process waiting as not reached minimal execution time %ss (duration: %ss)</>', $minExecutionTime, $duration),
            );
        }

        // usleep() expects microseconds.
        usleep((int)(($minExecutionTime - $duration) * 1e6));
    }
187
188
    /**
     * Runs the child command once, either in a separate process or inside the current one,
     * depending on the separate-thread option.
     *
     * @return int Result of the selected run method.
     */
    protected function runProcess(): int
    {
        $separateThreadOption = $this->input->getOption(static::OPTION_SEPARATE_THREAD);

        // The option is declared VALUE_OPTIONAL with default `false`. Symfony reports `null` when it is
        // passed without a value (a bare `-s`), which filter_var() alone would evaluate to false.
        $separateThread = $separateThreadOption === null
            || filter_var($separateThreadOption, FILTER_VALIDATE_BOOLEAN);

        if ($separateThread) {
            return $this->runCommandAsSeparateThread();
        }

        return $this->runCommandDirectly();
    }
200
201
    /**
     * Runs the child command line in a separate process and forwards its output
     * to the console in verbose mode.
     *
     * The child's exit code is returned instead of an unconditional success, so the run loop's
     * CODE_ERROR check works the same way as for a directly executed child.
     *
     * @return int Exit code of the child process.
     */
    protected function runCommandAsSeparateThread(): int
    {
        $command = $this->input->getArgument(static::ARGUMENT_CONSOLE_CHILD);

        return $this->createProcess($command)->run(function ($direction, $data): void {
            if ($this->output->isVerbose()) {
                $this->info($data);
            }
        });
    }
215
216
    /**
     * Runs the child command inside the current PHP process.
     *
     * The child argument is split on spaces: the first token is the command name, the remaining
     * tokens are forwarded to the child command as its arguments and options. Quoting inside the
     * child command line is not interpreted.
     *
     * @return int Exit code returned by the child command.
     */
    protected function runCommandDirectly(): int
    {
        $child = (string)$this->input->getArgument(static::ARGUMENT_CONSOLE_CHILD);

        // Drop only the empty tokens produced by repeated spaces; array_filter() without a callback
        // would also discard a literal "0" argument.
        $childArguments = array_values(array_filter(
            explode(' ', $child),
            static fn (string $token): bool => $token !== '',
        ));
        $childCommandName = array_shift($childArguments);
        /** @var \Spryker\Zed\Kernel\Communication\Console\Console $childConsoleCommand */
        $childConsoleCommand = $this->getApplication()->find($childCommandName);
        $childConsoleCommandInput = $this->prepareChildConsoleCommandInput($childConsoleCommand, $childArguments);

        return $childConsoleCommand->run($childConsoleCommandInput, $this->output);
    }

    /**
     * Converts raw child tokens into an ArrayInput for the given command.
     *
     * Tokens starting with "-" are treated as options and are kept only when the command knows them;
     * an option value is read only from the part after "=" (e.g. `--name=value`, `-n=value`), a token
     * without "=" is passed as a flag (`true`). All other tokens are assigned, in order, to the
     * command's own arguments; arguments without a token fall back to their default.
     *
     * @param \Spryker\Zed\Kernel\Communication\Console\Console $childConsoleCommand
     * @param array<string> $childArguments
     *
     * @return \Symfony\Component\Console\Input\ArrayInput
     */
    protected function prepareChildConsoleCommandInput(Console $childConsoleCommand, array $childArguments = []): ArrayInput
    {
        $definition = $childConsoleCommand->getDefinition();
        $positionalValues = [];
        $options = [];

        foreach ($childArguments as $childArgument) {
            if (!str_starts_with($childArgument, '-')) {
                $positionalValues[] = $childArgument;

                continue;
            }

            // Limit 2 keeps any further "=" as part of the option value.
            $optionParts = explode('=', $childArgument, 2);
            $optionToken = $optionParts[0];
            // hasOption()/hasShortcut() expect the bare name, without leading dashes.
            $optionName = ltrim($optionToken, '-');

            $isKnownOption = str_starts_with($optionToken, '--')
                ? $definition->hasOption($optionName)
                : $definition->hasShortcut($optionName);

            if ($isKnownOption) {
                // ArrayInput recognises options by their leading dashes, so the original token is the key.
                $options[$optionToken] = $optionParts[1] ?? true;
            }
        }

        $arguments = [];
        $position = 0;
        // The command's own (native) definition is used for positional mapping: once a command has been
        // run, its merged definition additionally carries the application's "command" argument in first
        // position, which would shift every value on later loop iterations.
        foreach ($childConsoleCommand->getNativeDefinition()->getArguments() as $name => $argument) {
            $arguments[$name] = $positionalValues[$position] ?? $argument->getDefault();
            $position++;
        }

        return new ArrayInput(array_merge($arguments, $options));
    }
272
273
    /**
     * Reads the child process timeout option.
     *
     * @return float|null Timeout as float, or null when the option value is empty or zero.
     */
    protected function getSubprocessTimeout(): ?float
    {
        $configuredTimeout = $this->input->getOption(static::OPTION_CHILD_PROCESS_TIMEOUT);

        return $configuredTimeout ? (float)$configuredTimeout : null;
    }
285
286
    /**
     * Lists the signals this command wants to handle, so that workers can be stopped
     * by a signal and still finish cleanly.
     *
     * @return list<int>
     */
    public function getSubscribedSignals(): array
    {
        $stopSignals = [SIGTERM, SIGQUIT];

        return $stopSignals;
    }
298
299
    /**
     * Marks the run loop for a graceful stop when a subscribed signal arrives.
     *
     * @param int $signal Number of the received signal.
     *
     * @return int|false Always false, so that command execution continues and the run loop
     *                   can end on its own once the current child has finished.
     */
    public function handleSignal(int $signal): int|false
    {
        $this->shouldStop = true;

        // Guarded because a signal may arrive before execute() has stored the output.
        if (isset($this->output) && $this->output->isVerbose()) {
            $this->info(sprintf('<fg=magenta>The %s signal was caught.</>', $signal));
        }

        // Returning an integer tells Symfony Console to exit immediately with that code, which would
        // kill the loop mid-run and make the $shouldStop flag pointless; false continues execution.
        return false;
    }
313
}
314