Passed
Push — master ( 1e32c6...392926 )
by Darko
11:49
created

BaseRunner::buildDnrCommandPublic()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
1
<?php
2
3
namespace App\Services\Runners;
4
5
use Blacklight\ColorCLI;
6
use Symfony\Component\Process\Process;
7
8
/**
 * Shared plumbing for multiprocessing runners: translates legacy
 * double-space-delimited work descriptors into artisan command lines and
 * executes shell commands through Symfony Process, either one at a time or
 * concurrently with a bounded number of child processes.
 */
abstract class BaseRunner
{
    /** Console writer used for the progress headers and per-task status lines. */
    protected ColorCLI $colorCli;

    /**
     * @param  ColorCLI|null  $colorCli  Console writer to use; a new ColorCLI is created when omitted.
     */
    public function __construct(?ColorCLI $colorCli = null)
    {
        $this->colorCli = $colorCli ?? new ColorCLI;
    }

    /**
     * Build the shell command for a legacy work descriptor.
     *
     * @param  string  $args  Legacy descriptor, e.g. "backfill  {group}  {type}" (fields separated by two spaces)
     * @return string  The artisan command line, or '' when the descriptor is empty or not recognised
     */
    protected function buildDnrCommand(string $args): string
    {
        return $this->convertSwitchToArtisan($args);
    }

    /**
     * Convert legacy command format to new artisan commands.
     *
     * The descriptor is split on two consecutive spaces; the first field selects
     * the command and the remaining fields are positional arguments.
     *
     * @param  string  $args  Legacy descriptor
     * @return string  The artisan command line, or '' when nothing matches
     */
    private function convertSwitchToArtisan(string $args): string
    {
        // Drop only truly empty fields. A bare array_filter() would also discard
        // the literal "0" (falsy in PHP), silently losing arguments such as a
        // guid of "0" or a type of "0". Keys are intentionally preserved so a
        // skipped empty field still falls back to its positional default below.
        $parts = array_filter(
            explode('  ', trim($args)),
            static fn (string $part): bool => $part !== ''
        );

        if ($parts === []) {
            return '';
        }

        $command = $parts[0] ?? '';
        $artisan = PHP_BINARY.' artisan ';

        // NOTE(review): group/mode values are placed inside double quotes without
        // shell escaping, and guid/id values are appended unquoted. Confirm that
        // callers only ever pass trusted values.
        switch ($command) {
            case 'backfill':
                // backfill  {group}  {type}
                return $artisan.'backfill:group "'.($parts[1] ?? '').'" '.($parts[2] ?? '1');

            case 'backfill_all_quantity':
                // backfill_all_quantity  {group}  {quantity}
                return $artisan.'backfill:group "'.($parts[1] ?? '').'" 1 '.($parts[2] ?? '');

            case 'backfill_all_quick':
                // backfill_all_quick  {group}
                return $artisan.'backfill:group "'.($parts[1] ?? '').'" 1 10000';

            case 'get_range':
                // get_range  {mode}  {group}  {first}  {last}  {threads}
                // The trailing {threads} field is not forwarded.
                $mode = $parts[1] ?? '';
                $group = $parts[2] ?? '';
                $first = $parts[3] ?? '0';
                $last = $parts[4] ?? '0';

                return $artisan.'articles:get-range "'.$mode.'" "'.$group.'" '.$first.' '.$last;

            case 'part_repair':
                // part_repair  {group}
                return $artisan.'binaries:part-repair "'.($parts[1] ?? '').'"';

            case 'releases':
                // releases  {groupId}
                // The trailing space is kept when no group id is given.
                return $artisan.'releases:process '.($parts[1] ?? '');

            case 'update_group_headers':
                // update_group_headers  {group}
                return $artisan.'group:update-headers "'.($parts[1] ?? '').'"';

            case 'update_per_group':
                // update_per_group  {groupId}
                return $artisan.'group:update-all '.($parts[1] ?? '');

            case 'pp_additional':
                // pp_additional  {guid}
                return $artisan.'postprocess:guid additional '.($parts[1] ?? '');

            case 'pp_nfo':
                // pp_nfo  {guid}
                return $artisan.'postprocess:guid nfo '.($parts[1] ?? '');

            case 'pp_movie':
                // pp_movie  {guid}  {renamed}
                $guid = $parts[1] ?? '';
                $renamed = $parts[2] ?? '';

                return $artisan.'postprocess:guid movie '.$guid.($renamed !== '' ? ' '.$renamed : '');

            case 'pp_tv':
                // pp_tv  {guid}  {renamed}
                $guid = $parts[1] ?? '';
                $renamed = $parts[2] ?? '';

                return $artisan.'postprocess:guid tv '.$guid.($renamed !== '' ? ' '.$renamed : '');

            default:
                // Unrecognised descriptors are only logged when app.debug is on.
                if (config('app.debug')) {
                    \Log::warning('Unrecognized multiprocessing command: '.$args);
                }

                return '';
        }
    }

