Passed
Push — bufferfix ( 243a27...28d690 )
by Simon
08:16
created

SolrIndexTask::indexClass()   A

Complexity

Conditions 5
Paths 10

Size

Total Lines 27
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 5.9256

Importance

Changes 14
Bugs 1 Features 0
Metric Value
cc 5
eloc 19
c 14
b 1
f 0
nc 10
nop 4
dl 0
loc 27
ccs 6
cts 9
cp 0.6667
crap 5.9256
rs 9.3222
1
<?php
2
3
4
namespace Firesphere\SolrSearch\Tasks;
5
6
use Exception;
7
use Firesphere\SolrSearch\Factories\DocumentFactory;
8
use Firesphere\SolrSearch\Helpers\SolrLogger;
9
use Firesphere\SolrSearch\Indexes\BaseIndex;
10
use Firesphere\SolrSearch\Services\SolrCoreService;
11
use Firesphere\SolrSearch\States\SiteState;
12
use Firesphere\SolrSearch\Traits\LoggerTrait;
13
use GuzzleHttp\Exception\GuzzleException;
14
use Psr\Log\LoggerInterface;
15
use ReflectionException;
16
use SilverStripe\Control\Director;
17
use SilverStripe\Control\HTTPRequest;
18
use SilverStripe\Core\Injector\Injector;
19
use SilverStripe\Dev\BuildTask;
20
use SilverStripe\ORM\ArrayList;
21
use SilverStripe\ORM\DataList;
22
use SilverStripe\ORM\DataObject;
23
use SilverStripe\ORM\DB;
24
use SilverStripe\ORM\ValidationException;
25
use SilverStripe\Versioned\Versioned;
26
27
/**
28
 * Class SolrIndexTask
29
 *
30
 * @description Index items to Solr through a task
31
 * @package Firesphere\SolrSearch\Tasks
32
 */
