Passed
Push — dev ( 256bd7...cf5b1e )
by Darko
08:12
created

Forking::processStartWork()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 7
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 12

Importance

Changes 0
Metric Value
eloc 5
c 0
b 0
f 0
dl 0
loc 7
ccs 0
cts 6
cp 0
rs 10
cc 3
nc 3
nop 0
crap 12
1
<?php
2
3
namespace Blacklight\libraries;
4
5
use App\Models\Settings;
6
use App\Models\UsenetGroup;
7
use Blacklight\ColorCLI;
8
use Blacklight\Nfo;
9
use Blacklight\NNTP;
10
use Blacklight\NZB;
11
use Blacklight\processing\PostProcess;
12
use Illuminate\Support\Carbon;
13
use Illuminate\Support\Facades\DB;
14
use Illuminate\Support\Facades\Log;
15
use Opis\Closure\SerializableClosure;
16
use Spatie\Async\Pool;
17
use Symfony\Component\Process\Process;
18
19
/**
20
 * Class Forking.
21
 *
22
 * This forks various newznab scripts.
23
 *
24
 * For example, you get all the ID's of the active groups in the groups table, you then iterate over them and spawn
25
 * processes of misc/update_binaries.php passing the group ID's.
26
 */
27
class Forking
28
{
29
    /**
30
     * @var \Blacklight\ColorCLI
31
     */
32
    public $colorCli;
33
34
    /**
35
     * @var int The type of output
36
     */
37
    protected $outputType;
38
39
    /**
40
     * Path to do not run folder.
41
     *
42
     * @var string
43
     */
44
    private $dnr_path;
45
46
    /**
47
     * Work to work on.
48
     *
49
     * @var array
50
     */
51
    private $work = [];
52
53
    /**
54
     * How much work do we have to do?
55
     *
56
     * @var int
57
     */
58
    public $_workCount = 0;
59
60
    /**
61
     * The type of work we want to work on.
62
     *
63
     * @var string
64
     */
65
    private $workType = '';
66
67
    /**
68
     * List of passed in options for the current work type.
69
     *
70
     * @var array
71
     */
72
    private $workTypeOptions = [];
73
74
    /**
75
     * Max amount of child processes to do work at a time.
76
     *
77
     * @var int
78
     */
79
    private $maxProcesses = 1;
80
81
    /**
82
     * Group used for safe backfill.
83
     *
84
     * @var string
85
     */
86
    private $safeBackfillGroup = '';
87
    /**
88
     * @var int
89
     */
90
    protected $maxSize;
91
92
    /**
93
     * @var int
94
     */
95
    protected $minSize;
96
97
    /**
98
     * @var int
99
     */
100
    protected $maxRetries;
101
102
    /**
103
     * @var int
104
     */
105
    protected $dummy;
106
107
    /**
108
     * @var bool
109
     */
110
    private $processAdditional = false; // Should we process additional?
111
    private $processNFO = false; // Should we process NFOs?
112
    private $processMovies = false; // Should we process Movies?
113
    private $processTV = false; // Should we process TV?
114
115
    /**
116
     * Setup required parent / self vars.
117
     *
118
     * @throws \Exception
119
     */
120
    public function __construct()
121
    {
122
        SerializableClosure::removeSecurityProvider();
123
        $this->colorCli = new ColorCLI();
124
125
        $this->dnr_path = PHP_BINARY.' misc/update/multiprocessing/.do_not_run/switch.php "php  ';
126
127
        $this->maxSize = (int) Settings::settingValue('..maxsizetoprocessnfo');
128
        $this->minSize = (int) Settings::settingValue('..minsizetoprocessnfo');
129
        $this->maxRetries = (int) Settings::settingValue('..maxnforetries') >= 0 ? -((int) Settings::settingValue('..maxnforetries') + 1) : Nfo::NFO_UNPROC;
130
        $this->maxRetries = $this->maxRetries < -8 ? -8 : $this->maxRetries;
131
    }
132
133
    /**
134
     * Setup the class to work on a type of work, then process the work.
135
     * Valid work types:.
136
     *
137
     * @param string $type The type of multiProcessing to do : backfill, binaries, releases, postprocess
138
     * @param array $options Array containing arguments for the type of work.
139
     *
140
     * @throws \Exception
141
     */
142
    public function processWorkType($type, array $options = [])
143
    {
144
        // Set/reset some variables.
145
        $startTime = now()->timestamp;
146
        $this->workType = $type;
147
        $this->workTypeOptions = $options;
148
        $this->processAdditional = $this->processNFO = $this->processTV = $this->processMovies = $this->ppRenamedOnly = false;
149
        $this->work = [];
150
151
        // Process extra work that should not be forked and done before forking.
152
        $this->processStartWork();
153
154
        // Get work to fork.
155
        $this->getWork();
156
157
        // Process extra work that should not be forked and done after.
158
        $this->processEndWork();
159
160
        if (config('nntmux.echocli')) {
161
            $this->colorCli->header(
162
                'Multi-processing for '.$this->workType.' finished in '.(now()->timestamp - $startTime).
163
                ' seconds at '.now()->toRfc2822String().'.'.PHP_EOL
164
            );
165
        }
166
    }
167
168
    /**
169
     * Only post process renamed movie / tv releases?
170
     *
171
     * @var bool
172
     */
173
    private $ppRenamedOnly;
174
175
    /**
176
     * Get work for our workers to work on, set the max child processes here.
177
     *
178
     * @throws \Exception
179
     */
180
    private function getWork()
181
    {
182
        $this->maxProcesses = 0;
183
184
        switch ($this->workType) {
185
186
            case 'backfill':
187
                $this->backfill();
188
                break;
189
190
            case 'binaries':
191
                $this->binaries();
192
                break;
193
194
            case 'fixRelNames_standard':
195
            case 'fixRelNames_predbft':
196
                $this->fixRelNames();
197
                break;
198
199
            case 'releases':
200
                $this->releases();
201
                break;
202
203
            case 'postProcess_ama':
204
                $this->processSingle();
205
                break;
206
207
            case 'postProcess_add':
208
                $this->postProcessAdd();
209
                break;
210
211
            case 'postProcess_mov':
212
                $this->ppRenamedOnly = (isset($this->workTypeOptions[0]) && $this->workTypeOptions[0] === true);
213
                $this->postProcessMov();
214
                break;
215
216
            case 'postProcess_nfo':
217
                $this->postProcessNfo();
218
                break;
219
220
            case 'postProcess_sha':
221
                $this->processSharing();
222
                break;
223
224
            case 'postProcess_tv':
225
                $this->ppRenamedOnly = (isset($this->workTypeOptions[0]) && $this->workTypeOptions[0] === true);
226
                $this->postProcessTv();
227
                break;
228
229
            case 'safe_backfill':
230
                $this->safeBackfill();
231
                break;
232
233
            case 'safe_binaries':
234
                $this->safeBinaries();
235
                break;
236
237
            case 'update_per_group':
238
                $this->updatePerGroup();
239
                break;
240
        }
241
    }
242
243
    /**
244
     * Process work if we have any.
245
     */
246
    private function processWork()
247
    {
248
        $this->_workCount = \count($this->work);
249
        if ($this->_workCount > 0 && config('nntmux.echocli') === true) {
250
            $this->colorCli->header(
251
                'Multi-processing started at '.now()->toRfc2822String().' for '.$this->workType.' with '.$this->_workCount.
252
                ' job(s) to do using a max of '.$this->maxProcesses.' child process(es).'
253
            );
254
        }
255
        if (empty($this->_workCount) && config('nntmux.echocli') === true) {
256
            $this->colorCli->header('No work to do!');
257
        }
258
    }
259
260
    /**
261
     * Process any work that does not need to be forked, but needs to run at the start.
262
     */
263
    private function processStartWork()
264
    {
265
        switch ($this->workType) {
266
            case 'safe_backfill':
267
            case 'safe_binaries':
268
                $this->_executeCommand(PHP_BINARY.' misc/update/tmux/bin/update_groups.php');
269
                break;
270
        }
271
    }
272
273
    /**
274
     * Process any work that does not need to be forked, but needs to run at the end.
275
     */
276
    private function processEndWork()
277
    {
278
        switch ($this->workType) {
279
            case 'update_per_group':
280
            case 'releases':
281
282
                $this->_executeCommand($this->dnr_path.'releases  '.\count($this->work).'_"');
283
284
                break;
285
        }
286
    }
287
288
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
289
    //////////////////////////////////////// All backFill code here ////////////////////////////////////////////////////
290
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
291
292
    private function backfill()
293
    {
294
        // The option for backFill is for doing up to x articles. Else it's done by date.
295
        $this->work = DB::select(
296
            sprintf(
297
                'SELECT name %s FROM usenet_groups WHERE backfill = 1',
298
                ($this->workTypeOptions[0] === false ? '' : (', '.$this->workTypeOptions[0].' AS max'))
299
            )
300
        );
301
302
        $pool = Pool::create()->concurrency($this->maxProcesses)->timeout(config('nntmux.multiprocessing_max_child_time'));
303
        $this->processWork();
304
        $maxWork = \count($this->work);
305
        foreach ($this->work as $group) {
306
            $pool->add(function () use ($group) {
307
                $this->_executeCommand(PHP_BINARY.' misc/update/backfill.php '.$group->name.(isset($group->max) ? (' '.$group->max) : ''));
308
            }, 100000)->then(function () use ($group, $maxWork) {
309
                $this->colorCli->primary('Task #'.$maxWork.' Backfilled group '.$group->name);
310
            })->catch(function (\Throwable $exception) {
311
                echo $exception->getMessage();
312
            });
313
            $maxWork--;
314
        }
315
        $pool->wait();
316
    }
317
318
    private function safeBackfill()
319
    {
320
        $backfill_qty = (int) Settings::settingValue('site.tmux.backfill_qty');
321
        $backfill_order = (int) Settings::settingValue('site.tmux.backfill_order');
322
        $backfill_days = (int) Settings::settingValue('site.tmux.backfill_days');
323
        $maxmssgs = (int) Settings::settingValue('..maxmssgs');
324
        $threads = (int) Settings::settingValue('..backfillthreads');
325
326
        $orderby = 'ORDER BY a.last_record ASC';
327
        switch ($backfill_order) {
328
            case 1:
329
                $orderby = 'ORDER BY first_record_postdate DESC';
330
                break;
331
332
            case 2:
333
                $orderby = 'ORDER BY first_record_postdate ASC';
334
                break;
335
336
            case 3:
337
                $orderby = 'ORDER BY name ASC';
338
                break;
339
340
            case 4:
341
                $orderby = 'ORDER BY name DESC';
342
                break;
343
344
            case 5:
345
                $orderby = 'ORDER BY a.last_record DESC';
346
                break;
347
        }
348
349
        $backfilldays = '';
350
        if ($backfill_days === 1) {
351
            $backfilldays = 'g.backfill_target';
352
        } elseif ($backfill_days === 2) {
353
            $backfilldays = now()->diffInDays(Carbon::createFromFormat('Y-m-d', Settings::settingValue('..safebackfilldate')));
354
        }
355
356
        $data = DB::select(
357
            sprintf(
358
                'SELECT g.name,
359
				g.first_record AS our_first,
360
				MAX(a.first_record) AS their_first,
361
				MAX(a.last_record) AS their_last
362
				FROM usenet_groups g
363
				INNER JOIN short_groups a ON g.name = a.name
364
				WHERE g.first_record IS NOT NULL
365
				AND g.first_record_postdate IS NOT NULL
366
				AND g.backfill = 1
367
				AND (NOW() - INTERVAL %d DAY ) < g.first_record_postdate
368
				GROUP BY a.name, a.last_record, g.name, g.first_record
369
				%s LIMIT 1',
370
                $backfilldays,
371
                $orderby
372
            )
373
        );
374
375
        $count = 0;
376
        if ($data[0]->name) {
377
            $this->safeBackfillGroup = $data[0]->name;
378
379
            $count = ($data[0]->our_first - $data[0]->their_first);
380
        }
381
382
        if ($count > 0) {
383
            if ($count > ($backfill_qty * $threads)) {
384
                $geteach = ceil(($backfill_qty * $threads) / $maxmssgs);
385
            } else {
386
                $geteach = $count / $maxmssgs;
387
            }
388
389
            $queues = [];
390
            for ($i = 0; $i <= $geteach - 1; $i++) {
391
                $queues[$i] = sprintf('get_range  backfill  %s  %s  %s  %s', $data[0]->name, $data[0]->our_first - $i * $maxmssgs - $maxmssgs, $data[0]->our_first - $i * $maxmssgs - 1, $i + 1);
392
            }
393
394
            $pool = Pool::create()->concurrency($threads)->timeout(config('nntmux.multiprocessing_max_child_time'));
395
396
            $this->processWork();
397
            foreach ($queues as $queue) {
398
                $pool->add(function () use ($queue) {
399
                    $this->_executeCommand($this->dnr_path.$queue.'"');
400
                }, 100000)->then(function () use ($data) {
401
                    $this->colorCli->primary('Backfilled group '.$data[0]->name);
402
                })->catch(function (\Throwable $exception) {
403
                    echo $exception->getMessage();
404
                });
405
            }
406
            $pool->wait();
407
        }
408
    }
409
410
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
411
    //////////////////////////////////////// All binaries code here ////////////////////////////////////////////////////
412
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
413
414
    private function binaries()
415
    {
416
        $this->work = DB::select(
417
            sprintf(
418
                'SELECT name, %d AS max FROM usenet_groups WHERE active = 1',
419
                $this->workTypeOptions[0]
420
            )
421
        );
422
423
        $this->maxProcesses = (int) Settings::settingValue('..binarythreads');
424
425
        $pool = Pool::create()->concurrency($this->maxProcesses)->timeout(config('nntmux.multiprocessing_max_child_time'));
426
427
        $maxWork = \count($this->work);
428
429
        $this->processWork();
430
        foreach ($this->work as $group) {
431
            $pool->add(function () use ($group) {
432
                $this->_executeCommand(PHP_BINARY.' misc/update/update_binaries.php '.$group->name.' '.$group->max);
433
            }, 100000)->then(function () use ($group, $maxWork) {
434
                $this->colorCli->primary('Task #'.$maxWork.' Updated group '.$group->name);
435
            })->catch(function (\Throwable $exception) {
436
                echo $exception->getMessage();
437
            });
438
            $maxWork--;
439
        }
440
441
        $pool->wait();
442
    }
443
444
    /**
445
     * @throws \Exception
446
     */
447
    private function safeBinaries()
448
    {
449
        $maxheaders = (int) Settings::settingValue('..max_headers_iteration') ?: 1000000;
450
        $maxmssgs = (int) Settings::settingValue('..maxmssgs');
451
        $this->maxProcesses = (int) Settings::settingValue('..binarythreads');
452
453
        $this->work = DB::select(
454
            '
455
			SELECT g.name AS groupname, g.last_record AS our_last,
456
				a.last_record AS their_last
457
			FROM usenet_groups g
458
			INNER JOIN short_groups a ON g.active = 1 AND g.name = a.name
459
			ORDER BY a.last_record DESC'
460
        );
461
462
        if (! empty($this->work)) {
463
            $i = 1;
464
            $queues = [];
465
            foreach ($this->work as $group) {
466
                if ((int) $group->our_last === 0) {
467
                    $queues[$i] = sprintf('update_group_headers  %s', $group->groupname);
468
                    $i++;
469
                } else {
470
                    //only process if more than 20k headers available and skip the first 20k
471
                    $count = $group->their_last - $group->our_last - 20000;
472
                    //echo "count: " . $count . "maxmsgs x2: " . ($maxmssgs * 2) . PHP_EOL;
473
                    if ($count <= $maxmssgs * 2) {
474
                        $queues[$i] = sprintf('update_group_headers  %s', $group->groupname);
475
                        $i++;
476
                    } else {
477
                        $queues[$i] = sprintf('part_repair  %s', $group->groupname);
478
                        $i++;
479
                        $geteach = floor(min($count, $maxheaders) / $maxmssgs);
480
                        $remaining = min($count, $maxheaders) - $geteach * $maxmssgs;
481
                        //echo "maxmssgs: " . $maxmssgs . " geteach: " . $geteach . " remaining: " . $remaining . PHP_EOL;
482
                        for ($j = 0; $j < $geteach; $j++) {
483
                            $queues[$i] = sprintf('get_range  binaries  %s  %s  %s  %s', $group->groupname, $group->our_last + $j * $maxmssgs + 1, $group->our_last + $j * $maxmssgs + $maxmssgs, $i);
484
                            $i++;
485
                        }
486
                        //add remainder to queue
487
                        $queues[$i] = sprintf('get_range  binaries  %s  %s  %s  %s', $group->groupname, $group->our_last + ($j + 1) * $maxmssgs + 1, $group->our_last + ($j + 1) * $maxmssgs + $remaining + 1, $i);
488
                        $i++;
489
                    }
490
                }
491
            }
492
            $pool = Pool::create()->concurrency($this->maxProcesses)->timeout(config('nntmux.multiprocessing_max_child_time'));
493
494
            $this->processWork();
495
            foreach ($queues as $queue) {
496
                preg_match('/alt\..+/i', $queue, $match);
497
                $pool->add(function () use ($queue) {
498
                    $this->_executeCommand($this->dnr_path.$queue.'"');
499
                }, 100000)->then(function () use ($match) {
500
                    if (! empty($match)) {
501
                        $this->colorCli->primary('Updated group '.$match[0]);
502
                    }
503
                })->catch(function (\Throwable $exception) {
504
                    echo $exception->getMessage();
505
                });
506
            }
507
508
            $pool->wait();
509
        }
510
    }
511
512
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
513
    //////////////////////////////////// All fix release names code here ///////////////////////////////////////////////
514
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
515
516
    private function fixRelNames()
517
    {
518
        $this->maxProcesses = (int) Settings::settingValue('..fixnamethreads');
519
        $maxperrun = (int) Settings::settingValue('..fixnamesperrun');
520
521
        if ($this->maxProcesses > 16) {
522
            $this->maxProcesses = 16;
523
        } elseif ($this->maxProcesses === 0) {
524
            $this->maxProcesses = 1;
525
        }
526
527
        $leftGuids = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'];
528
529
        // Prevent PreDB FT from always running
530
        if ($this->workTypeOptions[0] === 'predbft') {
531
            $preCount = DB::select(
532
                sprintf(
533
                    "
534
					SELECT COUNT(p.id) AS num
535
					FROM predb p
536
					WHERE LENGTH(p.title) >= 15
537
					AND p.title NOT REGEXP '[\"\<\> ]'
538
					AND p.searched = 0
539
					AND p.predate < (NOW() - INTERVAL 1 DAY)"
540
                )
541
            );
542
            if ($preCount[0]->num > 0) {
543
                $leftGuids = \array_slice($leftGuids, 0, (int) ceil($preCount[0]->num / $maxperrun));
544
            } else {
545
                $leftGuids = [];
546
            }
547
        }
548
549
        $count = 0;
550
        $queues = [];
551
        foreach ($leftGuids as $leftGuid) {
552
            $count++;
553
            if ($maxperrun > 0) {
554
                $queues[$count] = sprintf('%s %s %s %s', $this->workTypeOptions[0], $leftGuid, $maxperrun, $count);
555
            }
556
        }
557
558
        $this->work = $queues;
559
560
        $pool = Pool::create()->concurrency($this->maxProcesses)->timeout(config('nntmux.multiprocessing_max_child_time'));
561
562
        $maxWork = \count($queues);
563
564
        $this->processWork();
565
        foreach ($this->work as $queue) {
566
            $pool->add(function () use ($queue) {
567
                $this->_executeCommand(PHP_BINARY.' misc/update/tmux/bin/groupfixrelnames.php "'.$queue.'"'.' true');
568
            }, 100000)->then(function () use ($maxWork) {
569
                $this->colorCli->primary('Task #'.$maxWork.' Finished fixing releases names');
570
            })->catch(function (\Throwable $exception) {
571
                echo $exception->getMessage();
572
            });
573
            $maxWork--;
574
        }
575
        $pool->wait();
576
    }
577
578
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
579
    //////////////////////////////////////// All releases code here ////////////////////////////////////////////////////
580
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
581
582
    private function releases()
583
    {
584
        $work = DB::select('SELECT id, name FROM usenet_groups WHERE (active = 1 OR backfill = 1)');
585
        $this->maxProcesses = (int) Settings::settingValue('..releasethreads');
586
587
        $uGroups = [];
588
        foreach ($work as $group) {
589
            try {
590
                $query = DB::select(sprintf('SELECT id FROM collections WHERE groups_id = %d LIMIT 1', $group->id));
591
                if (! empty($query)) {
592
                    $uGroups[] = ['id' => $group->id, 'name' => $group->name];
593
                }
594
            } catch (\PDOException $e) {
595
                if (config('app.debug') === true) {
596
                    Log::debug($e->getMessage());
597
                }
598
            }
599
        }
600
601
        $maxWork = \count($uGroups);
602
603
        $this->work = $uGroups;
604
605
        $pool = Pool::create()->concurrency($this->maxProcesses)->timeout(config('nntmux.multiprocessing_max_child_time'));
606
607
        $this->processWork();
608
        foreach ($uGroups as $group) {
609
            $pool->add(function () use ($group) {
610
                $this->_executeCommand($this->dnr_path.'releases  '.$group['id'].'"');
611
            }, 100000)->then(function () use ($maxWork) {
612
                $this->colorCli->primary('Task #'.$maxWork.' Finished performing release processing');
613
            })->catch(function (\Throwable $exception) {
614
                echo $exception->getMessage();
615
            });
616
            $maxWork--;
617
        }
618
619
        $pool->wait();
620
    }
621
622
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
623
    /////////////////////////////////////// All post process code here /////////////////////////////////////////////////
624
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
625
626
    /**
627
     * Only 1 exit method is used for post process, since they are all similar.
628
     *
629
     *
630
     * @param array $releases
631
     * @param int   $maxProcess
632
     */
633
    public function postProcess($releases, $maxProcess)
634
    {
635
        $type = $desc = '';
636
        if ($this->processAdditional) {
637
            $type = 'additional true ';
638
            $desc = 'additional postprocessing';
639
        } elseif ($this->processNFO) {
640
            $type = 'nfo true ';
641
            $desc = 'nfo postprocessing';
642
        } elseif ($this->processMovies) {
643
            $type = 'movies true ';
644
            $desc = 'movies postprocessing';
645
        } elseif ($this->processTV) {
646
            $type = 'tv true ';
647
            $desc = 'tv postprocessing';
648
        }
649
        $pool = Pool::create()->concurrency($maxProcess)->timeout(config('nntmux.multiprocessing_max_child_time'));
650
        $count = \count($releases);
651
        $this->processWork();
652
        foreach ($releases as $release) {
653
            if ($type !== '') {
654
                $pool->add(function () use ($release, $type) {
655
                    $this->_executeCommand(PHP_BINARY.' misc/update/postprocess.php '.$type.$release->id);
656
                }, 100000)->then(function () use ($desc, $count) {
657
                    $this->colorCli->primary('Finished task #'.$count.' for '.$desc);
658
                })->catch(function (\Throwable $exception) {
659
                    echo $exception->getMessage();
660
                })->timeout(function () use ($count) {
661
                    $this->colorCli->notice('Task #'.$count.': Timeout occurred.');
662
                });
663
                $count--;
664
            }
665
        }
666
        $pool->wait();
667
    }
668
669
    /**
670
     * @throws \Exception
671
     */
672
    private function postProcessAdd()
673
    {
674
        $ppAddMinSize = Settings::settingValue('..minsizetopostprocess') !== '' ? (int) Settings::settingValue('..minsizetopostprocess') : 1;
0 ignored issues
show
introduced by
The condition App\Models\Settings::set...etopostprocess') !== '' is always true.
Loading history...
675
        $ppAddMinSize = ($ppAddMinSize > 0 ? ('AND r.size > '.($ppAddMinSize * 1048576)) : '');
676
        $ppAddMaxSize = (Settings::settingValue('..maxsizetopostprocess') !== '') ? (int) Settings::settingValue('..maxsizetopostprocess') : 100;
0 ignored issues
show
introduced by
The condition App\Models\Settings::set...etopostprocess') !== '' is always true.
Loading history...
677
        $ppAddMaxSize = ($ppAddMaxSize > 0 ? ('AND r.size < '.($ppAddMaxSize * 1073741824)) : '');
678
        $this->maxProcesses = 1;
679
        $ppQueue = DB::select(
680
            sprintf(
681
                '
682
					SELECT r.leftguid AS id
683
					FROM releases r
684
					LEFT JOIN categories c ON c.id = r.categories_id
685
					WHERE r.nzbstatus = %d
686
					AND r.passwordstatus = -1
687
					AND r.haspreview = -1
688
					AND c.disablepreview = 0
689
					%s %s
690
					GROUP BY r.leftguid
691
					LIMIT 16',
692
                NZB::NZB_ADDED,
693
                $ppAddMaxSize,
694
                $ppAddMinSize
695
            )
696
        );
697
        if (\count($ppQueue) > 0) {
698
            $this->processAdditional = true;
699
            $this->work = $ppQueue;
700
            $this->maxProcesses = (int) Settings::settingValue('..postthreads');
701
        }
702
703
        $this->postProcess($this->work, $this->maxProcesses);
704
    }
705
706
    private $nfoQueryString = '';
707
708
    /**
709
     * Check if we should process NFO's.
710
     *
711
     * @return bool
712
     * @throws \Exception
713
     */
714
    private function checkProcessNfo(): bool
715
    {
716
        if ((int) Settings::settingValue('..lookupnfo') === 1) {
717
            $this->nfoQueryString = Nfo::NfoQueryString();
718
719
            return DB::select(sprintf('SELECT r.id FROM releases r WHERE 1=1 %s LIMIT 1', $this->nfoQueryString)) > 0;
720
        }
721
722
        return false;
723
    }
724
725
    /**
726
     * @throws \Exception
727
     */
728
    private function postProcessNfo()
729
    {
730
        $this->maxProcesses = 1;
731
        if ($this->checkProcessNfo()) {
732
            $this->processNFO = true;
733
            $this->work = DB::select(
734
                sprintf(
735
                    '
736
					SELECT r.leftguid AS id
737
					FROM releases r
738
					WHERE 1=1 %s
739
					GROUP BY r.leftguid
740
					LIMIT 16',
741
                    $this->nfoQueryString
742
                )
743
            );
744
            $this->maxProcesses = (int) Settings::settingValue('..nfothreads');
745
        }
746
747
        $this->postProcess($this->work, $this->maxProcesses);
748
    }
749
750
    /**
751
     * @return bool
752
     * @throws \Exception
753
     */
754
    private function checkProcessMovies(): bool
755
    {
756
        if (Settings::settingValue('..lookupimdb') > 0) {
757
            return DB::select(sprintf('
758
						SELECT id
759
						FROM releases
760
						WHERE categories_id BETWEEN 2000 AND 2999
761
						AND nzbstatus = %d
762
						AND imdbid IS NULL
763
						%s %s
764
						LIMIT 1', NZB::NZB_ADDED, ((int) Settings::settingValue('..lookupimdb') === 2 ? 'AND isrenamed = 1' : ''), ($this->ppRenamedOnly ? 'AND isrenamed = 1' : ''))) > 0;
765
        }
766
767
        return false;
768
    }
769
770
    /**
771
     * @throws \Exception
772
     */
773
    private function postProcessMov()
774
    {
775
        $this->maxProcesses = 1;
776
        if ($this->checkProcessMovies()) {
777
            $this->processMovies = true;
778
            $this->work = DB::select(
779
                sprintf(
780
                    '
781
					SELECT leftguid AS id, %d AS renamed
782
					FROM releases
783
					WHERE categories_id BETWEEN 2000 AND 2999
784
					AND nzbstatus = %d
785
					AND imdbid IS NULL
786
					%s %s
787
					GROUP BY leftguid
788
					LIMIT 16',
789
                    ($this->ppRenamedOnly ? 2 : 1),
790
                    NZB::NZB_ADDED,
791
                    ((int) Settings::settingValue('..lookupimdb') === 2 ? 'AND isrenamed = 1' : ''),
792
                    ($this->ppRenamedOnly ? 'AND isrenamed = 1' : '')
793
                )
794
            );
795
            $this->maxProcesses = (int) Settings::settingValue('..postthreadsnon');
796
        }
797
798
        $this->postProcess($this->work, $this->maxProcesses);
799
    }
800
801
    /**
802
     * Check if we should process TV's.
803
     * @return bool
804
     * @throws \Exception
805
     */
806
    private function checkProcessTV()
807
    {
808
        if ((int) Settings::settingValue('..lookuptvrage') > 0) {
809
            return DB::select(sprintf('
810
						SELECT id
811
						FROM releases
812
						WHERE categories_id BETWEEN 5000 AND 5999
813
						AND nzbstatus = %d
814
						AND size > 1048576
815
						AND tv_episodes_id BETWEEN -2 AND 0
816
						%s %s
817
						', NZB::NZB_ADDED, (int) Settings::settingValue('..lookuptvrage') === 2 ? 'AND isrenamed = 1' : '', $this->ppRenamedOnly ? 'AND isrenamed = 1' : '')) > 0;
818
        }
819
820
        return false;
821
    }
822
823
    /**
824
     * @throws \Exception
825
     */
826
    private function postProcessTv()
827
    {
828
        $this->maxProcesses = 1;
829
        if ($this->checkProcessTV()) {
830
            $this->processTV = true;
831
            $this->work = DB::select(
832
                sprintf(
833
                    '
834
					SELECT leftguid AS id, %d AS renamed
835
					FROM releases
836
					WHERE categories_id BETWEEN 5000 AND 5999
837
					AND nzbstatus = %d
838
					AND tv_episodes_id BETWEEN -2 AND 0
839
					AND size > 1048576
840
					%s %s
841
					GROUP BY leftguid
842
					LIMIT 16',
843
                    ($this->ppRenamedOnly ? 2 : 1),
844
                    NZB::NZB_ADDED,
845
                    (int) Settings::settingValue('..lookuptvrage') === 2 ? 'AND isrenamed = 1' : '',
846
                    ($this->ppRenamedOnly ? 'AND isrenamed = 1' : '')
847
                )
848
            );
849
            $this->maxProcesses = (int) Settings::settingValue('..postthreadsnon');
850
        }
851
852
        $this->postProcess($this->work, $this->maxProcesses);
853
    }
854
855
    /**
856
     * Process sharing.
857
     * @return bool
858
     * @throws \Exception
859
     */
860
    private function processSharing()
861
    {
862
        $sharing = DB::select('SELECT enabled FROM sharing');
863
        if ($sharing > 0 && (int) $sharing[0]->enabled === 1) {
864
            $nntp = new NNTP();
865
            if ((int) (Settings::settingValue('..alternate_nntp') === 1 ? $nntp->doConnect(true, true) : $nntp->doConnect()) === true) {
0 ignored issues
show
introduced by
The condition App\Models\Settings::set....alternate_nntp') === 1 is always false.
Loading history...
introduced by
The condition (int)App\Models\Settings...p->doConnect() === true is always false.
Loading history...
866
                (new PostProcess(['ColorCLI' => $this->colorCli]))->processSharing($nntp);
867
            }
868
869
            return true;
870
        }
871
872
        return false;
873
    }
874
875
    /**
876
     * Process all that require a single thread.
877
     *
878
     * @throws \Exception
879
     */
880
    private function processSingle()
881
    {
882
        $postProcess = new PostProcess(['ColorCLI' => $this->colorCli]);
883
        //$postProcess->processAnime();
884
        $postProcess->processBooks();
885
        $postProcess->processConsoles();
886
        $postProcess->processGames();
887
        $postProcess->processMusic();
888
        $postProcess->processXXX();
889
    }
890
891
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
892
    ///////////////////////////////// All "update_per_Group" code goes here ////////////////////////////////////////////
893
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
894
895
    /**
896
     * @throws \Exception
897
     */
898
    private function updatePerGroup()
899
    {
900
        $this->work = DB::select('SELECT id , name FROM usenet_groups WHERE (active = 1 OR backfill = 1)');
901
902
        $maxProcess = (int) Settings::settingValue('..releasethreads');
903
904
        $pool = Pool::create()->concurrency($maxProcess)->timeout(config('nntmux.multiprocessing_max_child_time'));
905
        $this->processWork();
906
        foreach ($this->work as $group) {
907
            $pool->add(function () use ($group) {
908
                $this->_executeCommand($this->dnr_path.'update_per_group  '.$group->id.'"');
909
            }, 100000)->then(function () use ($group) {
910
                $name = UsenetGroup::getNameByID($group->id);
911
                $this->colorCli->primary('Finished updating binaries, processing releases and additional postprocessing for group:'.$name);
912
            })->catch(function (\Throwable $exception) {
913
                echo $exception->getMessage();
914
            });
915
        }
916
917
        $pool->wait();
918
    }
919
920
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
921
    //////////////////////////////////////////// Various methods ///////////////////////////////////////////////////////
922
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
923
924
    /**
925
     * Execute a shell command.
926
     *
927
     * @param string $command
928
     * @return string
929
     */
930
    protected function _executeCommand($command)
931
    {
932
        $process = new Process($command);
0 ignored issues
show
Bug introduced by
$command of type string is incompatible with the type array expected by parameter $command of Symfony\Component\Process\Process::__construct(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

932
        $process = new Process(/** @scrutinizer ignore-type */ $command);
Loading history...
933
        $process->setTimeout(360);
934
        $process->run(function ($type, $buffer) {
935
            if (Process::ERR === $type) {
936
                echo $buffer;
937
            }
938
        });
939
940
        return $process->getOutput();
941
    }
942
943
    /**
944
     * Echo a message to CLI.
945
     *
946
     * @param string $message
947
     */
948
    public function logger($message)
949
    {
950
        if (config('nntmux.echocli')) {
951
            echo $message.PHP_EOL;
952
        }
953
    }
954
955
    /**
956
     * This method is executed whenever a child is finished doing work.
957
     *
958
     * @param string $pid        The PID numbers.
959
     */
960
    public function exit($pid)
961
    {
962
        if (config('nntmux.echocli')) {
963
            $this->colorCli->header(
964
                'Process ID #'.$pid.' has completed.'.PHP_EOL.
965
                'There are '.($this->maxProcesses - 1).' process(es) still active with '.
966
                (--$this->_workCount).' job(s) left in the queue.',
967
                true
968
            );
969
        }
970
    }
971
}
972