    /**
     * Public wrapper for buildDnrCommand (used by ForkingService).
     *
     * @param  string  $args  Legacy descriptor
     * @return string  The artisan command line, or '' when nothing matches
     */
    public function buildDnrCommandPublic(string $args): string
    {
        return $this->buildDnrCommand($args);
    }

    /**
     * Run a single shell command to completion with a 1800-second timeout.
     *
     * STDERR chunks are echoed as they arrive; STDOUT is collected and returned.
     *
     * @param  string  $command  Shell command line
     * @return string  Everything the command wrote to STDOUT
     */
    protected function executeCommand(string $command): string
    {
        $process = Process::fromShellCommandline($command);
        $process->setTimeout(1800);
        $process->run(static function ($type, $buffer) {
            if ($type === Process::ERR) {
                echo $buffer;
            }
        });

        return $process->getOutput();
    }

    /**
     * Print the "multi-processing started" banner when nntmux.echocli is enabled.
     *
     * @param  string  $workType  Label for the kind of work being started
     * @param  int  $count  Number of jobs queued
     * @param  int  $maxProcesses  Requested child-process limit (shown as at least 1)
     */
    protected function headerStart(string $workType, int $count, int $maxProcesses): void
    {
        if (config('nntmux.echocli')) {
            $this->colorCli->header(
                'Multi-processing started at '.now()->toRfc2822String().' for '.$workType.' with '.$count.
                ' job(s) to do using a max of '.max(1, $maxProcesses).' child process(es).'
            );
        }
    }

    /**
     * Print the "No work to do!" banner when nntmux.echocli is enabled.
     */
    protected function headerNone(): void
    {
        if (config('nntmux.echocli')) {
            $this->colorCli->header('No work to do!');
        }
    }

    /**
     * Run a set of callables and collect their return values.
     *
     * Despite the name, the callables are invoked one after another in the
     * current process; nothing here forks or spawns children. $maxProcesses and
     * $timeout are accepted for signature compatibility but are not used.
     * Use runParallelCommands() when real concurrency is required.
     *
     * A task that throws is logged and its result is recorded as ''.
     *
     * @param  array<string|int, callable>  $tasks  Array of callables keyed by identifier
     * @param  int  $maxProcesses  Unused; kept for backward compatibility
     * @param  int|null  $timeout  Unused; kept for backward compatibility
     * @return array<string|int, mixed>  Results keyed by the same identifiers as $tasks
     */
    protected function runParallelProcesses(array $tasks, int $maxProcesses, ?int $timeout = null): array
    {
        $results = [];

        foreach ($tasks as $key => $task) {
            try {
                $results[$key] = $task();
            } catch (\Throwable $e) {
                \Log::error("Task {$key} failed: ".$e->getMessage());
                $results[$key] = '';
            }
        }

        return $results;
    }