33
class SolrIndexTask extends BuildTask
34
{
35
    use LoggerTrait;
36
    /**
37
     * URLSegment of this task
38
     *
39
     * @var string
40
     */
41
    private static $segment = 'SolrIndexTask';
42
    /**
43
     * Store the current states for all instances of SiteState
44
     *
45
     * @var array
46
     */
47
    public $currentStates;
48
    /**
49
     * My name
50
     *
51
     * @var string
52
     */
53
    protected $title = 'Solr Index update';
54
    /**
55
     * What do I do?
56
     *
57
     * @var string
58
     */
59
    protected $description = 'Add or update documents to an existing Solr core.';
60
    /**
61
     * Debug mode enabled, default false
62
     *
63
     * @var bool
64
     */
65
    protected $debug = false;
66
    /**
67
     * Singleton of {@link SolrCoreService}
68
     *
69
     * @var SolrCoreService
70
     */
71
    protected $service;
72
73
    /**
     * SolrIndexTask constructor.
     *
     * Forces the default Versioned reading mode, resolves the Solr core service
     * and the logger through the Injector, enables debug mode on dev
     * environments and CLI runs, and registers the current SiteState states as
     * the default states.
     *
     * @throws ReflectionException
     */
    public function __construct()
    {
        parent::__construct();

        // Only index live items. The old FTS module also indexed Draft items,
        // which is unnecessary. If versioned search is needed, a separate
        // Versioned Search module is required.
        Versioned::set_reading_mode(Versioned::DEFAULT_MODE);

        $injector = Injector::inst();
        $this->setService($injector->get(SolrCoreService::class));
        $this->setLogger($injector->get(LoggerInterface::class));

        $debugMode = Director::isDev() || Director::is_cli();
        $this->setDebug($debugMode);

        // Whatever states are active right now become the defaults
        SiteState::setDefaultStates(SiteState::currentStates());
    }
91
92
    /**
     * Replace the {@link SolrCoreService} instance this task talks to.
     *
     * @param SolrCoreService $service Service to use for index operations
     * @return SolrIndexTask This task, to allow chaining
     */
    public function setService(SolrCoreService $service): SolrIndexTask
    {
        $this->service = $service;

        return $this;
    }
104
105
    /**
     * Switch debug mode on or off for this task.
     *
     * @param bool $debug True to enable debug mode
     * @return SolrIndexTask This task, to allow chaining
     */
    public function setDebug(bool $debug): SolrIndexTask
    {
        $this->debug = $debug;

        return $this;
    }
117
118
    /**
     * Run the task: index the classes of every valid index.
     *
     * The `index` request variable is handed to the core service to resolve
     * the list of indexes to process. For each index the classes to run are
     * resolved; an index without any class to run is skipped. Otherwise the
     * index is cleared when requested and its classes are indexed.
     * The total run time is logged when all indexes are done.
     *
     * @param HTTPRequest $request
     * @return int Group count of the last index that was processed, 0 when none was
     * @throws Exception
     * @throws GuzzleException
     * @todo defer to background because it may run out of memory
     */
    public function run($request)
    {
        $startedAt = time();
        [$vars, $group, $isGroup] = $this->taskSetup($request);
        $groups = 0;
        $indexNames = $this->service->getValidIndexes($request->getVar('index'));

        foreach ($indexNames as $indexName) {
            /** @var BaseIndex $index */
            $index = Injector::inst()->get($indexName, false);
            $classes = $this->getClasses($vars, $index->getClasses());

            // Only touch the index when there is something to index for it
            if (count($classes) > 0) {
                $this->clearIndex($vars, $index);
                $groups = $this->indexClassForIndex($classes, $isGroup, $index, $group);
            }
        }

        $elapsed = time() - $startedAt;
        $this->getLogger()->info(sprintf('Finished in %s seconds', $elapsed));

        return $groups;
    }
154 13
155
    /**
     * Set up the requirements for this task from the request variables.
     *
     * Enables debug mode when a `debug` variable is present (it is never
     * switched off here), and determines the group to start from as the
     * highest of the `group` and `start` variables, both defaulting to 0.
     *
     * @param HTTPRequest $request
     * @return array [array $vars, int $group, bool $isGroup] where $isGroup
     *               tells whether a `group` variable was supplied
     */
    protected function taskSetup($request): array
    {
        $vars = $request->getVars();
        $this->debug = $this->debug || isset($vars['debug']);
        $isGroup = isset($vars['group']);

        // Request variables arrive as strings. Cast them so the comparison is
        // always numeric and the int-typed $group parameter further down the
        // chain never receives a non-numeric value.
        $group = max((int)($vars['group'] ?? 0), (int)($vars['start'] ?? 0));

        return [$vars, $group, $isGroup];
    }
172
173
    /**
     * Get the classes to run for this task execution.
     *
     * Without a `class` request variable all given classes are returned.
     * With one, only the entries of $classes equal to that class are
     * returned (original array keys are preserved), which may be none.
     *
     * @param array $vars Request variables
     * @param array $classes Classes configured on the index
     * @return array
     */
    protected function getClasses($vars, array $classes): array
    {
        return isset($vars['class'])
            ? array_intersect($classes, [$vars['class']])
            : $classes;
    }
188
189
    /**
     * Clear the given index if a full re-index is requested.
     *
     * Nothing happens unless the `clear` request variable is non-empty. When it
     * is, the clearing is logged and the core service is asked to run a
     * DELETE_TYPE_ALL manipulation, with an empty list, on the index.
     *
     * @param array $vars Request variables
     * @param BaseIndex $index Index to clear
     * @throws Exception
     */
    public function clearIndex($vars, BaseIndex $index)
    {
        if (empty($vars['clear'])) {
            return;
        }

        $message = sprintf('Clearing index %s', $index->getIndexName());
        $this->getLogger()->info($message);

        $nothing = ArrayList::create([]);
        $this->service->doManipulate($nothing, SolrCoreService::DELETE_TYPE_ALL, $index);
    }
203 13
204
    /**
     * Index every given class for a specific index, one class after the other.
     *
     * @param array $classes Class names to index
     * @param bool $isGroup Whether a specific group was requested
     * @param BaseIndex $index Index to add the documents to
     * @param int $group Group to start from
     * @return int Group count of the class that was indexed last, 0 without classes
     * @throws Exception
     * @throws GuzzleException
     */
    protected function indexClassForIndex($classes, $isGroup, BaseIndex $index, $group): int
    {
        $lastGroupCount = 0;
        foreach ($classes as $className) {
            $lastGroupCount = $this->indexClass($isGroup, $className, $index, $group);
        }

        return $lastGroupCount;
    }
224
225
    /**
     * Index a single class for a given index. {@link static::indexClassForIndex()}
     *
     * The records are processed in groups of `batchLength` (DocumentFactory
     * config) items, starting at $group. When a specific group is requested,
     * that group is the upper bound as well, so the loop runs once.
     * An exception thrown while indexing a group is logged and the run
     * continues with the next group.
     *
     * @param bool $isGroup Whether a specific group was requested
     * @param string $class Class to index
     * @param BaseIndex $index Index to add the documents to
     * @param int $group Group to start from
     * @return int The upper group bound that was used
     * @throws GuzzleException
     * @throws ValidationException
     */
    private function indexClass($isGroup, $class, BaseIndex $index, int $group): int
    {
        $indexName = $index->getIndexName();
        $this->getLogger()->info(sprintf('Indexing %s for %s', $class, $indexName), []);

        $batchLength = DocumentFactory::config()->get('batchLength');
        // The record count is only needed when the whole class is indexed
        $groups = $isGroup ? $group : (int)ceil($class::get()->count() / $batchLength);
        $this->getLogger()->info(sprintf('Total groups %s', $groups));

        do { // Run from oldest to newest
            try {
                // Temporary workaround for CircleCI
                if (function_exists('pcntl_fork')) {
                    // Only the last group handled is of interest here
                    [$group] = $this->runChildProcess($class, $index, $group, $groups, $batchLength);
                } else {
                    $this->doReindex($group, $class, $batchLength, $index);
                }
            } catch (Exception $error) {
                // Log and carry on with the next group
                $this->logException($indexName, $group, $error);
            }
            $group++;
        } while ($group <= $groups);

        return $groups;
    }
264
265
    /**
     * Reindex the given group, once for each state known to SiteState.
     *
     * Makes sure a database connection exists, applies every non-empty,
     * non-default state before indexing the group for it, and resets to the
     * default state afterwards.
     *
     * @param int $group Group (batch number) to index
     * @param string $class Class to index
     * @param int $batchLength Amount of items in a group
     * @param BaseIndex $index Index to add the documents to
     * @return int Always 0
     * @throws Exception
     */
    private function doReindex($group, $class, $batchLength, BaseIndex $index): int
    {
        // Connect on demand when there is no connection available
        if (!DB::get_conn()) {
            DB::connect(DB::getConfig());
        }

        foreach (SiteState::getStates() as $state) {
            $isCustomState = !empty($state) && $state !== 'default';
            if ($isCustomState) {
                SiteState::withState($state);
            }
            $this->stateReindex($group, $class, $batchLength, $index);
        }

        SiteState::withState(SiteState::DEFAULT_STATE);
        $this->getLogger()->info(sprintf('Indexed group %s', $group));

        return 0;
    }
291 13
292
    /**
     * Index one group of a class for the currently active state.
     *
     * Selects a page of $batchLength records of the base data class of $class,
     * ordered by ascending ID and offset by $group pages, and sends it to the
     * index. An empty page is skipped.
     *
     * @param int $group Group (page number) to index
     * @param string $class Class to index
     * @param int $batchLength Amount of items in a group
     * @param BaseIndex $index Index to add the documents to
     * @throws Exception
     */
    private function stateReindex($group, $class, $batchLength, BaseIndex $index): void
    {
        $baseClass = DataObject::getSchema()->baseDataClass($class);
        $offset = $group * $batchLength;

        // Generate filtered list of local records
        /** @var DataList|DataObject[] $items */
        $items = DataObject::get($baseClass)->sort('ID ASC')->limit($batchLength, $offset);

        if (!$items->count()) {
            return;
        }

        $this->updateIndex($index, $items);
    }
313
314
    /**
     * Execute the update on the client of the index.
     *
     * Creates an update query on the client, lets the core service fill it
     * for the given items, adds a commit and executes it.
     *
     * @param BaseIndex $index Index to update
     * @param DataList|DataObject[] $items Items to send to the index
     * @throws Exception
     */
    private function updateIndex(BaseIndex $index, $items): void
    {
        $solrClient = $index->getClient();
        $updateQuery = $solrClient->createUpdate();

        // The service follows the debug mode of this task
        $this->service->setInDebugMode($this->debug);
        $this->service->updateIndex($index, $items, $updateQuery);

        $updateQuery->addCommit();
        $solrClient->update($updateQuery);
    }
330
331
    /**
     * Log an exception if it happens. Most are caught; these logs are for the
     * developers to identify problems and fix them.
     *
     * The exception message goes to the task logger as an error, and a message
     * naming the index and group is handed to {@link SolrLogger::logMessage()}.
     *
     * @param string $index Name of the index that was being indexed
     * @param int $group Group that was being indexed
     * @param Exception $exception The exception that was caught
     * @throws GuzzleException
     * @throws ValidationException
     */
    private function logException($index, int $group, Exception $exception): void
    {
        $this->getLogger()->error($exception->getMessage());

        $template = 'Error indexing core %s on group %s,' . PHP_EOL .
            'Please log in to the CMS to find out more about Indexing errors' . PHP_EOL;

        SolrLogger::logMessage('ERROR', sprintf($template, $index, $group), $index);
    }
352
353
    /**
     * Index up to `cores` (SolrCoreService config) consecutive groups in
     * parallel, one forked child process per group, and wait for all of them.
     *
     * Child i handles group ($group + i). Groups beyond $groups are not
     * started; the bound is inclusive to match the loop in the caller.
     * A child exits as soon as its group is done, so it never returns into
     * the caller's loop. When forking fails, the group is indexed in the
     * current process instead.
     *
     * NOTE(review): the children inherit the database connection of the
     * parent; whether both sides need to reconnect after the fork should be
     * confirmed against the database driver in use.
     *
     * @param string $class Class to index
     * @param BaseIndex $index Index to add the documents to
     * @param int $group First group to index
     * @param int $groups Last group that may be indexed
     * @param int $batchLength Amount of items in a group
     * @return array [int $lastGroup, int $status]: the last group that was
     *               handled ($group itself when nothing was started) and the
     *               status of the child that was waited for last (0 when no
     *               child was started)
     * @throws Exception
     */
    private function runChildProcess($class, BaseIndex $index, int $group, int $groups, $batchLength): array
    {
        // Without a usable core count, fall back to a single worker rather
        // than silently skipping the group
        $cores = max(1, (int)SolrCoreService::config()->get('cores'));
        $pids = [];
        $status = 0;
        $lastGroup = $group;

        // for each core, start a grouped indexing
        for ($i = 0; $i < $cores; $i++) {
            // Offset from the first group; accumulating would skip groups
            $current = $group + $i;
            if ($current > $groups) {
                break;
            }
            $lastGroup = $current;

            $pid = pcntl_fork();
            if ($pid === -1) {
                // Forking failed, do the work in this process
                $this->doReindex($current, $class, $batchLength, $index);
                continue;
            }

            if ($pid === 0) {
                // Child process: index the group and stop, so the child does
                // not start forking and looping on its own
                try {
                    $this->doReindex($current, $class, $batchLength, $index);
                } catch (Exception $error) {
                    $this->logException($index->getIndexName(), $current, $error);
                    exit(1);
                }
                exit(0);
            }

            $pids[] = $pid;
        }

        // Wait for each child to finish
        foreach ($pids as $pid) {
            pcntl_waitpid($pid, $status);
        }

        return [$lastGroup, $status];
    }
386
}
387