NntmuxOffsetWorker   B
last analyzed

Complexity

Total Complexity 49

Size/Duplication

Total Lines 334
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
wmc 49
eloc 168
c 1
b 0
f 0
dl 0
loc 334
rs 8.48

10 Methods

Rating   Name   Duplication   Size   Complexity  
A getSelectedIndex() 0 3 3
B getTransformer() 0 39 8
A handle() 0 32 5
B processOffset() 0 78 9
A buildOffsetQuery() 0 22 2
A processManticoreBatch() 0 15 4
B processElasticBatch() 0 27 9
A getIndexName() 0 6 3
A getSelectedEngine() 0 3 3
A optimizeWorkerSettings() 0 23 3

How to fix   Complexity   

Complex Class

Complex classes like NntmuxOffsetWorker often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use NntmuxOffsetWorker, and based on these observations, apply Extract Interface, too.

<?php

namespace App\Console\Commands;

use App\Models\Predb;
use App\Models\Release;
use Blacklight\ManticoreSearch;
use Exception;
use Illuminate\Console\Command;
use Illuminate\Database\Eloquent\Builder;
use Illuminate\Support\Facades\DB;

/**
 * Artisan worker that indexes one contiguous slice (in id order) of the releases or predb
 * table into ManticoreSearch or ElasticSearch.
 *
 * Several workers can be launched side by side with disjoint --offset/--limit ranges to
 * populate an index in parallel.
 */
