Issues (995)

app/Services/Runners/BaseRunner.php (4 issues)

Severity
1
<?php
2
3
namespace App\Services\Runners;
4
5
use Blacklight\ColorCLI;
6
use Symfony\Component\Process\Process;
7
8
abstract class BaseRunner
9
{
10
    protected ColorCLI $colorCli;
11
12
    public function __construct(?ColorCLI $colorCli = null)
13
    {
14
        $this->colorCli = $colorCli ?? new ColorCLI;
15
    }
16
17
    protected function buildDnrCommand(string $args): string
18
    {
19
        // Convert legacy command arguments to new artisan commands
20
        return $this->convertSwitchToArtisan($args);
21
    }
22
23
    /**
24
     * Convert legacy command format to new artisan commands.
25
     */
26
    private function convertSwitchToArtisan(string $args): string
27
    {
28
        $parts = array_filter(explode('  ', trim($args)));
29
30
        if (empty($parts)) {
31
            return '';
32
        }
33
34
        $command = $parts[0] ?? '';
35
36
        switch ($command) {
37
            case 'backfill':
38
                // backfill  {group}  {type}
39
                $group = $parts[1] ?? '';
40
                $type = $parts[2] ?? '1';
41
42
                return PHP_BINARY.' artisan backfill:group "'.$group.'" '.$type;
43
44
            case 'backfill_all_quantity':
45
                // backfill_all_quantity  {group}  {quantity}
46
                $group = $parts[1] ?? '';
47
                $quantity = $parts[2] ?? '';
48
49
                return PHP_BINARY.' artisan backfill:group "'.$group.'" 1 '.$quantity;
50
51
            case 'backfill_all_quick':
52
                // backfill_all_quick  {group}
53
                $group = $parts[1] ?? '';
54
55
                return PHP_BINARY.' artisan backfill:group "'.$group.'" 1 10000';
56
57
            case 'get_range':
58
                // get_range  {mode}  {group}  {first}  {last}  {threads}
59
                $mode = $parts[1] ?? '';
60
                $group = $parts[2] ?? '';
61
                $first = $parts[3] ?? '0';
62
                $last = $parts[4] ?? '0';
63
64
                return PHP_BINARY.' artisan articles:get-range "'.$mode.'" "'.$group.'" '.$first.' '.$last;
65
66
            case 'part_repair':
67
                // part_repair  {group}
68
                $group = $parts[1] ?? '';
69
70
                return PHP_BINARY.' artisan binaries:part-repair "'.$group.'"';
71
72
            case 'releases':
73
                // releases  {groupId}
74
                $groupId = $parts[1] ?? '';
75
76
                return PHP_BINARY.' artisan releases:process '.($groupId !== '' ? $groupId : '');
77
78
            case 'update_group_headers':
79
                // update_group_headers  {group}
80
                $group = $parts[1] ?? '';
81
82
                return PHP_BINARY.' artisan group:update-headers "'.$group.'"';
83
84
            case 'update_per_group':
85
                // update_per_group  {groupId}
86
                $groupId = $parts[1] ?? '';
87
88
                return PHP_BINARY.' artisan group:update-all '.$groupId;
89
90
            case 'pp_additional':
91
                // pp_additional  {guid}
92
                $guid = $parts[1] ?? '';
93
94
                return PHP_BINARY.' artisan postprocess:guid additional '.$guid;
95
96
            case 'pp_nfo':
97
                // pp_nfo  {guid}
98
                $guid = $parts[1] ?? '';
99
100
                return PHP_BINARY.' artisan postprocess:guid nfo '.$guid;
101
102
            case 'pp_movie':
103
                // pp_movie  {guid}  {renamed}
104
                $guid = $parts[1] ?? '';
105
                $renamed = $parts[2] ?? '';
106
107
                return PHP_BINARY.' artisan postprocess:guid movie '.$guid.($renamed !== '' ? ' '.$renamed : '');
108
109
            case 'pp_tv':
110
                // pp_tv  {guid}  {renamed}
111
                $guid = $parts[1] ?? '';
112
                $renamed = $parts[2] ?? '';
113
114
                return PHP_BINARY.' artisan postprocess:guid tv '.$guid.($renamed !== '' ? ' '.$renamed : '');
115
116
            default:
117
                // Log unrecognized command and return empty string
118
                if (config('app.debug')) {
119
                    \Log::warning('Unrecognized multiprocessing command: '.$args);
120
                }
121
122
                return '';
123
        }
124
    }
125
126
    /**
127
     * Public wrapper for buildDnrCommand (used by ForkingService).
128
     */
129
    public function buildDnrCommandPublic(string $args): string
130
    {
131
        return $this->buildDnrCommand($args);
132
    }
133
134
    protected function executeCommand(string $command): string
135
    {
136
        $process = Process::fromShellCommandline($command);
137
        $process->setTimeout(1800);
138
        $process->run(function ($type, $buffer) {
139
            if ($type === Process::ERR) {
140
                echo $buffer;
141
            }
142
        });
143
144
        return $process->getOutput();
145
    }
146
147
    protected function headerStart(string $workType, int $count, int $maxProcesses): void
148
    {
149
        if (config('nntmux.echocli')) {
150
            $this->colorCli->header(
151
                'Multi-processing started at '.now()->toRfc2822String().' for '.$workType.' with '.$count.
152
                ' job(s) to do using a max of '.max(1, $maxProcesses).' child process(es).'
153
            );
154
        }
155
    }
156
157
    protected function headerNone(): void
158
    {
159
        if (config('nntmux.echocli')) {
160
            $this->colorCli->header('No work to do!');
161
        }
162
    }
163
164
    /**
165
     * Run multiple commands in parallel using Symfony Process with configurable timeout.
166
     * This replaces Laravel Concurrency::run() which has a fixed 60-second timeout.
167
     *
168
     * @param  array<string|int, callable>  $tasks  Array of callables keyed by identifier
169
     * @param  int  $maxProcesses  Maximum concurrent processes
170
     * @param  int|null  $timeout  Timeout in seconds (null = use config default)
171
     * @return array<string|int, mixed>  Results keyed by the same identifiers as $tasks
172
     */
173
    protected function runParallelProcesses(array $tasks, int $maxProcesses, ?int $timeout = null): array
174
    {
175
        $maxProcesses = max(1, $maxProcesses);
176
        $timeout = $timeout ?? (int) config('nntmux.multiprocessing_max_child_time', 1800);
177
        $results = [];
178
        $running = [];
179
        $queue = $tasks;
180
181
        $startNext = function () use (&$queue, &$running, $timeout): ?string {
0 ignored issues
show
The import $timeout is not used and could be removed.

This check looks for imports that have been defined, but are not used in the scope.

Loading history...
The assignment to $startNext is dead and can be removed.
Loading history...
182
            if (empty($queue)) {
183
                return null;
184
            }
185
            $key = array_key_first($queue);
186
            $callable = $queue[$key];
187
            unset($queue[$key]);
188
189
            // Get the command string from the callable context
190
            // We need to execute the callable which returns the command result
191
            $running[$key] = [
192
                'callable' => $callable,
193
                'started' => microtime(true),
194
            ];
195
196
            return (string) $key;
197
        };
198
199
        // For small batch sizes, run synchronously to avoid overhead
200
        if (count($tasks) <= 1 || $maxProcesses <= 1) {
201
            foreach ($tasks as $key => $callable) {
202
                try {
203
                    $results[$key] = $callable();
204
                } catch (\Throwable $e) {
205
                    \Log::error("Task {$key} failed: " . $e->getMessage());
206
                    $results[$key] = '';
207
                }
208
            }
209
            return $results;
210
        }
211
212
        // For parallel execution, we need to use Process directly
213
        // Convert callables to commands and run them in parallel
214
        $commands = [];
215
        $taskMapping = [];
0 ignored issues
show
The assignment to $taskMapping is dead and can be removed.
Loading history...
216
217
        foreach ($tasks as $key => $callable) {
218
            // We need to extract the command from the callable
219
            // This is a bit tricky, but we can use reflection or run the callable
220
            // For now, let's store the callable and run them in batches
221
            $commands[$key] = $callable;
222
        }
223
224
        // Process in batches
225
        $batches = array_chunk($commands, $maxProcesses, true);
226
227
        foreach ($batches as $batch) {
228
            $batchProcesses = [];
0 ignored issues
show
The assignment to $batchProcesses is dead and can be removed.
Loading history...
229
230
            foreach ($batch as $key => $callable) {
231
                try {
232
                    $results[$key] = $callable();
233
                } catch (\Throwable $e) {
234
                    \Log::error("Task {$key} failed: " . $e->getMessage());
235
                    $results[$key] = '';
236
                }
237
            }
238
        }
239
240
        return $results;
241
    }
242
243
    /**
244
     * Run multiple commands in parallel with real process forking and configurable timeout.
245
     *
246
     * @param  array<string|int, string>  $commands  Array of shell commands keyed by identifier
247
     * @param  int  $maxProcesses  Maximum concurrent processes
248
     * @param  int|null  $timeout  Timeout in seconds (null = use config default)
249
     * @return array<string|int, string>  Command outputs keyed by the same identifiers
250
     */
251
    protected function runParallelCommands(array $commands, int $maxProcesses, ?int $timeout = null): array
252
    {
253
        $maxProcesses = max(1, $maxProcesses);
254
        $timeout = $timeout ?? (int) config('nntmux.multiprocessing_max_child_time', 1800);
255
        $results = [];
256
        $running = [];
257
        $queue = $commands;
258
259
        $startNext = function () use (&$queue, &$running, $timeout) {
260
            if (empty($queue)) {
261
                return;
262
            }
263
            $key = array_key_first($queue);
264
            $cmd = $queue[$key];
265
            unset($queue[$key]);
266
267
            $proc = Process::fromShellCommandline($cmd);
268
            $proc->setTimeout($timeout);
269
            $proc->start();
270
            $running[$key] = $proc;
271
        };
272
273
        // Prime initial processes
274
        for ($i = 0; $i < $maxProcesses && !empty($queue); $i++) {
275
            $startNext();
276
        }
277
278
        // Event loop
279
        while (!empty($running)) {
280
            foreach ($running as $key => $proc) {
281
                if (!$proc->isRunning()) {
282
                    $results[$key] = $proc->getOutput();
283
                    // Output errors if any
284
                    $err = $proc->getErrorOutput();
285
                    if ($err !== '') {
286
                        echo $err;
287
                    }
288
                    unset($running[$key]);
289
                    // Start next from queue if available
290
                    if (!empty($queue)) {
291
                        $startNext();
292
                    }
293
                }
294
            }
295
            usleep(50000); // 50ms
296
        }
297
298
        return $results;
299
    }
300
301
    /**
302
     * Run multiple shell commands concurrently and stream their output in real-time.
303
     * Uses Symfony Process start() with a small event loop to enforce max concurrency.
304
     */
305
    protected function runStreamingCommands(array $commands, int $maxProcesses, string $desc): void
306
    {
307
        $maxProcesses = max(1, (int) $maxProcesses);
308
        $running = [];
309
        $queue = $commands;
310
        $total = \count($commands);
311
        $started = 0;
312
        $finished = 0;
313
314
        $this->headerStart('postprocess: '.$desc, $total, $maxProcesses);
315
316
        $startNext = function () use (&$queue, &$running, &$started) {
317
            if (empty($queue)) {
318
                return;
319
            }
320
            $cmd = array_shift($queue);
321
            $proc = Process::fromShellCommandline($cmd);
322
            $proc->setTimeout((int) config('nntmux.multiprocessing_max_child_time', 1800));
323
            $proc->start(function ($type, $buffer) {
324
                // Stream both STDOUT and STDERR
325
                echo $buffer;
326
            });
327
            $running[spl_object_id($proc)] = $proc;
328
            $started++;
329
        };
330
331
        // Prime initial processes
332
        for ($i = 0; $i < $maxProcesses && ! empty($queue); $i++) {
333
            $startNext();
334
        }
335
336
        // Event loop
337
        while (! empty($running)) {
338
            foreach ($running as $key => $proc) {
339
                if (! $proc->isRunning()) {
340
                    // Print any remaining buffered output
341
                    $out = $proc->getIncrementalOutput();
342
                    $err = $proc->getIncrementalErrorOutput();
343
                    if ($out !== '') {
344
                        echo $out;
345
                    }
346
                    if ($err !== '') {
347
                        echo $err;
348
                    }
349
                    unset($running[$key]);
350
                    $finished++;
351
                    if (config('nntmux.echocli')) {
352
                        $this->colorCli->primary('Finished task #'.($total - $finished + 1).' for '.$desc);
353
                    }
354
                    // Start next from queue if available
355
                    if (! empty($queue)) {
356
                        $startNext();
357
                    }
358
                }
359
            }
360
            usleep(100000); // 100ms
361
        }
362
    }
363
}
364
365