Issues (985)

app/Services/Runners/PostProcessRunner.php (1 issue)

Severity
1
<?php
2
3
namespace App\Services\Runners;
4
5
use App\Models\Settings;
6
use Illuminate\Support\Facades\Concurrency;
7
use Illuminate\Support\Facades\DB;
8
use Illuminate\Support\Facades\Log;
9
10
class PostProcessRunner extends BaseRunner
11
{
12
    /**
     * Run one `postprocess:guid` artisan command per queued GUID bucket.
     *
     * When `nntmux.stream_fork_output` is enabled the commands are handed to
     * runStreamingCommands(); otherwise they are executed in batches through
     * the Concurrency facade and each command's output is echoed once its
     * batch has completed. A failing batch is logged and reported, and the
     * remaining batches still run.
     *
     * @param  array  $releases  Rows exposing an `id` property; only its first character is used as the bucket.
     * @param  int  $maxProcesses  Batch size for concurrent execution (values below 1 are treated as 1 when batching).
     * @param  string  $type  Post-processing type passed to `postprocess:guid`.
     * @param  string  $desc  Human-readable label used in headers and progress messages.
     */
    private function runPostProcess(array $releases, int $maxProcesses, string $type, string $desc): void
    {
        if (empty($releases)) {
            $this->headerNone();

            return;
        }

        // Build every command exactly once so the streaming and batched paths cannot drift apart.
        $commands = [];
        foreach ($releases as $release) {
            // id may already be a single GUID bucket char; if not, take first char defensively
            $char = isset($release->id) ? substr((string) $release->id, 0, 1) : '';
            // Use postprocess:guid command which accepts the GUID character
            $commands[] = PHP_BINARY.' artisan postprocess:guid '.$type.' '.$char;
        }

        // If streaming is enabled, run commands with real-time output
        if ((bool) config('nntmux.stream_fork_output', false) === true) {
            $this->runStreamingCommands($commands, $maxProcesses, $desc);

            return;
        }

        $this->headerStart('postprocess: '.$desc, count($commands), $maxProcesses);

        // Process in batches using Laravel's native Concurrency facade
        foreach (array_chunk($commands, max(1, $maxProcesses)) as $batchIndex => $batch) {
            $tasks = [];
            foreach ($batch as $idx => $command) {
                // Keep each closure on its own line, one per task, exactly as the facade receives them.
                $tasks[$idx] = fn () => $this->executeCommand($command);
            }

            try {
                foreach (Concurrency::run($tasks) as $output) {
                    echo $output;
                    $this->colorCli->primary('Finished task for '.$desc);
                }
            } catch (\Throwable $e) {
                // Best effort: report the failed batch and carry on with the next one.
                Log::error('Postprocess batch failed: '.$e->getMessage());
                $this->colorCli->error('Batch '.($batchIndex + 1).' failed: '.$e->getMessage());
            }
        }
    }
62
63
    /**
     * Queue "additional" post-processing for up to 16 GUID buckets.
     *
     * Selects the distinct first characters of `leftguid` for releases with
     * `passwordstatus = -1` and `haspreview = -1` whose category has
     * `disablepreview = 0`, optionally bounded by the configured size limits,
     * and passes them to runPostProcess() with the `postthreads` setting as
     * the process limit.
     */
    public function processAdditional(): void
    {
        // Read each limit once; an empty setting falls back to the default (1 and 100 respectively).
        $minSetting = Settings::settingValue('minsizetopostprocess');
        $minSize = $minSetting !== '' ? (int) $minSetting : 1;
        // The minimum is scaled by 1048576 (2^20) before being compared with r.size.
        $ppAddMinSize = $minSize > 0 ? 'AND r.size > '.($minSize * 1048576) : '';

        $maxSetting = Settings::settingValue('maxsizetopostprocess');
        $maxSize = $maxSetting !== '' ? (int) $maxSetting : 100;
        // The maximum is scaled by 1073741824 (2^30) before being compared with r.size.
        $ppAddMaxSize = $maxSize > 0 ? 'AND r.size < '.($maxSize * 1073741824) : '';

        // Only integer-cast values are concatenated into the statement below.
        $sql = '
            SELECT DISTINCT LEFT(r.leftguid, 1) AS id
            FROM releases r
            LEFT JOIN categories c ON c.id = r.categories_id
            WHERE r.passwordstatus = -1
            AND r.haspreview = -1
            AND c.disablepreview = 0
            '.$ppAddMaxSize.' '.$ppAddMinSize.'
            LIMIT 16';
        $queue = DB::select($sql);

        $maxProcesses = (int) Settings::settingValue('postthreads');
        $this->runPostProcess($queue, $maxProcesses, 'additional', 'additional postprocessing');
    }
84
85
    /**
     * Queue NFO post-processing for up to 16 GUID buckets.
     *
     * Does nothing (beyond printing the "none" header) unless the `lookupnfo`
     * setting equals 1 and at least one release matches the NFO filter
     * returned by \Blacklight\Nfo::NfoQueryString().
     */
    public function processNfo(): void
    {
        $nfoLookupEnabled = (int) Settings::settingValue('lookupnfo') === 1;
        if (! $nfoLookupEnabled) {
            $this->headerNone();

            return;
        }

        $nfoFilter = \Blacklight\Nfo::NfoQueryString();

        // Cheap LIMIT 1 probe before running the DISTINCT bucket query.
        $probeRows = DB::select('SELECT r.id FROM releases r WHERE 1=1 '.$nfoFilter.' LIMIT 1');
        if ($probeRows === []) {
            $this->headerNone();

            return;
        }

        $buckets = DB::select('
            SELECT DISTINCT LEFT(r.leftguid, 1) AS id
            FROM releases r
            WHERE 1=1 '.$nfoFilter.'
            LIMIT 16');

        $threadLimit = (int) Settings::settingValue('nfothreads');
        $this->runPostProcess($buckets, $threadLimit, 'nfo', 'nfo postprocessing');
    }
112
113
    /**
     * Queue movie post-processing for up to 16 GUID buckets.
     *
     * Considers releases in categories 2000-2999 that have no `imdbid` yet.
     * Disabled when the `lookupimdb` setting is 0 or lower; a value of 2
     * restricts the work to renamed releases.
     *
     * @param  bool  $renamedOnly  When true, only releases with `isrenamed = 1` are considered.
     */
    public function processMovies(bool $renamedOnly): void
    {
        // Read the setting once and reuse it for both the gate and the filter.
        $lookupMode = (int) Settings::settingValue('lookupimdb');
        if ($lookupMode <= 0) {
            $this->headerNone();

            return;
        }

        $condLookup = ($lookupMode === 2 ? 'AND isrenamed = 1' : '');
        $condRenamedOnly = ($renamedOnly ? 'AND isrenamed = 1' : '');

        // Cheap LIMIT 1 probe before running the DISTINCT bucket query.
        $checkSql = '
            SELECT id
            FROM releases
            WHERE categories_id BETWEEN 2000 AND 2999
            AND imdbid IS NULL
            '.$condLookup.' '.$condRenamedOnly.'
            LIMIT 1';
        if (count(DB::select($checkSql)) === 0) {
            $this->headerNone();

            return;
        }

        // Every returned row carries the same constant `renamed` column: 2 for renamed-only runs, otherwise 1.
        $renamedFlag = ($renamedOnly ? 2 : 1);
        $sql = '
            SELECT DISTINCT LEFT(leftguid, 1) AS id, '.$renamedFlag.' AS renamed
            FROM releases
            WHERE categories_id BETWEEN 2000 AND 2999
            AND imdbid IS NULL
            '.$condLookup.' '.$condRenamedOnly.'
            LIMIT 16';
        $queue = DB::select($sql);

        $maxProcesses = (int) Settings::settingValue('postthreadsnon');
        $this->runPostProcess($queue, $maxProcesses, 'movie', 'movies postprocessing');
    }
150
151
    /**
     * Queue pipelined TV post-processing for up to 16 GUID buckets.
     *
     * Considers releases in categories 5000-5999 (excluding 5070) larger than
     * 1048576 bytes that have `videos_id = 0` and a `tv_episodes_id` between
     * -3 and 0. Disabled when the `lookuptv` setting is 0 or lower; a value of
     * 2 restricts the work to renamed releases.
     *
     * @param  bool  $renamedOnly  When true, only releases with `isrenamed = 1` are considered.
     */
    public function processTv(bool $renamedOnly): void
    {
        // hasTvWork() covers both the lookuptv gate and the cheap LIMIT 1 probe,
        // so the eligibility rules live in a single place.
        if (! $this->hasTvWork($renamedOnly)) {
            $this->headerNone();

            return;
        }

        $condLookup = ((int) Settings::settingValue('lookuptv') === 2 ? 'AND isrenamed = 1' : '');
        $condRenamedOnly = ($renamedOnly ? 'AND isrenamed = 1' : '');

        // Every returned row carries the same constant `renamed` column: 2 for renamed-only runs, otherwise 1.
        $renamedFlag = ($renamedOnly ? 2 : 1);
        $sql = '
            SELECT DISTINCT LEFT(leftguid, 1) AS id, '.$renamedFlag.' AS renamed
            FROM releases
            WHERE categories_id BETWEEN 5000 AND 5999
            AND categories_id != 5070
            AND videos_id = 0
            AND tv_episodes_id BETWEEN -3 AND 0
            AND size > 1048576
            '.$condLookup.' '.$condRenamedOnly.'
            LIMIT 16';
        $queue = DB::select($sql);

        $maxProcesses = (int) Settings::settingValue('postthreadsnon');

        // Use pipelined TV processing for better efficiency
        $this->runPostProcessTvPipeline($queue, $maxProcesses, 'tv postprocessing (pipelined)', $renamedOnly);
    }
196
197
    /**
     * Run pipelined TV post-processing across multiple GUID buckets in parallel.
     * Each parallel process runs the full provider pipeline sequentially.
     *
     * One `postprocess:tv-pipeline <bucket> [<renamed>] --mode=pipeline` artisan
     * command is built per row. When `nntmux.stream_fork_output` is enabled the
     * commands are handed to runStreamingCommands(); otherwise they run in
     * batches through the Concurrency facade. A failing batch is logged and
     * reported, and the remaining batches still run.
     *
     * @param  array  $releases  Rows exposing `id` (its first character is the bucket) and optionally `renamed`.
     * @param  int  $maxProcesses  Batch size for concurrent execution (values below 1 are treated as 1 when batching).
     * @param  string  $desc  Human-readable label used in headers and progress messages.
     * @param  bool  $renamedOnly  Not read by this method: the renamed flag is taken from each row's
     *                             `renamed` property instead. Kept so existing call sites stay valid.
     */
    private function runPostProcessTvPipeline(array $releases, int $maxProcesses, string $desc, bool $renamedOnly): void
    {
        if (empty($releases)) {
            $this->headerNone();

            return;
        }

        // Build every command exactly once so the streaming and batched paths cannot drift apart.
        $commands = [];
        foreach ($releases as $release) {
            $char = isset($release->id) ? substr((string) $release->id, 0, 1) : '';
            $renamed = $release->renamed ?? '';
            // Use the pipelined TV command for each GUID bucket; the renamed argument is omitted when falsy.
            $commands[] = PHP_BINARY.' artisan postprocess:tv-pipeline '.$char.($renamed ? ' '.$renamed : '').' --mode=pipeline';
        }

        // If streaming is enabled, run commands with real-time output
        if ((bool) config('nntmux.stream_fork_output', false) === true) {
            $this->runStreamingCommands($commands, $maxProcesses, $desc);

            return;
        }

        $this->headerStart('postprocess: '.$desc, count($commands), $maxProcesses);

        // Process in batches using Laravel's native Concurrency facade
        foreach (array_chunk($commands, max(1, $maxProcesses)) as $batchIndex => $batch) {
            $tasks = [];
            foreach ($batch as $idx => $command) {
                // Keep each closure on its own line, one per task, exactly as the facade receives them.
                $tasks[$idx] = fn () => $this->executeCommand($command);
            }

            try {
                foreach (Concurrency::run($tasks) as $output) {
                    echo $output;
                    $this->colorCli->primary('Finished task for '.$desc);
                }
            } catch (\Throwable $e) {
                // Best effort: report the failed batch and carry on with the next one.
                Log::error('TV pipeline batch failed: '.$e->getMessage());
                $this->colorCli->error('Batch '.($batchIndex + 1).' failed: '.$e->getMessage());
            }
        }
    }
252
253
    /**
     * Lightweight check to determine if there is any TV work to process.
     *
     * Returns false when the `lookuptv` setting is 0 or lower. Otherwise runs
     * a LIMIT 1 probe for releases in categories 5000-5999 (excluding 5070)
     * larger than 1048576 bytes with `videos_id = 0` and a `tv_episodes_id`
     * between -3 and 0.
     *
     * @param  bool  $renamedOnly  When true, only releases with `isrenamed = 1` are considered.
     * @return bool True when at least one matching release exists.
     */
    public function hasTvWork(bool $renamedOnly): bool
    {
        // Read the setting once and reuse it for both the gate and the filter.
        $lookupMode = (int) Settings::settingValue('lookuptv');
        if ($lookupMode <= 0) {
            return false;
        }

        // A lookup mode of 2 restricts the probe to renamed releases, as does $renamedOnly.
        $condLookup = ($lookupMode === 2 ? 'AND isrenamed = 1' : '');
        $condRenamedOnly = ($renamedOnly ? 'AND isrenamed = 1' : '');

        $checkSql = '
            SELECT id
            FROM releases
            WHERE categories_id BETWEEN 5000 AND 5999
            AND categories_id != 5070
            AND videos_id = 0
            AND size > 1048576
            AND tv_episodes_id BETWEEN -3 AND 0
            '.$condLookup.' '.$condRenamedOnly.'
            LIMIT 1';

        return count(DB::select($checkSql)) > 0;
    }
278
279
    /**
     * Queue anime post-processing for up to 16 GUID buckets.
     *
     * Considers releases in category 5070 that have no `anidbid` yet. Prints
     * the "none" header and returns when the `lookupanidb` setting is 0 or
     * lower, or when no such release exists.
     */
    public function processAnime(): void
    {
        $lookupDisabled = (int) Settings::settingValue('lookupanidb') <= 0;

        // Short-circuit keeps the probe query from running when the lookup is disabled.
        $nothingToDo = $lookupDisabled || DB::select('
            SELECT id
            FROM releases
            WHERE categories_id = 5070
            AND anidbid IS NULL
            LIMIT 1') === [];

        if ($nothingToDo) {
            $this->headerNone();

            return;
        }

        $buckets = DB::select('
            SELECT DISTINCT LEFT(leftguid, 1) AS id
            FROM releases
            WHERE categories_id = 5070
            AND anidbid IS NULL
            LIMIT 16');

        $threadLimit = (int) Settings::settingValue('postthreadsnon');
        $this->runPostProcess($buckets, $threadLimit, 'anime', 'anime postprocessing');
    }
310
311
    /**
     * Queue book post-processing for up to 16 GUID buckets.
     *
     * Considers releases in categories 7000-7999 that have no `bookinfo_id`
     * yet. Prints the "none" header and returns when the `lookupbooks`
     * setting is 0 or lower, or when no such release exists.
     */
    public function processBooks(): void
    {
        $lookupDisabled = (int) Settings::settingValue('lookupbooks') <= 0;

        // Short-circuit keeps the probe query from running when the lookup is disabled.
        $nothingToDo = $lookupDisabled || DB::select('
            SELECT id
            FROM releases
            WHERE categories_id BETWEEN 7000 AND 7999
            AND bookinfo_id IS NULL
            LIMIT 1') === [];

        if ($nothingToDo) {
            $this->headerNone();

            return;
        }

        $buckets = DB::select('
            SELECT DISTINCT LEFT(leftguid, 1) AS id
            FROM releases
            WHERE categories_id BETWEEN 7000 AND 7999
            AND bookinfo_id IS NULL
            LIMIT 16');

        $threadLimit = (int) Settings::settingValue('postthreadsnon');
        $this->runPostProcess($buckets, $threadLimit, 'books', 'books postprocessing');
    }
342
}
343
344