class NntmuxOffsetWorker extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'nntmux:offset-worker
                                       {--manticore : Use ManticoreSearch}
                                       {--elastic : Use ElasticSearch}
                                       {--releases : Populates the releases index}
                                       {--predb : Populates the predb index}
                                       {--offset=0 : Start offset}
                                       {--limit=0 : Number of records to process}
                                       {--worker-id=0 : Worker ID}
                                       {--batch-size=10000 : Batch size for bulk operations}
                                       {--disable-keys : Disable database keys}
                                       {--memory-limit=4G : Memory limit}';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Offset-based worker process for parallel index population';

    /** Value applied to the MySQL session's group_concat_max_len (bounds the concatenated release file names). */
    private const GROUP_CONCAT_MAX_LEN = 65535;

    /** Number of attempts made for one bulk write before the worker gives up. */
    private const MAX_WRITE_ATTEMPTS = 3;

    /** Pause between bulk write attempts, in microseconds (200ms). */
    private const RETRY_DELAY_MICROSECONDS = 200000;

    /**
     * Execute the console command.
     *
     * Validates the options, tunes the database session, then indexes the rows
     * [--offset, --offset + --limit) of the selected table into the selected engine.
     * A --limit of 0 processes nothing.
     *
     * @return int Command::SUCCESS, or Command::FAILURE on invalid options or a failed bulk write.
     */
    public function handle(): int
    {
        // Options arrive as strings; cast once so the typed helpers below receive ints.
        $workerId = (int) $this->option('worker-id');
        $offset = (int) $this->option('offset');
        $limit = (int) $this->option('limit');
        $batchSize = (int) $this->option('batch-size');

        $engine = $this->getSelectedEngine();
        $index = $this->getSelectedIndex();

        // Validate before changing any session settings.
        if (! $engine || ! $index) {
            $this->error('Engine and index must be specified');

            return Command::FAILURE;
        }

        if ($offset < 0 || $limit < 0 || $batchSize < 1) {
            $this->error('--offset and --limit must be non-negative and --batch-size must be at least 1');

            return Command::FAILURE;
        }

        $this->info("Worker {$workerId} starting: processing {$limit} records from offset {$offset}");

        try {
            $this->optimizeWorkerSettings();

            $result = $this->processOffset($engine, $index, $offset, $limit, $workerId, $batchSize);

            if ($result === Command::SUCCESS) {
                $this->info("Worker {$workerId} completed successfully");
            }

            return $result;
        } catch (Exception $e) {
            $this->error("Worker {$workerId} failed: {$e->getMessage()}");

            return Command::FAILURE;
        }
    }

    /**
     * Index the rows [$offset, $offset + $limit) of $index into $engine, $batchSize rows per bulk write.
     *
     * Rows whose transformation throws are skipped and counted as errors. A bulk write that still
     * fails after MAX_WRITE_ATTEMPTS attempts propagates its exception, aborting the worker. This
     * applies to both engines.
     *
     * @return int Command::SUCCESS once every page has been handled.
     *
     * @throws Exception when a bulk write fails on every attempt, or a database query fails.
     */
    private function processOffset(string $engine, string $index, int $offset, int $limit, int $workerId, int $batchSize): int
    {
        $transformer = $this->getTransformer($engine, $index);
        $indexName = $this->getIndexName($engine, $index);
        $manticore = $engine === 'manticore' ? new ManticoreSearch : null;
        $processed = 0;
        $errors = 0;

        $this->info("Worker {$workerId}: Processing {$limit} records with batch size {$batchSize}");
        $startTime = microtime(true);

        foreach ($this->fetchPages($index, $offset, $limit, $batchSize) as $items) {
            $this->info("Worker {$workerId}: Processing chunk of {$items->count()} items");

            // Documents keyed by row id; for ElasticSearch the key becomes the document _id.
            $documents = [];
            foreach ($items as $item) {
                try {
                    $documents[$item->id] = $transformer($item);
                } catch (Exception $e) {
                    $errors++;
                    $this->error("Worker {$workerId}: Error processing item {$item->id}: {$e->getMessage()}");
                }
            }

            if ($documents === []) {
                continue;
            }

            if ($manticore !== null) {
                $this->processManticoreBatch($manticore, $indexName, array_values($documents), $workerId);
                $rejected = 0;
            } else {
                $rejected = $this->processElasticBatch($this->buildElasticBulkRequest($indexName, $documents), $workerId);
            }

            // Only documents the engine accepted count as processed.
            $processed += count($documents) - $rejected;
            $errors += $rejected;
            $this->info("Worker {$workerId}: Inserted batch of ".count($documents).' records');
        }

        $executionTime = round(microtime(true) - $startTime, 2);
        $this->info("Worker {$workerId}: Completed processing {$processed} records with {$errors} errors in {$executionTime} seconds");

        return Command::SUCCESS;
    }

    /**
     * Yield the rows [$offset, $offset + $limit) of $index in id order, at most $batchSize rows per page.
     *
     * Only the first page uses OFFSET. Later pages continue from the last id seen (keyset
     * pagination), so the database does not re-scan the skipped rows for every page. The slice
     * is bounded by $limit here, independent of how the Laravel version's chunk() treats
     * offset/limit.
     *
     * @return \Generator<int, \Illuminate\Database\Eloquent\Collection>
     */
    private function fetchPages(string $index, int $offset, int $limit, int $batchSize): \Generator
    {
        $idColumn = $index === 'releases' ? 'releases.id' : 'id';
        $remaining = $limit;
        $lastId = null;

        while ($remaining > 0) {
            $pageSize = min($batchSize, $remaining);
            $query = $this->buildBaseQuery($index)->limit($pageSize);

            if ($lastId === null) {
                // First page: jump to the start of this worker's slice.
                $query->offset($offset);
            } else {
                // Later pages: resume after the last id already yielded.
                $query->where($idColumn, '>', $lastId);
            }

            $items = $query->get();
            if ($items->isEmpty()) {
                return;
            }

            yield $items;

            $remaining -= $items->count();
            $lastId = $items->last()->id;

            if ($items->count() < $pageSize) {
                return; // Fewer rows than requested: the table is exhausted.
            }
        }
    }

    /**
     * Build the id-ordered query selecting the columns the transformers read for $index.
     *
     * For releases, the file names of each release are concatenated (space-separated) into a
     * `filename` column via a correlated GROUP_CONCAT subquery.
     */
    private function buildBaseQuery(string $index): Builder
    {
        if ($index === 'releases') {
            return Release::query()
                ->select([
                    'releases.id',
                    'releases.name',
                    'releases.searchname',
                    'releases.fromname',
                    'releases.categories_id',
                    'releases.postdate',
                ])
                ->selectRaw('(SELECT GROUP_CONCAT(rf.name SEPARATOR " ") FROM release_files rf WHERE rf.releases_id = releases.id) AS filename')
                ->orderBy('releases.id');
        }

        return Predb::query()
            ->select(['id', 'title', 'filename', 'source'])
            ->orderBy('id');
    }

    /**
     * Return a callable that maps one database row to the document array sent to the engine.
     *
     * The predb document shape is the same for both engines. For releases, Manticore receives
     * every value cast to string, while ElasticSearch receives raw values plus `plainsearchname`
     * and `postdate`.
     */
    private function getTransformer(string $engine, string $index): callable
    {
        if ($index !== 'releases') {
            return static fn ($item): array => [
                'id' => $item->id,
                'title' => (string) ($item->title ?? ''),
                'filename' => (string) ($item->filename ?? ''),
                'source' => (string) ($item->source ?? ''),
                'dummy' => 1,
            ];
        }

        if ($engine === 'manticore') {
            // `??` (not `?:`) so a text field whose value is "0" is kept rather than blanked.
            return static fn ($item): array => [
                'id' => (string) $item->id,
                'name' => (string) ($item->name ?? ''),
                'searchname' => (string) ($item->searchname ?? ''),
                'fromname' => (string) ($item->fromname ?? ''),
                // Any falsy category (null, 0, '') is sent as '0'.
                'categories_id' => (string) ($item->categories_id ?: '0'),
                'filename' => (string) ($item->filename ?? ''),
                'dummy' => '1',
            ];
        }

        return static fn ($item): array => [
            'id' => $item->id,
            'name' => $item->name,
            'searchname' => $item->searchname,
            // Dots and dashes replaced by spaces — presumably so dotted names tokenize into words.
            'plainsearchname' => str_replace(['.', '-'], ' ', $item->searchname ?? ''),
            'fromname' => $item->fromname,
            'categories_id' => $item->categories_id,
            'filename' => $item->filename ?? '',
            'postdate' => $item->postdate,
        ];
    }

    /**
     * Map the logical index to the engine's index/table name: Manticore uses the `*_rt`
     * tables, ElasticSearch uses the logical name unchanged.
     */
    private function getIndexName(string $engine, string $index): string
    {
        if ($engine !== 'manticore') {
            return $index;
        }

        return $index === 'releases' ? 'releases_rt' : 'predb_rt';
    }

    /**
     * Replace $data in the Manticore table $indexName, retrying transient failures.
     *
     * @param  list<array<string, mixed>>  $data  Documents as produced by the transformer.
     *
     * @throws Exception when every attempt fails.
     */
    private function processManticoreBatch(ManticoreSearch $manticore, string $indexName, array $data, int $workerId): void
    {
        $this->withRetries(
            static fn () => $manticore->manticoreSearch->table($indexName)->replaceDocuments($data),
            "Worker {$workerId}: Failed to process ManticoreSearch batch"
        );
    }

    /**
     * Send one bulk request to ElasticSearch, retrying transient failures.
     *
     * @param  array{body: list<array<string, mixed>>}  $data
     * @return int Number of documents ElasticSearch rejected inside an otherwise successful request.
     *
     * @throws Exception when every attempt fails.
     */
    private function processElasticBatch(array $data, int $workerId): int
    {
        $response = $this->withRetries(
            static fn () => \Elasticsearch::bulk($data),
            "Worker {$workerId}: Failed to process ElasticSearch batch"
        );

        if (empty($response['errors'])) {
            return 0;
        }

        $rejected = 0;
        foreach ($response['items'] ?? [] as $item) {
            if (isset($item['index']['error'])) {
                $rejected++;
            }
        }

        if ($rejected > 0) {
            $this->warn("Worker {$workerId}: ElasticSearch batch had {$rejected} errors");
        }

        return $rejected;
    }

    /**
     * Build an ElasticSearch bulk request that indexes each document under its row id.
     *
     * @param  array<int|string, array<string, mixed>>  $documents  Documents keyed by row id.
     * @return array{body: list<array<string, mixed>>}
     */
    private function buildElasticBulkRequest(string $indexName, array $documents): array
    {
        $body = [];
        foreach ($documents as $id => $document) {
            $body[] = ['index' => ['_index' => $indexName, '_id' => $id]];
            $body[] = $document;
        }

        return ['body' => $body];
    }

    /**
     * Run $operation, retrying up to MAX_WRITE_ATTEMPTS times with a short pause between attempts.
     *
     * @param  callable(): mixed  $operation
     * @param  string  $failureMessage  Prefix of the exception message raised once every attempt has failed.
     * @return mixed Whatever $operation returns on its first successful attempt.
     *
     * @throws Exception when every attempt fails; the last failure is chained as the previous exception.
     */
    private function withRetries(callable $operation, string $failureMessage): mixed
    {
        $attempt = 0;

        while (true) {
            try {
                return $operation();
            } catch (Exception $e) {
                $attempt++;
                if ($attempt >= self::MAX_WRITE_ATTEMPTS) {
                    throw new Exception($failureMessage.' after '.self::MAX_WRITE_ATTEMPTS." attempts: {$e->getMessage()}", 0, $e);
                }
                usleep(self::RETRY_DELAY_MICROSECONDS);
            }
        }
    }

    /**
     * Apply the PHP memory limit, optional MySQL session tuning (--disable-keys),
     * group_concat_max_len, and enable the garbage collector.
     */
    private function optimizeWorkerSettings(): void
    {
        if (ini_set('memory_limit', (string) $this->option('memory-limit')) === false) {
            $this->warn('Could not apply --memory-limit; keeping the current memory_limit');
        }

        if ($this->option('disable-keys')) {
            try {
                DB::statement('SET SESSION FOREIGN_KEY_CHECKS = 0');
                DB::statement('SET SESSION UNIQUE_CHECKS = 0');
                // NOTE(review): this worker only issues SELECTs against the database; with AUTOCOMMIT=0
                // those reads share one transaction that stays open for the whole run — confirm intended.
                DB::statement('SET SESSION AUTOCOMMIT = 0');
                DB::statement('SET SESSION read_buffer_size = 2097152');
                DB::statement('SET SESSION sort_buffer_size = 16777216');
            } catch (Exception $e) {
                $this->warn("Could not optimize database settings: {$e->getMessage()}");
            }
        }

        // The releases query concatenates file names with GROUP_CONCAT, which this bounds.
        DB::statement('SET SESSION group_concat_max_len = ?', [self::GROUP_CONCAT_MAX_LEN]);

        gc_enable();
    }

    /**
     * Engine chosen on the command line ('manticore' wins if both flags are given), or null.
     */
    private function getSelectedEngine(): ?string
    {
        return $this->option('manticore') ? 'manticore' : ($this->option('elastic') ? 'elastic' : null);
    }

    /**
     * Index chosen on the command line ('releases' wins if both flags are given), or null.
     */
    private function getSelectedIndex(): ?string
    {
        return $this->option('releases') ? 'releases' : ($this->option('predb') ? 'predb' : null);
    }
}