| Total Complexity | 44 |
| Total Lines | 406 |
| Duplicated Lines | 0 % |
| Changes | 1 | ||
| Bugs | 0 | Features | 0 |
Complex classes like NntmuxOffsetPopulate often do many different things. To break such a class down, we need to identify a cohesive component within it. A common way to find such a component is to look for fields or methods that share the same prefix or suffix.
Once you have determined which fields belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate and is often faster.
While breaking up the class, it is a good idea to analyze how other classes use NntmuxOffsetPopulate, and based on these observations, apply Extract Interface, too.
| 1 | <?php |
||
| 13 | class NntmuxOffsetPopulate extends Command |
||
| 14 | { |
||
| 15 | /** |
||
| 16 | * The name and signature of the console command. |
||
| 17 | * |
||
| 18 | * @var string |
||
| 19 | */ |
||
| 20 | protected $signature = 'nntmux:offset-populate |
||
| 21 | {--manticore : Use ManticoreSearch} |
||
| 22 | {--elastic : Use ElasticSearch} |
||
| 23 | {--releases : Populates the releases index} |
||
| 24 | {--predb : Populates the predb index} |
||
| 25 | {--parallel=8 : Number of parallel processes} |
||
| 26 | {--batch-size=10000 : Batch size for bulk operations} |
||
| 27 | {--disable-keys : Disable database keys during population} |
||
| 28 | {--memory-limit=4G : Memory limit for each process}'; |
||
| 29 | |||
| 30 | /** |
||
| 31 | * The console command description. |
||
| 32 | * |
||
| 33 | * @var string |
||
| 34 | */ |
||
| 35 | protected $description = 'Populate search indexes using offset-based parallel processing for maximum performance'; |
||
| 36 | |||
// Engine names, each matching a flag in $signature (--manticore / --elastic).
// getSelectedEngine() returns the first entry, in this order, whose flag is set.
| 37 | private const SUPPORTED_ENGINES = ['manticore', 'elastic']; |
||
| 38 | |||
// Index names, each matching a flag in $signature (--releases / --predb).
| 39 | private const SUPPORTED_INDEXES = ['releases', 'predb']; |
||
| 40 | |||
| 41 | /** |
||
| 42 | * Execute the console command. |
||
| 43 | */ |
||
| 44 | public function handle(): int |
||
| 66 | } |
||
| 67 | |||
/**
 * Populate an index by fanning the source table out to parallel worker processes.
 *
 * The table is split into contiguous offset/limit ranges (see createOffsetRanges()).
 * One `nntmux:offset-worker` process is started per range. After all workers exit,
 * the final document count is verified.
 *
 * @param  string  $engine  Search engine name (an entry of SUPPORTED_ENGINES).
 * @param  string  $index  Index name (an entry of SUPPORTED_INDEXES).
 * @return int Command::SUCCESS when the table is empty or every worker exits with code 0;
 *             Command::FAILURE when --parallel is not a positive integer or any worker fails.
 */
