Issues (376)

app/Console/Commands/NntmuxOffsetPopulate.php (1 issue)

1
<?php
2
3
namespace App\Console\Commands;
4
5
use App\Models\Predb;
6
use App\Models\Release;
7
use Blacklight\ManticoreSearch;
8
use Exception;
9
use Illuminate\Console\Command;
10
use Illuminate\Support\Arr;
11
use Illuminate\Support\Facades\Process;
12
13
class NntmuxOffsetPopulate extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'nntmux:offset-populate
                                       {--manticore : Use ManticoreSearch}
                                       {--elastic : Use ElasticSearch}
                                       {--releases : Populates the releases index}
                                       {--predb : Populates the predb index}
                                       {--parallel=8 : Number of parallel processes}
                                       {--batch-size=10000 : Batch size for bulk operations}
                                       {--disable-keys : Disable database keys during population}
                                       {--memory-limit=4G : Memory limit for each process}';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Populate search indexes using offset-based parallel processing for maximum performance';

    private const SUPPORTED_ENGINES = ['manticore', 'elastic'];

    private const SUPPORTED_INDEXES = ['releases', 'predb'];

    /**
     * Execute the console command.
     *
     * @return int Command::SUCCESS when the index was populated by workers that all exited
     *             cleanly (or the table was empty), Command::FAILURE on invalid options or
     *             when any worker failed.
     */
    public function handle(): int
    {
        $engine = $this->getSelectedEngine();
        $index = $this->getSelectedIndex();

        if (! $engine || ! $index) {
            $this->error('You must specify both an engine (--manticore or --elastic) and an index (--releases or --predb).');

            return Command::FAILURE;
        }

        // Validate numeric options up front: --parallel=0 would otherwise divide by zero,
        // and non-numeric values would be silently coerced to something unintended.
        $parallelProcesses = $this->positiveIntOption('parallel');
        $batchSize = $this->positiveIntOption('batch-size');

        if ($parallelProcesses === null || $batchSize === null) {
            $this->error('The --parallel and --batch-size options must be positive integers.');

            return Command::FAILURE;
        }

        $this->info("Starting offset-based parallel {$engine} {$index} population...");
        $startTime = microtime(true);

        $result = $this->populateIndexParallel($engine, $index, $parallelProcesses, $batchSize);

        $executionTime = round(microtime(true) - $startTime, 2);
        if ($result === Command::SUCCESS) {
            $this->info("Offset-based parallel population completed in {$executionTime} seconds.");
        }

        return $result;
    }

    /**
     * Populate an index by splitting the source table into offset ranges and running one
     * nntmux:offset-worker process per range in parallel.
     *
     * @return int Command::SUCCESS when every worker exited with code 0 (or there was nothing
     *             to do), Command::FAILURE when at least one worker failed.
     */
    private function populateIndexParallel(string $engine, string $index, int $parallelProcesses, int $batchSize): int
    {
        $total = $this->getTotalRecords($index);
        if ($total === 0) {
            $this->warn("{$index} table is empty. Nothing to do.");

            return Command::SUCCESS;
        }

        $ranges = $this->createOffsetRanges($total, $parallelProcesses);

        // Report the number of workers actually started: fewer than --parallel are spawned
        // when the table has fewer rows than requested processes.
        $this->info(sprintf(
            'Processing %s records with %d parallel processes, ~%d records per process',
            number_format($total),
            count($ranges),
            $ranges[0]['limit']
        ));

        // Clear the index first
        $this->clearIndex($engine, $index);

        $processes = [];
        foreach ($ranges as $range) {
            $workerId = $range['worker_id'];
            $command = $this->buildWorkerCommand(
                $engine,
                $index,
                $range['offset'],
                $range['limit'],
                $workerId,
                $batchSize
            );
            $this->info("Starting worker process {$workerId}: processing {$range['limit']} records from offset {$range['offset']}");

            $processes[] = [
                'process' => Process::start($command),
                'id' => $workerId,
                'range' => $range,
            ];
        }

        $failedWorkers = $this->monitorProcesses($processes);

        // Verify even when workers failed: the resulting document count shows how much was lost.
        $this->verifyIndexPopulation($engine, $index, $total);

        if ($failedWorkers !== []) {
            $this->error(sprintf(
                '%d of %d worker processes failed; the index is incomplete.',
                count($failedWorkers),
                count($processes)
            ));

            return Command::FAILURE;
        }

        $this->info('All parallel processes completed successfully!');

        return Command::SUCCESS;
    }

    /**
     * Split the row range [0, $total) into contiguous offset/limit slices, one per worker.
     *
     * Each slice holds ceil($total / $processes) rows (the last may be shorter). Workers
     * that would receive no rows are omitted, so fewer than $processes ranges may be returned.
     *
     * @return list<array{offset: int, limit: int, worker_id: int}>
     */
    private function createOffsetRanges(int $total, int $processes): array
    {
        $processes = max(1, $processes);

        // Integer ceiling division keeps offsets/limits as ints (ceil() would return floats).
        $chunkSize = intdiv($total + $processes - 1, $processes);

        $ranges = [];
        for ($workerId = 0; $workerId < $processes; $workerId++) {
            $offset = $workerId * $chunkSize;
            $limit = min($chunkSize, $total - $offset);

            // Once a slice is empty every later slice is too.
            if ($limit <= 0) {
                break;
            }

            $ranges[] = [
                'offset' => $offset,
                'limit' => $limit,
                'worker_id' => $workerId,
            ];
        }

        return $ranges;
    }

    /**
     * Build the shell command that runs one nntmux:offset-worker over a slice of the table.
     *
     * Every interpolated value is shell-escaped. The worker runs under the same PHP binary
     * as this command, rather than whichever `php` happens to be first on PATH.
     */
    private function buildWorkerCommand(
        string $engine,
        string $index,
        int $offset,
        int $limit,
        int $workerId,
        int $batchSize
    ): string {
        $memoryLimit = (string) ($this->option('memory-limit') ?? '4G');
        $phpBinary = PHP_BINARY !== '' ? PHP_BINARY : 'php';

        $options = [
            "--{$engine}",
            "--{$index}",
            "--offset={$offset}",
            "--limit={$limit}",
            "--worker-id={$workerId}",
            "--batch-size={$batchSize}",
            "--memory-limit={$memoryLimit}",
        ];

        if ($this->option('disable-keys')) {
            $options[] = '--disable-keys';
        }

        // stderr is folded into stdout so a failing worker's output tail can be reported.
        return sprintf(
            '%s -d %s %s nntmux:offset-worker %s 2>&1',
            escapeshellarg($phpBinary),
            escapeshellarg('memory_limit='.$memoryLimit),
            escapeshellarg(base_path('artisan')),
            implode(' ', array_map(escapeshellarg(...), $options))
        );
    }

    /**
     * Poll the worker processes until every one has exited, reporting each result.
     *
     * @param  list<array{process: \Illuminate\Contracts\Process\InvokedProcess, id: int, range: array}>  $processes
     * @return list<int> IDs of workers that exited non-zero or threw while being awaited
     */
    private function monitorProcesses(array $processes): array
    {
        $bar = $this->output->createProgressBar(count($processes));
        $bar->setFormat('verbose');
        $bar->start();

        // Still-running workers keyed by worker id; entries are removed as each one exits.
        $pending = array_column($processes, 'process', 'id');
        $failedWorkers = [];

        while ($pending !== []) {
            // foreach iterates a snapshot, so unsetting finished workers inside is safe.
            foreach ($pending as $workerId => $process) {
                if ($process->running()) {
                    continue;
                }

                unset($pending[$workerId]);
                $bar->advance();

                if (! $this->reportWorkerResult($workerId, $process)) {
                    $failedWorkers[] = $workerId;
                }
            }

            // Only back off while workers are still running.
            if ($pending !== []) {
                usleep(500000); // 0.5 second delay
            }
        }

        $bar->finish();
        $this->newLine();

        if ($failedWorkers !== []) {
            $this->error('Failed workers: '.implode(', ', $failedWorkers));
        }

        return $failedWorkers;
    }

    /**
     * Wait for a finished worker, log its outcome and return whether it succeeded.
     *
     * On a non-zero exit the last 500 bytes of its stdout/stderr are logged.
     *
     * @param  \Illuminate\Contracts\Process\InvokedProcess  $process
     */
    private function reportWorkerResult(int $workerId, $process): bool
    {
        try {
            $result = $process->wait();
            $exitCode = $result->exitCode();

            if ($exitCode === 0) {
                $this->info("Worker {$workerId} completed successfully");

                return true;
            }

            $this->error("Worker {$workerId} failed with exit code: {$exitCode}");

            $output = $result->output();
            if ($output !== '') {
                $this->error("Worker {$workerId} output: ".substr($output, -500));
            }

            $errorOutput = $result->errorOutput();
            if ($errorOutput !== '') {
                $this->error("Worker {$workerId} error: ".substr($errorOutput, -500));
            }
        } catch (Exception $e) {
            $this->error("Worker {$workerId} exception: {$e->getMessage()}");
        }

        return false;
    }

    /**
     * Compare the document count in the search index against the source row count.
     *
     * Only ElasticSearch is checked; counts within 95% of the expected total are treated
     * as success. Failures to query the index are reported as warnings, not errors.
     */
    private function verifyIndexPopulation(string $engine, string $index, int $expectedTotal): void
    {
        $this->info('Verifying index population...');

        if ($engine !== 'elastic') {
            $this->info('ManticoreSearch index verification not implemented yet');

            return;
        }

        try {
            // Wait a moment for ElasticSearch to refresh
            sleep(2);

            $stats = \Elasticsearch::indices()->stats(['index' => $index]);
            $actualCount = (int) ($stats['indices'][$index]['total']['docs']['count'] ?? 0);

            $this->info('Expected: '.number_format($expectedTotal).' records');
            $this->info('Actual: '.number_format($actualCount).' records');

            if ($actualCount >= $expectedTotal * 0.95) { // Allow for 5% tolerance
                $this->info('✓ Index population successful!');
            } else {
                $this->warn('⚠ Index population may be incomplete');
            }
        } catch (Exception $e) {
            $this->warn("Could not verify index population: {$e->getMessage()}");
        }
    }

    /**
     * Empty the target search index before repopulating it.
     *
     * Manticore: truncates the matching RT index. ElasticSearch: deletes all documents but
     * keeps the index (and its mappings); creates the index if missing; as a last resort
     * deletes and recreates it.
     */
    private function clearIndex(string $engine, string $index): void
    {
        if ($engine === 'manticore') {
            $manticore = new ManticoreSearch;
            $indexName = $index === 'releases' ? 'releases_rt' : 'predb_rt';
            $manticore->truncateRTIndex(Arr::wrap($indexName));
            $this->info("Truncated ManticoreSearch index: {$indexName}");

            return;
        }

        // For ElasticSearch, just clear the data instead of recreating the index
        try {
            $exists = \Elasticsearch::indices()->exists(['index' => $index]);

            if (! $exists) {
                $this->info("ElasticSearch index '{$index}' does not exist. Creating optimized index...");
                $this->createOptimizedElasticIndex($index);

                return;
            }

            $stats = \Elasticsearch::indices()->stats(['index' => $index]);
            $currentCount = $stats['indices'][$index]['total']['docs']['count'] ?? 0;

            if ($currentCount > 0) {
                $this->info("ElasticSearch index '{$index}' exists with {$currentCount} documents. Clearing data...");

                // Delete all documents but keep the index structure
                \Elasticsearch::deleteByQuery([
                    'index' => $index,
                    'body' => [
                        'query' => ['match_all' => (object) []],
                    ],
                ]);

                // Force refresh to ensure deletions are visible
                \Elasticsearch::indices()->refresh(['index' => $index]);
                $this->info("Cleared all documents from ElasticSearch index: {$index}");
            } else {
                $this->info("ElasticSearch index '{$index}' exists but is already empty");
            }
        } catch (Exception $e) {
            $this->warn("Could not clear ElasticSearch index: {$e->getMessage()}");
            $this->info('Attempting to recreate index...');

            // Fallback: delete and recreate
            try {
                \Elasticsearch::indices()->delete(['index' => $index]);
            } catch (Exception $deleteException) {
                // Index might not exist, that's okay
            }

            $this->createOptimizedElasticIndex($index);
        }
    }

    /**
     * Create an ElasticSearch index tuned for bulk loading.
     *
     * Uses a single shard, no replicas, a 30s refresh interval and async translog
     * durability, with the mappings from getIndexMappings().
     */
    private function createOptimizedElasticIndex(string $indexName): void
    {
        $settings = [
            'index' => $indexName,
            'body' => [
                'settings' => [
                    'number_of_shards' => 1,
                    'number_of_replicas' => 0,
                    'refresh_interval' => '30s',
                    'translog' => [
                        'durability' => 'async',
                        'sync_interval' => '30s',
                        'flush_threshold_size' => '1gb',
                    ],
                ],
                'mappings' => $this->getIndexMappings($indexName),
            ],
        ];

        \Elasticsearch::indices()->create($settings);
        $this->info("Created optimized ElasticSearch index: {$indexName}");
    }

    /**
     * Field mappings for the given ElasticSearch index.
     *
     * Returns the releases mapping for 'releases'; any other name gets the predb mapping.
     *
     * @return array{properties: array<string, array<string, string>>}
     */
    private function getIndexMappings(string $indexName): array
    {
        if ($indexName === 'releases') {
            return [
                'properties' => [
                    'id' => ['type' => 'long'],
                    'name' => ['type' => 'text', 'analyzer' => 'standard'],
                    'searchname' => ['type' => 'text', 'analyzer' => 'standard'],
                    'plainsearchname' => ['type' => 'text', 'analyzer' => 'standard'],
                    'fromname' => ['type' => 'text', 'analyzer' => 'standard'],
                    'categories_id' => ['type' => 'integer'],
                    'filename' => ['type' => 'text', 'analyzer' => 'standard'],
                    'postdate' => ['type' => 'date'],
                ],
            ];
        }

        return [
            'properties' => [
                'id' => ['type' => 'long'],
                'title' => ['type' => 'text', 'analyzer' => 'standard'],
                'filename' => ['type' => 'text', 'analyzer' => 'standard'],
                'source' => ['type' => 'keyword'],
            ],
        ];
    }

    /**
     * Number of rows in the source table backing the given index.
     *
     * Goes through query() and casts to int so the return type is statically resolvable.
     */
    private function getTotalRecords(string $index): int
    {
        return (int) ($index === 'releases'
            ? Release::query()->count()
            : Predb::query()->count());
    }

    /**
     * Read a command option that must be a positive integer.
     *
     * @return int|null The parsed value, or null when the option is missing, non-numeric or < 1.
     */
    private function positiveIntOption(string $name): ?int
    {
        $value = filter_var($this->option($name), FILTER_VALIDATE_INT, ['options' => ['min_range' => 1]]);

        return $value === false ? null : $value;
    }

    /**
     * Get the first engine flag (--manticore / --elastic) that was passed, or null if none.
     */
    private function getSelectedEngine(): ?string
    {
        foreach (self::SUPPORTED_ENGINES as $engine) {
            if ($this->option($engine)) {
                return $engine;
            }
        }

        return null;
    }

    /**
     * Get the first index flag (--releases / --predb) that was passed, or null if none.
     */
    private function getSelectedIndex(): ?string
    {
        foreach (self::SUPPORTED_INDEXES as $index) {
            if ($this->option($index)) {
                return $index;
            }
        }

        return null;
    }
}
421