    /**
     * Run multiple shell commands concurrently and collect their STDOUT.
     *
     * At most $maxProcesses children run at once; as each one exits the next
     * queued command is started. STDERR of each finished child is echoed.
     * Results are stored in completion order, keyed like $commands.
     *
     * @param  array<string|int, string>  $commands  Array of shell commands keyed by identifier
     * @param  int  $maxProcesses  Maximum concurrent processes (values below 1 are treated as 1)
     * @param  int|null  $timeout  Timeout in seconds (null = nntmux.multiprocessing_max_child_time, default 1800)
     * @return array<string|int, string>  Command outputs keyed by the same identifiers
     */
    protected function runParallelCommands(array $commands, int $maxProcesses, ?int $timeout = null): array
    {
        $maxProcesses = max(1, $maxProcesses);
        $timeout = $timeout ?? (int) config('nntmux.multiprocessing_max_child_time', 1800);
        $results = [];
        $running = [];
        $queue = $commands;

        // Pops the next queued command, starts it asynchronously and tracks it.
        $startNext = static function () use (&$queue, &$running, $timeout): void {
            if (empty($queue)) {
                return;
            }
            $key = array_key_first($queue);
            $cmd = $queue[$key];
            unset($queue[$key]);

            $proc = Process::fromShellCommandline($cmd);
            $proc->setTimeout($timeout);
            $proc->start();
            $running[$key] = $proc;
        };

        // Prime initial processes
        for ($i = 0; $i < $maxProcesses && ! empty($queue); $i++) {
            $startNext();
        }

        // Event loop. foreach iterates over a snapshot of $running, so entries
        // removed or added below take effect on the next pass of the while loop.
        while (! empty($running)) {
            foreach ($running as $key => $proc) {
                // Asynchronously started processes only honour their timeout
                // when checkTimeout() is polled; it stops the child and throws.
                try {
                    $proc->checkTimeout();
                } catch (\Symfony\Component\Process\Exception\ProcessTimedOutException $e) {
                    \Log::warning("Command {$key} timed out: ".$e->getMessage());
                }

                if ($proc->isRunning()) {
                    continue;
                }

                $results[$key] = $proc->getOutput();

                $err = $proc->getErrorOutput();
                if ($err !== '') {
                    echo $err;
                }

                unset($running[$key]);

                if (! empty($queue)) {
                    $startNext();
                }
            }
            usleep(50000); // 50ms
        }

        return $results;
    }

    /**
     * Run multiple shell commands concurrently and stream their output in real-time.
     * Uses Symfony Process start() with a small event loop to enforce max concurrency.
     *
     * Each child uses nntmux.multiprocessing_max_child_time (default 1800) as its
     * timeout. A "Finished task" line is printed per completed child when
     * nntmux.echocli is enabled.
     *
     * @param  array<string|int, string>  $commands  Shell commands to run (keys are ignored)
     * @param  int  $maxProcesses  Maximum concurrent processes (values below 1 are treated as 1)
     * @param  string  $desc  Label used in the start banner and per-task status lines
     */
    protected function runStreamingCommands(array $commands, int $maxProcesses, string $desc): void
    {
        $maxProcesses = max(1, $maxProcesses);
        $timeout = (int) config('nntmux.multiprocessing_max_child_time', 1800);
        $running = [];
        $queue = $commands;
        $total = \count($commands);
        $finished = 0;

        $this->headerStart('postprocess: '.$desc, $total, $maxProcesses);

        // Pops the next queued command and starts it with a streaming callback.
        $startNext = static function () use (&$queue, &$running, $timeout): void {
            if (empty($queue)) {
                return;
            }
            $cmd = array_shift($queue);
            $proc = Process::fromShellCommandline($cmd);
            $proc->setTimeout($timeout);
            $proc->start(static function ($type, $buffer) {
                // Stream both STDOUT and STDERR
                echo $buffer;
            });
            $running[spl_object_id($proc)] = $proc;
        };

        // Prime initial processes
        for ($i = 0; $i < $maxProcesses && ! empty($queue); $i++) {
            $startNext();
        }

        // Event loop
        while (! empty($running)) {
            foreach ($running as $key => $proc) {
                // Timeouts of asynchronously started processes are only enforced
                // when checkTimeout() is polled; it stops the child and throws.
                try {
                    $proc->checkTimeout();
                } catch (\Symfony\Component\Process\Exception\ProcessTimedOutException $e) {
                    \Log::warning('Streaming command for '.$desc.' timed out: '.$e->getMessage());
                }

                if ($proc->isRunning()) {
                    continue;
                }

                // Every chunk has already been echoed by the start() callback
                // (isRunning() drains the pipes through it). The incremental
                // output getters would return the whole buffer again here, so
                // nothing further is printed.
                unset($running[$key]);
                $finished++;

                if (config('nntmux.echocli')) {
                    $this->colorCli->primary('Finished task #'.($total - $finished + 1).' for '.$desc);
                }

                if (! empty($queue)) {
                    $startNext();
                }
            }
            usleep(100000); // 100ms
        }
    }
}
364
365