private function populateIndexParallel(string $engine, string $index): int
{
    $total = $this->getTotalRecords($index);
    if (! $total) {
        $this->warn("{$index} table is empty. Nothing to do.");

        return Command::SUCCESS;
    }

    // The option arrives as a string; 0 or a non-numeric value would otherwise blow up
    // the division below with a DivisionByZeroError / TypeError.
    $parallelProcesses = filter_var(
        $this->option('parallel'),
        FILTER_VALIDATE_INT,
        ['options' => ['min_range' => 1]]
    );
    if ($parallelProcesses === false) {
        $this->error('The --parallel option must be a positive integer.');

        return Command::FAILURE;
    }

    // Ranges are computed up front (pure calculation) so the log reports how many workers
    // will really start: small tables can need fewer workers than requested.
    $ranges = $this->createOffsetRanges($total, $parallelProcesses);
    $recordsPerProcess = intdiv($total + $parallelProcesses - 1, $parallelProcesses);

    $this->info(sprintf(
        'Processing %s records with %d parallel processes, ~%d records per process',
        number_format($total),
        count($ranges),
        $recordsPerProcess
    ));

    // Clear the index first so the workers populate it from scratch.
    $this->clearIndex($engine, $index);

    $processes = [];
    foreach ($ranges as $range) {
        $workerId = $range['worker_id'];
        $command = $this->buildWorkerCommand($engine, $index, $range['offset'], $range['limit'], $workerId);
        $this->info("Starting worker process {$workerId}: processing {$range['limit']} records from offset {$range['offset']}");

        $processes[] = [
            'process' => Process::start($command),
            'id' => $workerId,
            'range' => $range,
        ];
    }

    $failedWorkers = $this->monitorProcesses($processes);

    // Verify even when workers failed: the count shows how much data actually made it in.
    $this->verifyIndexPopulation($engine, $index, $total);

    if ($failedWorkers !== []) {
        $this->error(sprintf(
            '%d of %d worker processes failed; the index is likely incomplete.',
            count($failedWorkers),
            count($processes)
        ));

        return Command::FAILURE;
    }

    $this->info('All parallel processes completed successfully!');

    return Command::SUCCESS;
}

/**
 * Split $total records into contiguous offset/limit ranges, one per worker.
 *
 * Each range covers ceil($total / $processes) records, except the last, which takes the
 * remainder. Workers that would receive no records are omitted, so fewer than $processes
 * ranges come back for small tables. Array keys therefore always equal 'worker_id'.
 *
 * @param  int  $total  Number of records to distribute; <= 0 yields no ranges.
 * @param  int  $processes  Requested worker count; <= 0 yields no ranges.
 * @return list<array{offset: int, limit: int, worker_id: int}>
 */
private function createOffsetRanges(int $total, int $processes): array
{
    if ($total <= 0 || $processes <= 0) {
        return [];
    }

    // Integer ceiling division keeps offsets/limits as ints (ceil() would return floats).
    $recordsPerProcess = intdiv($total + $processes - 1, $processes);
    $ranges = [];

    for ($i = 0; $i < $processes; $i++) {
        $offset = $i * $recordsPerProcess;
        $limit = min($recordsPerProcess, $total - $offset);

        if ($limit <= 0) {
            // Every later worker would start even further past the end of the table.
            break;
        }

        $ranges[] = [
            'offset' => $offset,
            'limit' => $limit,
            'worker_id' => $i,
        ];
    }

    return $ranges;
}

/**
 * Build the shell command line that runs one `nntmux:offset-worker` process.
 *
 * Every argument is shell-escaped because --memory-limit and --batch-size come straight
 * from the command line, and the artisan path may contain shell metacharacters.
 *
 * @param  string  $engine  Engine flag to forward (e.g. 'elastic' → --elastic).
 * @param  string  $index  Index flag to forward (e.g. 'releases' → --releases).
 * @param  int  $offset  First record (0-based) this worker processes.
 * @param  int  $limit  Number of records this worker processes.
 * @param  int  $workerId  Identifier forwarded as --worker-id.
 * @return string Command line with stderr merged into stdout.
 */
private function buildWorkerCommand(string $engine, string $index, int $offset, int $limit, int $workerId): string
{
    $memoryLimit = (string) ($this->option('memory-limit') ?? '4G');
    $batchSize = (string) $this->option('batch-size');

    $options = [
        "--{$engine}",
        "--{$index}",
        "--offset={$offset}",
        "--limit={$limit}",
        "--worker-id={$workerId}",
        "--batch-size={$batchSize}",
        "--memory-limit={$memoryLimit}",
    ];

    if ($this->option('disable-keys')) {
        $options[] = '--disable-keys';
    }

    // Run workers with the same interpreter as this command rather than whatever `php`
    // resolves to on PATH; fall back to `php` if PHP_BINARY is unavailable.
    $phpBinary = PHP_BINARY !== '' ? PHP_BINARY : 'php';

    return sprintf(
        '%s -d %s %s nntmux:offset-worker %s 2>&1',
        escapeshellarg($phpBinary),
        escapeshellarg("memory_limit={$memoryLimit}"),
        escapeshellarg(base_path('artisan')),
        implode(' ', array_map(static fn (string $arg): string => escapeshellarg($arg), $options))
    );
}

