Passed
Pull Request — master (#1628)
by Darko
10:58 queued 04:02
created

Forking::releases()   B

Complexity

Conditions 6
Paths 14

Size

Total Lines 41
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 26
dl 0
loc 41
rs 8.8817
c 0
b 0
f 0
cc 6
nc 14
nop 0
1
<?php
2
3
namespace Blacklight\libraries;
4
5
use App\Models\Settings;
6
use App\Models\UsenetGroup;
7
use Blacklight\ColorCLI;
8
use Blacklight\Nfo;
9
use Blacklight\NZB;
10
use Blacklight\processing\PostProcess;
11
use Illuminate\Support\Carbon;
12
use Illuminate\Support\Facades\DB;
13
use Illuminate\Support\Facades\Log;
14
use Opis\Closure\SerializableClosure;
15
use Spatie\Async\Output\SerializableException;
16
use Spatie\Async\Pool;
17
use Symfony\Component\Process\Process;
18
19
/**
20
 * Class Forking.
21
 *
22
 * This forks various scripts.
23
 *
24
 * For example, you get all the ID's of the active groups in the groups table, you then iterate over them and spawn
25
 * processes of misc/update_binaries.php passing the group ID's.
26
 */
27
class Forking
28
{
29
    public ColorCLI $colorCli;
30
31
    /**
32
     * @var int The type of output
33
     */
34
    protected int $outputType;
35
36
    /**
37
     * Path to do not run folder.
38
     */
39
    private string $dnr_path;
40
41
    /**
42
     * Work to work on.
43
     */
44
    private array $work = [];
45
46
    /**
47
     * How much work do we have to do?
48
     */
49
    public int $_workCount = 0;
50
51
    /**
52
     * The type of work we want to work on.
53
     */
54
    private string $workType = '';
55
56
    /**
57
     * List of passed in options for the current work type.
58
     */
59
    private array $workTypeOptions = [];
60
61
    /**
62
     * Max amount of child processes to do work at a time.
63
     */
64
    private int $maxProcesses = 1;
65
66
    /**
67
     * Group used for safe backfill.
68
     */
69
    private string $safeBackfillGroup = '';
70
71
    protected int $maxSize;
72
73
    protected int $minSize;
74
75
    protected int $maxRetries;
76
77
    protected int $dummy;
78
79
    private bool $processAdditional = false; // Should we process additional?
80
81
    private bool $processNFO = false; // Should we process NFOs?
82
83
    private bool $processMovies = false; // Should we process Movies?
84
85
    private bool $processTV = false; // Should we process TV?
86
87
    /**
     * Prepare the forking helper.
     *
     * Removes the serializable-closure security provider, creates the CLI colour
     * writer, builds the command prefix for the do-not-run switch script and loads
     * the NFO size and retry limits from the settings.
     *
     * @throws \Exception
     */
    public function __construct()
    {
        SerializableClosure::removeSecurityProvider();
        $this->colorCli = new ColorCLI;

        // Command prefix only: callers append the job description and the closing double quote.
        $this->dnr_path = PHP_BINARY.' misc/update/multiprocessing/.do_not_run/switch.php "php  ';

        $this->maxSize = (int) Settings::settingValue('maxsizetoprocessnfo');
        $this->minSize = (int) Settings::settingValue('minsizetoprocessnfo');

        // Look the retry setting up once and reuse it for both the test and the value.
        $nfoRetries = (int) Settings::settingValue('maxnforetries');
        $this->maxRetries = $nfoRetries >= 0 ? -($nfoRetries + 1) : Nfo::NFO_UNPROC;

        // The retry marker is never allowed to go below -8.
        $this->maxRetries = max($this->maxRetries, -8);
    }
104
105
    /**
     * Run one complete multi-processing cycle for a work type.
     *
     * Per-run state is reset first; then the pre-fork step, the forked work and
     * the post-fork step run in that order. A summary line is printed when CLI
     * echo is enabled.
     *
     * @param  string  $type  Work type name, e.g. backfill, binaries, releases or one of the postProcess_* types.
     * @param  array  $options  Arguments for the chosen work type.
     *
     * @throws \Exception
     */
    public function processWorkType(string $type, array $options = []): void
    {
        $startedAt = now()->timestamp;

        // Clear whatever a previous run left behind.
        $this->workType = $type;
        $this->workTypeOptions = $options;
        $this->ppRenamedOnly = false;
        $this->processMovies = false;
        $this->processTV = false;
        $this->processNFO = false;
        $this->processAdditional = false;
        $this->work = [];

        // Un-forked work before, forked work, un-forked work after.
        $this->processStartWork();
        $this->getWork();
        $this->processEndWork();

        if (! config('nntmux.echocli')) {
            return;
        }

        $elapsedSeconds = now()->timestamp - $startedAt;
        $this->colorCli->header(
            'Multi-processing for '.$this->workType.' finished in '.$elapsedSeconds.
            ' seconds at '.now()->toRfc2822String().'.'.PHP_EOL
        );
    }
139
140
    /**
141
     * Only post process renamed movie / tv releases?
142
     */
143
    private bool $ppRenamedOnly;
144
145
    /**
     * Dispatch the current work type to the method that collects and runs its work.
     *
     * The max child process count is reset to 0 here; the individual handlers are
     * expected to set it. Unknown work types are ignored.
     *
     * @throws \Exception
     */
    private function getWork(): void
    {
        $this->maxProcesses = 0;

        // Work type => name of the handler method on this class.
        $handlers = [
            'backfill' => 'backfill',
            'binaries' => 'binaries',
            'fixRelNames_standard' => 'fixRelNames',
            'fixRelNames_predbft' => 'fixRelNames',
            'releases' => 'releases',
            'postProcess_ama' => 'processSingle',
            'postProcess_add' => 'postProcessAdd',
            'postProcess_mov' => 'postProcessMov',
            'postProcess_nfo' => 'postProcessNfo',
            'postProcess_tv' => 'postProcessTv',
            'safe_backfill' => 'safeBackfill',
            'safe_binaries' => 'safeBinaries',
            'update_per_group' => 'updatePerGroup',
        ];

        if (! isset($handlers[$this->workType])) {
            return;
        }

        // Movie and TV post processing can be limited to renamed releases via the first option.
        if ($this->workType === 'postProcess_mov' || $this->workType === 'postProcess_tv') {
            $this->ppRenamedOnly = isset($this->workTypeOptions[0]) && $this->workTypeOptions[0] === true;
        }

        $handler = $handlers[$this->workType];
        $this->{$handler}();
    }
207
208
    /**
     * Record how many jobs are queued and, when CLI echo is enabled, announce
     * either the start of the run or that there is nothing to do.
     */
    private function processWork(): void
    {
        $this->_workCount = \count($this->work);

        if ($this->_workCount > 0) {
            if (config('nntmux.echocli') === true) {
                $this->colorCli->header(
                    'Multi-processing started at '.now()->toRfc2822String().' for '.$this->workType.' with '.$this->_workCount.
                    ' job(s) to do using a max of '.$this->maxProcesses.' child process(es).'
                );
            }

            return;
        }

        if (config('nntmux.echocli') === true) {
            $this->colorCli->header('No work to do!');
        }
    }
224
225
    /**
     * Run the un-forked preparation step for work types that need one.
     *
     * Only the safe_backfill and safe_binaries work types run the update_groups script first.
     */
    private function processStartWork(): void
    {
        $needsGroupUpdate = \in_array($this->workType, ['safe_backfill', 'safe_binaries'], true);

        if ($needsGroupUpdate) {
            $this->_executeCommand(PHP_BINARY.' misc/update/tmux/bin/update_groups.php');
        }
    }
237
238
    /**
     * Run the un-forked closing step for work types that need one.
     *
     * For update_per_group and releases a final "releases" job is sent through the
     * do-not-run switch script, tagged with the number of queued work items.
     */
    private function processEndWork(): void
    {
        if ($this->workType !== 'update_per_group' && $this->workType !== 'releases') {
            return;
        }

        $jobCount = \count($this->work);
        $this->_executeCommand($this->dnr_path.'releases  '.$jobCount.'_"');
    }
252
253
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
254
    //////////////////////////////////////// All backFill code here ////////////////////////////////////////////////////
255
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
256
257
    /**
     * Backfill every group that has backfill enabled, one child process per group.
     *
     * The first work-type option is either false (no article limit is passed on)
     * or the article count handed to misc/update/backfill.php as its second argument.
     */
    private function backfill(): void
    {
        // A missing option is treated like false instead of raising an undefined-offset error.
        $articleLimit = $this->workTypeOptions[0] ?? false;

        $this->work = DB::select(
            sprintf(
                'SELECT name %s FROM usenet_groups WHERE backfill = 1',
                // Format the limit as an integer so the option cannot alter the SQL statement.
                $articleLimit === false ? '' : sprintf(', %d AS max', $articleLimit)
            )
        );

        // getWork() resets maxProcesses to 0 before calling this method, and a pool with a
        // concurrency of 0 never starts a queued task. Size the pool from the backfill
        // thread setting and always allow at least one child.
        $this->maxProcesses = max(1, (int) Settings::settingValue('backfillthreads'));

        $pool = Pool::create()->concurrency($this->maxProcesses)->timeout(config('nntmux.multiprocessing_max_child_time'));
        $this->processWork();

        // Task numbers count down from the total as jobs are queued.
        $maxWork = \count($this->work);
        foreach ($this->work as $group) {
            $pool->add(function () use ($group) {
                return $this->_executeCommand(PHP_BINARY.' misc/update/backfill.php '.$group->name.(isset($group->max) ? (' '.$group->max) : ''));
            }, 2000000)->then(function ($output) use ($group, $maxWork) {
                echo $output;
                $this->colorCli->primary('Task #'.$maxWork.' Backfilled group '.$group->name);
            })->catch(function (\Throwable $exception) {
                echo $exception->getMessage();
            })->catch(static function (SerializableException $serializableException) {
                //we do nothing here just catch the error and move on
            });
            $maxWork--;
        }
        $pool->wait();
    }
285
286
    /**
     * Backfill a single group by splitting the missing articles into parallel ranges.
     *
     * One backfill-enabled group is selected (ordered per the backfill_order setting).
     * The difference between the group's first_record and the highest first_record in
     * short_groups is cut into ranges of maxmssgs articles, and each range is sent
     * through the do-not-run switch script as a "get_range  backfill" job.
     */
    private function safeBackfill(): void
    {
        $backfill_qty = (int) Settings::settingValue('backfill_qty');
        $backfill_order = (int) Settings::settingValue('backfill_order');
        $backfill_days = (int) Settings::settingValue('backfill_days');
        $maxMessages = (int) Settings::settingValue('maxmssgs');
        $threads = (int) Settings::settingValue('backfillthreads');

        // backfill_order => ORDER BY clause; any other value falls back to the default.
        $orderClauses = [
            1 => 'ORDER BY first_record_postdate DESC',
            2 => 'ORDER BY first_record_postdate ASC',
            3 => 'ORDER BY name ASC',
            4 => 'ORDER BY name DESC',
            5 => 'ORDER BY a.last_record DESC',
        ];
        $orderby = $orderClauses[$backfill_order] ?? 'ORDER BY a.last_record ASC';

        // NOTE(review): for any backfill_days value other than 1 or 2 this stays empty and the
        // INTERVAL clause below is not valid SQL — confirm the setting is restricted to 1 or 2.
        $backfilldays = '';
        if ($backfill_days === 1) {
            $backfilldays = 'g.backfill_target';
        } elseif ($backfill_days === 2) {
            $backfilldays = now()->diffInDays(Carbon::createFromFormat('Y-m-d', Settings::settingValue('safebackfilldate')), true);
        }

        $data = DB::select(
            sprintf(
                'SELECT g.name,
				g.first_record AS our_first,
				MAX(a.first_record) AS their_first,
				MAX(a.last_record) AS their_last
				FROM usenet_groups g
				INNER JOIN short_groups a ON g.name = a.name
				WHERE g.first_record IS NOT NULL
				AND g.first_record_postdate IS NOT NULL
				AND g.backfill = 1
				AND (NOW() - INTERVAL %s DAY ) < g.first_record_postdate
				GROUP BY a.name, a.last_record, g.name, g.first_record
				%s LIMIT 1',
                $backfilldays,
                $orderby
            )
        );

        $count = 0;
        if (! empty($data) && isset($data[0]->name)) {
            $this->safeBackfillGroup = $data[0]->name;

            $count = ($data[0]->our_first - $data[0]->their_first);
        }

        if ($count <= 0) {
            if (config('nntmux.echocli')) {
                $this->colorCli->primary('No backfill needed for group '.$this->safeBackfillGroup);
            }

            return;
        }

        if ($maxMessages <= 0) {
            // Every range is maxmssgs articles wide; a non-positive width would divide by zero below.
            $this->colorCli->notice('Safe backfill skipped: the maxmssgs setting must be greater than 0.');

            return;
        }

        if ($count > ($backfill_qty * $threads)) {
            $getEach = ceil(($backfill_qty * $threads) / $maxMessages);
        } else {
            $getEach = $count / $maxMessages;
        }

        // Ranges walk backwards from our first article, maxmssgs articles at a time.
        $queues = [];
        for ($i = 0; $i <= $getEach - 1; $i++) {
            $queues[$i] = sprintf('get_range  backfill  %s  %s  %s  %s', $this->safeBackfillGroup, $data[0]->our_first - $i * $maxMessages - $maxMessages, $data[0]->our_first - $i * $maxMessages - 1, $i + 1);
        }

        // Publish the job list and pool size so processWork() reports the real numbers
        // instead of "No work to do!" with 0 child processes.
        $this->work = $queues;
        $this->maxProcesses = $threads;

        $pool = Pool::create()->concurrency($threads)->timeout(config('nntmux.multiprocessing_max_child_time'));

        $this->processWork();
        foreach ($queues as $queue) {
            $pool->add(function () use ($queue) {
                return $this->_executeCommand($this->dnr_path.$queue.'"');
            }, 2000000)->then(function ($output) {
                echo $output;
                $this->colorCli->primary('Backfilled group '.$this->safeBackfillGroup);
            })->catch(function (\Throwable $exception) {
                echo $exception->getMessage();
            })->catch(static function (SerializableException $serializableException) {
                //we do nothing here just catch the error and move on
            });
        }
        $pool->wait();
    }
384
385
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
386
    //////////////////////////////////////// All binaries code here ////////////////////////////////////////////////////
387
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
388
389
    /**
     * Update binaries for every active group, one child process per group.
     *
     * The first work-type option is selected as "max" for each group and passed to
     * misc/update/update_binaries.php after the group name.
     */
    private function binaries(): void
    {
        $sql = sprintf(
            'SELECT name, %d AS max FROM usenet_groups WHERE active = 1',
            $this->workTypeOptions[0]
        );
        $this->work = DB::select($sql);

        $this->maxProcesses = (int) Settings::settingValue('binarythreads');

        $pool = Pool::create()->concurrency($this->maxProcesses)->timeout(config('nntmux.multiprocessing_max_child_time'));

        // Task numbers count down from the total as jobs are queued.
        $taskNumber = \count($this->work);

        $this->processWork();
        foreach ($this->work as $activeGroup) {
            $pool->add(function () use ($activeGroup) {
                return $this->_executeCommand(PHP_BINARY.' misc/update/update_binaries.php '.$activeGroup->name.' '.$activeGroup->max);
            }, 2000000)->then(function ($output) use ($activeGroup, $taskNumber) {
                echo $output;
                $this->colorCli->primary('Task #'.$taskNumber.' Updated group '.$activeGroup->name);
            })->catch(function (\Throwable $exception) {
                echo $exception->getMessage();
            })->catch(static function (SerializableException $serializableException) {
                //we do nothing here just catch the error and move on
            });
            $taskNumber--;
        }

        $pool->wait();
    }
421
422
    /**
     * Update headers for all active groups, splitting large gaps into parallel ranges.
     *
     * Groups with no last_record yet, or with a small gap, get a single
     * update_group_headers job. Larger gaps get a part_repair job followed by
     * get_range jobs of maxmssgs articles each, limited by max_headers_iteration.
     *
     * @throws \Exception
     */
    private function safeBinaries(): void
    {
        $maxHeaders = (int) Settings::settingValue('max_headers_iteration') ?: 1000000;
        $maxMessages = (int) Settings::settingValue('maxmssgs');
        $this->maxProcesses = (int) Settings::settingValue('binarythreads');

        $this->work = DB::select(
            '
			SELECT g.name AS groupname, g.last_record AS our_last,
				a.last_record AS their_last
			FROM usenet_groups g
			INNER JOIN short_groups a ON g.active = 1 AND g.name = a.name
			ORDER BY a.last_record DESC'
        );

        if (empty($this->work)) {
            return;
        }

        // Job slots are numbered from 1; the slot number is also passed to each get_range job.
        $slot = 1;
        $queues = [];
        foreach ($this->work as $group) {
            if ((int) $group->our_last === 0) {
                $queues[$slot] = sprintf('update_group_headers  %s', $group->groupname);
                $slot++;

                continue;
            }

            //only process if more than 20k headers available and skip the first 20k
            $count = $group->their_last - $group->our_last - 20000;
            if ($count <= $maxMessages * 2) {
                $queues[$slot] = sprintf('update_group_headers  %s', $group->groupname);
                $slot++;

                continue;
            }

            $queues[$slot] = sprintf('part_repair  %s', $group->groupname);
            $slot++;

            $span = min($count, $maxHeaders);
            $getEach = floor($span / $maxMessages);
            $remaining = $span - $getEach * $maxMessages;
            for ($j = 0; $j < $getEach; $j++) {
                $chunkBase = $group->our_last + $j * $maxMessages;
                $queues[$slot] = sprintf('get_range  binaries  %s  %s  %s  %s', $group->groupname, $chunkBase + 1, $chunkBase + $maxMessages, $slot);
                $slot++;
            }

            //add remainder to queue
            // NOTE(review): $j equals $getEach here, so this range starts one full chunk past the
            // end of the last loop range — confirm the skipped chunk is intended.
            $tailBase = $group->our_last + ($j + 1) * $maxMessages;
            $queues[$slot] = sprintf('get_range  binaries  %s  %s  %s  %s', $group->groupname, $tailBase + 1, $tailBase + $remaining + 1, $slot);
            $slot++;
        }

        $pool = Pool::create()->concurrency($this->maxProcesses)->timeout(config('nntmux.multiprocessing_max_child_time'));

        $this->processWork();
        foreach ($queues as $queue) {
            // Output is only echoed for jobs whose text contains an alt.* group name.
            preg_match('/alt\..+/i', $queue, $hit);
            $pool->add(function () use ($queue) {
                return $this->_executeCommand($this->dnr_path.$queue.'"');
            }, 2000000)->then(function ($output) use ($hit) {
                if (! empty($hit)) {
                    echo $output;
                    $this->colorCli->primary('Updated group '.$hit[0]);
                }
            })->catch(function (\Throwable $exception) {
                echo $exception->getMessage();
            })->catch(static function (SerializableException $serializableException) {
                //we do nothing here just catch the error and move on
            });
        }

        $pool->wait();
    }
490
491
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
492
    //////////////////////////////////// All fix release names code here ///////////////////////////////////////////////
493
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
494
495
    /**
     * Fix release names in parallel, one child process per leading GUID character.
     *
     * The first work-type option (for example 'predbft') is forwarded to
     * misc/update/tmux/bin/groupfixrelnames.php together with the GUID character,
     * the per-run limit and the job number. In 'predbft' mode the number of jobs is
     * limited by how many unsearched PreDB titles are waiting.
     */
    private function fixRelNames(): void
    {
        // Keep the child count within 1..16, whatever the setting holds.
        $this->maxProcesses = max(1, min(16, (int) Settings::settingValue('fixnamethreads')));
        $maxPerRun = (int) Settings::settingValue('fixnamesperrun');

        // A missing option is treated as an empty mode instead of raising an undefined-offset error.
        $mode = $this->workTypeOptions[0] ?? '';

        $leftGuids = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'];

        if ($maxPerRun <= 0) {
            // Without a per-run budget no job can be queued; this also keeps the
            // PreDB calculation below from dividing by zero.
            $leftGuids = [];
        } elseif ($mode === 'predbft') {
            // Prevent PreDB FT from always running
            $preCount = DB::select(
                "
					SELECT COUNT(p.id) AS num
					FROM predb p
					WHERE LENGTH(p.title) >= 15
					AND p.title NOT REGEXP '[\"\<\> ]'
					AND p.searched = 0
					AND p.predate < (NOW() - INTERVAL 1 DAY)"
            );
            $pending = (int) ($preCount[0]->num ?? 0);
            $leftGuids = $pending > 0
                ? \array_slice($leftGuids, 0, (int) ceil($pending / $maxPerRun))
                : [];
        }

        // Job numbers start at 1 and double as the queue keys.
        $count = 0;
        $queues = [];
        foreach ($leftGuids as $leftGuid) {
            $count++;
            $queues[$count] = sprintf('%s %s %s %s', $mode, $leftGuid, $maxPerRun, $count);
        }

        $this->work = $queues;

        $pool = Pool::create()->concurrency($this->maxProcesses)->timeout(config('nntmux.multiprocessing_max_child_time'));

        // Task numbers count down from the total as jobs are queued.
        $maxWork = \count($queues);

        $this->processWork();
        foreach ($this->work as $queue) {
            $pool->add(function () use ($queue) {
                return $this->_executeCommand(PHP_BINARY.' misc/update/tmux/bin/groupfixrelnames.php "'.$queue.'"'.' true');
            }, 2000000)->then(function ($output) use ($maxWork) {
                echo $output;
                $this->colorCli->primary('Task #'.$maxWork.' Finished fixing releases names');
            })->catch(function (\Throwable $exception) {
                echo $exception->getMessage();
            })->catch(static function (SerializableException $serializableException) {
                //we do nothing here just catch the error and move on
            });
            $maxWork--;
        }
        $pool->wait();
    }
559
560
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
561
    //////////////////////////////////////// All releases code here ////////////////////////////////////////////////////
562
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
563
564
    /**
     * Run release processing for every active or backfill-enabled group that has
     * at least one row in collections, one child process per group.
     */
    private function releases(): void
    {
        $candidates = DB::select('SELECT id, name FROM usenet_groups WHERE (active = 1 OR backfill = 1)');
        $this->maxProcesses = (int) Settings::settingValue('releasethreads');

        // Keep only the groups that have at least one collection.
        $uGroups = [];
        foreach ($candidates as $candidate) {
            try {
                $collectionRows = DB::select(sprintf('SELECT id FROM collections WHERE groups_id = %d LIMIT 1', $candidate->id));
            } catch (\PDOException $e) {
                // A failing lookup only skips this group; the message is logged in debug mode.
                if (config('app.debug') === true) {
                    Log::debug($e->getMessage());
                }

                continue;
            }

            if (! empty($collectionRows)) {
                $uGroups[] = ['id' => $candidate->id, 'name' => $candidate->name];
            }
        }

        // Task numbers count down from the total as jobs are queued.
        $taskNumber = \count($uGroups);

        $this->work = $uGroups;

        $pool = Pool::create()->concurrency($this->maxProcesses)->timeout(config('nntmux.multiprocessing_max_child_time'));

        $this->processWork();
        foreach ($uGroups as $uGroup) {
            $pool->add(function () use ($uGroup) {
                return $this->_executeCommand($this->dnr_path.'releases  '.$uGroup['id'].'"');
            }, 2000000)->then(function ($output) use ($taskNumber) {
                echo $output;
                $this->colorCli->primary('Task #'.$taskNumber.' Finished performing release processing');
            })->catch(function (\Throwable $exception) {
                echo $exception->getMessage();
            })->catch(static function (SerializableException $serializableException) {
                //we do nothing here just catch the error and move on
            });
            $taskNumber--;
        }

        $pool->wait();
    }
606
607
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
608
    /////////////////////////////////////// All post process code here /////////////////////////////////////////////////
609
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
610
611
    /**
     * Shared runner for all post-processing types.
     *
     * The active type (additional, nfo, movies or tv, checked in that order) decides
     * which arguments misc/update/postprocess.php receives. When no type is active,
     * nothing is queued.
     *
     * @param  array  $releases  Rows exposing an "id" property that is passed to the script.
     * @param  int  $maxProcess  Maximum number of concurrent child processes.
     */
    public function postProcess(array $releases, int $maxProcess): void
    {
        // First enabled mode wins, in this fixed order.
        $modes = [
            ['on' => $this->processAdditional, 'type' => 'additional true ', 'desc' => 'additional postprocessing'],
            ['on' => $this->processNFO, 'type' => 'nfo true ', 'desc' => 'nfo postprocessing'],
            ['on' => $this->processMovies, 'type' => 'movies true ', 'desc' => 'movies postprocessing'],
            ['on' => $this->processTV, 'type' => 'tv true ', 'desc' => 'tv postprocessing'],
        ];

        $type = '';
        $desc = '';
        foreach ($modes as $mode) {
            if ($mode['on']) {
                $type = $mode['type'];
                $desc = $mode['desc'];
                break;
            }
        }

        $pool = Pool::create()->concurrency($maxProcess)->timeout(config('nntmux.multiprocessing_max_child_time'));
        $count = \count($releases);
        $this->processWork();

        // Without an active mode there is nothing to queue.
        $jobs = $type !== '' ? $releases : [];
        foreach ($jobs as $release) {
            $pool->add(function () use ($release, $type) {
                return $this->_executeCommand(PHP_BINARY.' misc/update/postprocess.php '.$type.$release->id);
            }, 2000000)->then(function ($output) use ($desc, $count) {
                echo $output;
                $this->colorCli->primary('Finished task #'.$count.' for '.$desc);
            })->catch(function (\Throwable $exception) {
                echo $exception->getMessage();
            })->catch(static function (SerializableException $serializableException) {
                //we do nothing here just catch the error and move on
            })->timeout(function () use ($count) {
                $this->colorCli->notice('Task #'.$count.': Timeout occurred.');
            });
            $count--;
        }
        $pool->wait();
    }
652
653
    /**
     * Queue "additional" post processing for up to 16 leftguid buckets.
     *
     * Releases are filtered by the minsizetopostprocess and maxsizetopostprocess
     * settings; a blank setting falls back to 1 and 100 respectively, and a value
     * of 0 or less disables that bound.
     *
     * @throws \Exception
     */
    private function postProcessAdd(): void
    {
        // Lower bound: the setting is multiplied by 1048576 before it is compared with r.size.
        $minSetting = 1;
        if (Settings::settingValue('minsizetopostprocess') !== '') {
            $minSetting = (int) Settings::settingValue('minsizetopostprocess');
        }
        $minSizeClause = '';
        if ($minSetting > 0) {
            $minSizeClause = 'AND r.size > '.($minSetting * 1048576);
        }

        // Upper bound: the setting is multiplied by 1073741824 before it is compared with r.size.
        $maxSetting = 100;
        if (Settings::settingValue('maxsizetopostprocess') !== '') {
            $maxSetting = (int) Settings::settingValue('maxsizetopostprocess');
        }
        $maxSizeClause = '';
        if ($maxSetting > 0) {
            $maxSizeClause = 'AND r.size < '.($maxSetting * 1073741824);
        }

        $this->maxProcesses = 1;
        $pending = DB::select(
            sprintf(
                '
					SELECT r.leftguid AS id
					FROM releases r
					LEFT JOIN categories c ON c.id = r.categories_id
					WHERE r.nzbstatus = %d
					AND r.passwordstatus = -1
					AND r.haspreview = -1
					AND c.disablepreview = 0
					%s %s
					GROUP BY r.leftguid
					LIMIT 16',
                NZB::NZB_ADDED,
                $maxSizeClause,
                $minSizeClause
            )
        );

        // Only switch the mode on and raise the child count when something is waiting.
        if (! empty($pending)) {
            $this->processAdditional = true;
            $this->work = $pending;
            $this->maxProcesses = (int) Settings::settingValue('postthreads');
        }

        $this->postProcess($this->work, $this->maxProcesses);
    }
689
690
    private string $nfoQueryString = '';
691
692
    /**
     * Check if we should process NFO's.
     *
     * When the 'lookupnfo' setting equals 1, the NFO query fragment from Nfo::NfoQueryString() is
     * stored in $this->nfoQueryString and used to probe for a single matching release.
     *
     * @return bool True only when NFO lookup is enabled and at least one release matches.
     *
     * @throws \Exception
     */
    private function checkProcessNfo(): bool
    {
        if ((int) Settings::settingValue('lookupnfo') !== 1) {
            return false;
        }

        $this->nfoQueryString = Nfo::NfoQueryString();

        $rows = DB::select(sprintf('SELECT r.id FROM releases r WHERE 1=1 %s LIMIT 1', $this->nfoQueryString));

        // DB::select() returns an array of rows. Comparing an array to an integer with ">" is always
        // true in PHP (even for an empty array), so the rows have to be counted explicitly.
        return \count($rows) > 0;
    }
708
709
    /**
     * Queue releases that still need NFO processing and pass them to postProcess().
     *
     * When checkProcessNfo() reports pending work, up to 16 release GUID prefixes matching
     * $this->nfoQueryString are loaded into $this->work and the process limit is taken from the
     * 'nfothreads' setting; otherwise the process limit stays at 1.
     *
     * @throws \Exception
     */
    private function postProcessNfo(): void
    {
        $this->maxProcesses = 1;

        $hasNfoWork = $this->checkProcessNfo();
        if ($hasNfoWork) {
            $this->processNFO = true;

            $nfoSql = sprintf(
                '
					SELECT r.leftguid AS id
					FROM releases r
					WHERE 1=1 %s
					GROUP BY r.leftguid
					LIMIT 16',
                $this->nfoQueryString
            );
            $this->work = DB::select($nfoSql);

            $this->maxProcesses = (int) Settings::settingValue('nfothreads');
        }

        $this->postProcess($this->work, $this->maxProcesses);
    }
733
734
    /**
     * Check if we should process movies.
     *
     * Probes for a single release in categories 2000-2999 with NZB status NZB::NZB_ADDED and no
     * imdbid. The probe is restricted to renamed releases when the 'lookupimdb' setting equals 2
     * or when $this->ppRenamedOnly is set.
     *
     * @return bool True only when 'lookupimdb' is greater than zero and at least one release matches.
     *
     * @throws \Exception
     */
    private function checkProcessMovies(): bool
    {
        // Cast once so the comparison is numeric, matching the other check* methods.
        $lookupImdb = (int) Settings::settingValue('lookupimdb');
        if ($lookupImdb <= 0) {
            return false;
        }

        $rows = DB::select(sprintf('
						SELECT id
						FROM releases
						WHERE categories_id BETWEEN 2000 AND 2999
						AND nzbstatus = %d
						AND imdbid IS NULL
						%s %s
						LIMIT 1', NZB::NZB_ADDED, ($lookupImdb === 2 ? 'AND isrenamed = 1' : ''), ($this->ppRenamedOnly ? 'AND isrenamed = 1' : '')));

        // DB::select() returns an array of rows. Comparing an array to an integer with ">" is always
        // true in PHP (even for an empty array), so the rows have to be counted explicitly.
        return \count($rows) > 0;
    }
752
753
    /**
     * Queue movie releases that still need an IMDB lookup and pass them to postProcess().
     *
     * When checkProcessMovies() reports pending work, up to 16 release GUID prefixes are loaded
     * into $this->work, each row carrying a "renamed" column of 2 when $this->ppRenamedOnly is set
     * and 1 otherwise. The process limit then comes from the 'postthreadsnon' setting; without
     * pending work it stays at 1.
     *
     * @throws \Exception
     */
    private function postProcessMov(): void
    {
        $this->maxProcesses = 1;

        $hasMovieWork = $this->checkProcessMovies();
        if ($hasMovieWork) {
            $this->processMovies = true;

            $movieSql = sprintf(
                '
					SELECT leftguid AS id, %d AS renamed
					FROM releases
					WHERE categories_id BETWEEN 2000 AND 2999
					AND nzbstatus = %d
					AND imdbid IS NULL
					%s %s
					GROUP BY leftguid
					LIMIT 16',
                ($this->ppRenamedOnly ? 2 : 1),
                NZB::NZB_ADDED,
                ((int) Settings::settingValue('lookupimdb') === 2 ? 'AND isrenamed = 1' : ''),
                ($this->ppRenamedOnly ? 'AND isrenamed = 1' : '')
            );
            $this->work = DB::select($movieSql);

            $this->maxProcesses = (int) Settings::settingValue('postthreadsnon');
        }

        $this->postProcess($this->work, $this->maxProcesses);
    }
783
784
    /**
     * Check if we should process TV's.
     *
     * Probes for a single release in categories 5000-5999 with NZB status NZB::NZB_ADDED, a size
     * above 1048576 bytes and tv_episodes_id between -2 and 0. The probe is restricted to renamed
     * releases when the 'lookuptv' setting equals 2 or when $this->ppRenamedOnly is set.
     *
     * @return bool True only when 'lookuptv' is greater than zero and at least one release matches.
     *
     * @throws \Exception
     */
    private function checkProcessTV(): bool
    {
        $lookupTv = (int) Settings::settingValue('lookuptv');
        if ($lookupTv <= 0) {
            return false;
        }

        // LIMIT 1: this is only an existence probe, so there is no reason to fetch every matching row.
        $rows = DB::select(sprintf('
						SELECT id
						FROM releases
						WHERE categories_id BETWEEN 5000 AND 5999
						AND nzbstatus = %d
						AND size > 1048576
						AND tv_episodes_id BETWEEN -2 AND 0
						%s %s
						LIMIT 1', NZB::NZB_ADDED, $lookupTv === 2 ? 'AND isrenamed = 1' : '', $this->ppRenamedOnly ? 'AND isrenamed = 1' : ''));

        // DB::select() returns an array of rows. Comparing an array to an integer with ">" is always
        // true in PHP (even for an empty array), so the rows have to be counted explicitly.
        return \count($rows) > 0;
    }
806
807
    /**
     * Queue TV releases that still need an episode lookup and pass them to postProcess().
     *
     * When checkProcessTV() reports pending work, up to 16 release GUID prefixes are loaded into
     * $this->work, each row carrying a "renamed" column of 2 when $this->ppRenamedOnly is set and
     * 1 otherwise. The process limit then comes from the 'postthreadsnon' setting; without pending
     * work it stays at 1.
     *
     * @throws \Exception
     */
    private function postProcessTv(): void
    {
        $this->maxProcesses = 1;

        $hasTvWork = $this->checkProcessTV();
        if ($hasTvWork) {
            $this->processTV = true;

            $tvSql = sprintf(
                '
					SELECT leftguid AS id, %d AS renamed
					FROM releases
					WHERE categories_id BETWEEN 5000 AND 5999
					AND nzbstatus = %d
					AND tv_episodes_id BETWEEN -2 AND 0
					AND size > 1048576
					%s %s
					GROUP BY leftguid
					LIMIT 16',
                ($this->ppRenamedOnly ? 2 : 1),
                NZB::NZB_ADDED,
                (int) Settings::settingValue('lookuptv') === 2 ? 'AND isrenamed = 1' : '',
                ($this->ppRenamedOnly ? 'AND isrenamed = 1' : '')
            );
            $this->work = DB::select($tvSql);

            $this->maxProcesses = (int) Settings::settingValue('postthreadsnon');
        }

        $this->postProcess($this->work, $this->maxProcesses);
    }
838
839
    /**
     * Run the post-processing steps that are executed in this process, one after another:
     * books, consoles, games, music and XXX, in that order, on a single PostProcess instance.
     *
     * @throws \Exception
     */
    private function processSingle(): void
    {
        $processor = new PostProcess();

        $processor->processBooks();
        $processor->processConsoles();
        $processor->processGames();
        $processor->processMusic();
        $processor->processXXX();
    }
853
854
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
855
    ///////////////////////////////// All "update_per_Group" code goes here ////////////////////////////////////////////
856
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
857
858
    /**
     * Run the "update_per_group" command once for every usenet group that is active or has
     * backfill enabled, using an async pool.
     *
     * Pool concurrency comes from the 'releasethreads' setting and the per-task timeout from the
     * 'nntmux.multiprocessing_max_child_time' config value. Command output is echoed when a task
     * finishes; failures are echoed as their exception message.
     *
     * @throws \Exception
     */
    private function updatePerGroup(): void
    {
        $this->work = DB::select('SELECT id , name FROM usenet_groups WHERE (active = 1 OR backfill = 1)');

        $concurrency = (int) Settings::settingValue('releasethreads');

        $pool = Pool::create();
        $pool->concurrency($concurrency);
        $pool->timeout(config('nntmux.multiprocessing_max_child_time'));

        $this->processWork();

        foreach ($this->work as $usenetGroup) {
            // Second argument to add() is passed through unchanged from the original call (2000000).
            $pool->add(function () use ($usenetGroup) {
                return $this->_executeCommand($this->dnr_path.'update_per_group  '.$usenetGroup->id.'"');
            }, 2000000)->then(function ($commandOutput) use ($usenetGroup) {
                echo $commandOutput;
                $groupName = UsenetGroup::getNameByID($usenetGroup->id);
                $this->colorCli->primary('Finished updating binaries, processing releases and additional postprocessing for group:'.$groupName);
            })->catch(function (\Throwable $failure) {
                echo $failure->getMessage();
            })->catch(static function (SerializableException $serialized) {
                echo $serialized->asThrowable()->getMessage();
            });
        }

        $pool->wait();
    }
885
886
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
887
    //////////////////////////////////////////// Various methods ///////////////////////////////////////////////////////
888
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
889
890
    /**
     * Run a shell command line and return what it wrote to standard output.
     *
     * The process is given a timeout of 1800 (seconds, per Symfony Process::setTimeout). Anything
     * the command writes to its error stream is echoed as it arrives.
     *
     * @param  string  $command  The shell command line to execute.
     * @return string The command's collected standard output.
     */
    protected function _executeCommand(string $command): string
    {
        $shell = Process::fromShellCommandline($command);
        $shell->setTimeout(1800);

        // Forward only error-stream chunks; regular output is returned to the caller instead.
        $forwardErrors = static function ($type, $buffer): void {
            if ($type === Process::ERR) {
                echo $buffer;
            }
        };
        $shell->run($forwardErrors);

        return $shell->getOutput();
    }
902
903
    /**
     * Echo a message, followed by a newline, when the 'nntmux.echocli' config value is truthy.
     *
     * @param  string  $message  The text to print.
     */
    public function logger(string $message): void
    {
        if (! config('nntmux.echocli')) {
            return;
        }

        echo $message.PHP_EOL;
    }
912
913
    /**
     * This method is executed whenever a child is finished doing work.
     *
     * When the 'nntmux.echocli' config value is truthy, the remaining-work counter is decremented
     * and a status header is printed. Nothing happens (and the counter is left untouched) otherwise.
     *
     * @param  string  $pid  The PID numbers.
     */
    public function exit(string $pid): void
    {
        if (! config('nntmux.echocli')) {
            return;
        }

        $stillActive = $this->maxProcesses - 1;
        $jobsLeft = --$this->_workCount;

        $status = 'Process ID #'.$pid.' has completed.'.PHP_EOL.
            'There are '.$stillActive.' process(es) still active with '.
            $jobsLeft.' job(s) left in the queue.';

        $this->colorCli->header($status, true);
    }
929
}
930