/**
 * Poll the worker processes until all of them have exited.
 *
 * A progress bar advances as each worker finishes. Non-zero exits are reported along
 * with the last 500 bytes of their output.
 *
 * @param  array<int, array{process: mixed, id: int, range: array}>  $processes  Entries built
 *         by populateIndexParallel(); 'process' is the handle returned by Process::start().
 * @return list<int> IDs of workers that exited non-zero or whose result could not be collected.
 */
private function monitorProcesses(array $processes): array
{
    $bar = $this->output->createProgressBar(count($processes));
    $bar->setFormat('verbose');
    $bar->start();

    // Track outstanding workers by array key, so duplicate IDs cannot stall the loop.
    $pending = $processes;
    $failedProcesses = [];

    while ($pending !== []) {
        foreach ($pending as $key => $processInfo) {
            $process = $processInfo['process'];
            if ($process->running()) {
                continue;
            }

            unset($pending[$key]);
            $bar->advance();
            $processId = $processInfo['id'];

            try {
                $result = $process->wait();
                $exitCode = $result->exitCode();

                if ($exitCode === 0) {
                    $this->info("Worker {$processId} completed successfully");

                    continue;
                }

                $failedProcesses[] = $processId;
                $this->error("Worker {$processId} failed with exit code: {$exitCode}");

                $output = $result->output();
                $errorOutput = $result->errorOutput();

                if ($output) {
                    $this->error("Worker {$processId} output: ".substr($output, -500));
                }
                if ($errorOutput) {
                    $this->error("Worker {$processId} error: ".substr($errorOutput, -500));
                }
            } catch (Exception $e) {
                $failedProcesses[] = $processId;
                $this->error("Worker {$processId} exception: {$e->getMessage()}");
            }
        }

        // Only wait while workers remain; avoids a pointless pause after the last one exits.
        if ($pending !== []) {
            usleep(500000); // 0.5 second delay
        }
    }

    $bar->finish();
    $this->newLine();

    if (! empty($failedProcesses)) {
        $this->error('Failed workers: '.implode(', ', $failedProcesses));
    }

    return $failedProcesses;
}
||
| 241 | |||
/**
 * Report how many documents ended up in the index compared with the source table.
 *
 * Only ElasticSearch is checked; for other engines a "not implemented" notice is printed.
 * Counts within 5% of the expected total are reported as success. Verification errors
 * are downgraded to a warning, because this is a report and not a gate.
 *
 * @param  string  $engine  Engine name; only 'elastic' is verified.
 * @param  string  $index  Index name to count.
 * @param  int  $expectedTotal  Number of source records that were dispatched to workers.
 */
private function verifyIndexPopulation(string $engine, string $index, int $expectedTotal): void
{
    $this->info('Verifying index population...');

    if ($engine !== 'elastic') {
        $this->info('ManticoreSearch index verification not implemented yet');

        return;
    }

    try {
        // Explicitly make all indexed documents searchable before counting. A fixed sleep
        // only helps if a periodic refresh happens to run in that window.
        \Elasticsearch::indices()->refresh(['index' => $index]);

        // The _count API counts top-level documents. Index stats' docs.count is a
        // Lucene-level figure that can also include hidden nested documents.
        $response = \Elasticsearch::count(['index' => $index]);
        $actualCount = (int) ($response['count'] ?? 0);

        $this->info('Expected: '.number_format($expectedTotal).' records');
        $this->info('Actual: '.number_format($actualCount).' records');

        if ($actualCount >= $expectedTotal * 0.95) { // Allow for 5% tolerance
            $this->info('✓ Index population successful!');
        } else {
            $this->warn('⚠ Index population may be incomplete');
        }
    } catch (Exception $e) {
        $this->warn("Could not verify index population: {$e->getMessage()}");
    }
}
||
| 272 | |||
/**
 * Empty the target index so the workers can repopulate it from scratch.
 *
 * ManticoreSearch: truncates the real-time index, which is 'releases_rt' for 'releases'
 * and 'predb_rt' for anything else.
 *
 * ElasticSearch: if the index exists and holds documents, every document is removed with
 * a match_all delete-by-query, keeping the index itself, and the index is then refreshed.
 * If the index is missing, it is created via createOptimizedElasticIndex(). If any of
 * these steps throws, the index is deleted (ignoring failure) and recreated.
 *
 * @param  string  $engine  'manticore' or any other value (treated as ElasticSearch).
 * @param  string  $index  Index name.
 */
private function clearIndex(string $engine, string $index): void
{
    if ($engine === 'manticore') {
        $rtIndexName = ($index === 'releases') ? 'releases_rt' : 'predb_rt';
        (new ManticoreSearch)->truncateRTIndex(Arr::wrap($rtIndexName));
        $this->info("Truncated ManticoreSearch index: {$rtIndexName}");

        return;
    }

    try {
        if (! \Elasticsearch::indices()->exists(['index' => $index])) {
            $this->info("ElasticSearch index '{$index}' does not exist. Creating optimized index...");
            $this->createOptimizedElasticIndex($index);

            return;
        }

        $indexStats = \Elasticsearch::indices()->stats(['index' => $index]);
        $documentCount = $indexStats['indices'][$index]['total']['docs']['count'] ?? 0;

        if ($documentCount > 0) {
            $this->info("ElasticSearch index '{$index}' exists with {$documentCount} documents. Clearing data...");

            // Wipe the documents while leaving the index structure in place.
            \Elasticsearch::deleteByQuery([
                'index' => $index,
                'body' => ['query' => ['match_all' => (object) []]],
            ]);

            // Refresh so the deletions are visible to subsequent reads.
            \Elasticsearch::indices()->refresh(['index' => $index]);
            $this->info("Cleared all documents from ElasticSearch index: {$index}");
        } else {
            $this->info("ElasticSearch index '{$index}' exists but is already empty");
        }
    } catch (Exception $e) {
        $this->warn("Could not clear ElasticSearch index: {$e->getMessage()}");
        $this->info('Attempting to recreate index...');

        // Fallback: drop the index, then build it again.
        try {
            \Elasticsearch::indices()->delete(['index' => $index]);
        } catch (Exception $deleteError) {
            // Best effort: the index might not exist, which is fine here.
        }
        $this->createOptimizedElasticIndex($index);
    }
}
||
| 328 | |||
| 329 | /** |
||
| 330 | * Create optimized ElasticSearch index |
||
| 331 | */ |
||
| 332 | private function createOptimizedElasticIndex(string $indexName): void |
||
| 353 | } |
||
| 354 | |||
| 355 | /** |
||
| 356 | * Get index mappings |
||
| 357 | */ |
||
| 358 | private function getIndexMappings(string $indexName): array |
||
| 380 | ], |
||
| 381 | ]; |
||
| 382 | } |
||
| 383 | } |
||
| 384 | |||
/**
 * Count the rows in the database table that feeds the given index.
 *
 * @param  string  $index  'releases' counts Release rows; any other value counts Predb rows.
 * @return int Row count.
 */
private function getTotalRecords(string $index): int
{
    if ($index === 'releases') {
        return Release::count();
    }

    return Predb::count();
}
||
| 392 | |||
/**
 * Resolve which search engine was requested on the command line.
 *
 * @return string|null The first entry of SUPPORTED_ENGINES whose flag is set, or null if none is.
 */
private function getSelectedEngine(): ?string
{
    $requested = array_values(array_filter(
        self::SUPPORTED_ENGINES,
        fn (string $candidate): bool => (bool) $this->option($candidate)
    ));

    return $requested[0] ?? null;
}
||
| 406 | |||
| 407 | /** |
||
| 408 | * Get selected index |
||
| 409 | */ |
||
| 410 | private function getSelectedIndex(): ?string |
||
| 419 | } |
||
| 420 | } |
||
| 421 |