1 | <?php |
||
2 | /** |
||
3 | * Class SolrIndexTask|Firesphere\SolrSearch\Tasks\SolrIndexTask Index Solr cores |
||
4 | * |
||
5 | * @package Firesphere\Solr\Search |
||
6 | * @author Simon `Firesphere` Erkelens; Marco `Sheepy` Hermo |
||
7 | * @copyright Copyright (c) 2018 - now() Firesphere & Sheepy |
||
8 | */ |
||
9 | |||
10 | namespace Firesphere\SolrSearch\Tasks; |
||
11 | |||
12 | use Exception; |
||
13 | use Firesphere\SolrSearch\Factories\DocumentFactory; |
||
14 | use Firesphere\SolrSearch\Helpers\SolrLogger; |
||
15 | use Firesphere\SolrSearch\Indexes\BaseIndex; |
||
16 | use Firesphere\SolrSearch\Services\SolrCoreService; |
||
17 | use Firesphere\SolrSearch\States\SiteState; |
||
18 | use Firesphere\SolrSearch\Traits\LoggerTrait; |
||
19 | use Firesphere\SolrSearch\Traits\SolrIndexTrait; |
||
20 | use Psr\Log\LoggerInterface; |
||
21 | use ReflectionException; |
||
22 | use SilverStripe\Control\Controller; |
||
23 | use SilverStripe\Control\Director; |
||
24 | use SilverStripe\Control\HTTPRequest; |
||
25 | use SilverStripe\Core\Injector\Injector; |
||
26 | use SilverStripe\Dev\BuildTask; |
||
27 | use SilverStripe\ORM\ArrayList; |
||
28 | use SilverStripe\ORM\DataList; |
||
29 | use SilverStripe\ORM\DataObject; |
||
30 | use SilverStripe\ORM\DB; |
||
31 | use SilverStripe\ORM\SS_List; |
||
32 | use SilverStripe\ORM\ValidationException; |
||
33 | use SilverStripe\Versioned\Versioned; |
||
34 | use Solarium\Exception\HttpException; |
||
35 | |||
/**
 * Class SolrIndexTask
 *
 * Build task that pushes DataObjects into one or more Solr cores, one batch
 * ("group") at a time. When running on the CLI with PCNTL available, groups
 * are indexed in parallel by forked child processes.
 *
 * @description Index items to Solr through a task
 * @package Firesphere\Solr\Search
 */
class SolrIndexTask extends BuildTask
{
    use LoggerTrait;
    use SolrIndexTrait;

    /**
     * URLSegment of this task
     *
     * @var string
     */
    private static $segment = 'SolrIndexTask';
    /**
     * Store the current states for all instances of SiteState
     *
     * @var array
     */
    public $currentStates;
    /**
     * My name
     *
     * @var string
     */
    protected $title = 'Solr Index update';
    /**
     * What do I do?
     *
     * @var string
     */
    protected $description = 'Add or update documents to an existing Solr core.';

    /**
     * SolrIndexTask constructor. Wires up the service, logger, debug flag,
     * batch length and the number of CPU cores to use.
     *
     * @throws ReflectionException
     */
    public function __construct()
    {
        parent::__construct();
        // Only index live items.
        // The old FTS module also indexed Draft items. This is unnecessary
        // If versioned is needed, a separate Versioned Search module is required
        Versioned::set_reading_mode(Versioned::DEFAULT_MODE);

        $injector = Injector::inst();
        $this->setService($injector->get(SolrCoreService::class));
        $this->setLogger($injector->get(LoggerInterface::class));
        $this->setDebug(Director::isDev() || Director::is_cli());
        $this->setBatchLength(DocumentFactory::config()->get('batchLength'));
        // Fall back to a single core when nothing (or a falsy value) is configured
        $this->setCores(SolrCoreService::config()->get('cpucores') ?: 1);
        // Register the states that are active right now as the default states
        SiteState::setDefaultStates(SiteState::currentStates());
    }

    /**
     * Implement this method in the task subclass to
     * execute via the TaskRunner
     *
     * Iterates over all valid indexes, optionally clears each one, and indexes
     * the requested classes. The value returned is the one reported for the
     * last index that had classes to process (0 if none had any).
     *
     * @param HTTPRequest $request Current request
     * @return int|bool
     * @throws Exception
     * @throws HttpException
     */
    public function run($request)
    {
        $start = time();
        $this->getLogger()->info(date('Y-m-d H:i:s'));
        [$vars, $group, $isGroup] = $this->taskSetup($request);
        $groups = 0;
        $indexes = $this->service->getValidIndexes($request->getVar('index'));

        foreach ($indexes as $indexName) {
            /** @var BaseIndex $index */
            $index = Injector::inst()->get($indexName, false);
            $this->setIndex($index);

            $indexClasses = $this->index->getClasses();
            $classes = $this->getClasses($vars, $indexClasses);
            if (!count($classes)) {
                // Nothing to index for this core with the given filter
                continue;
            }

            $this->clearIndex($vars);

            $groups = $this->indexClassForIndex($classes, $isGroup, $group);
        }

        // Use date() for the end stamp as well, so the start and end log lines
        // are expressed in the same timezone.
        $this->getLogger()->info(date('Y-m-d H:i:s'));
        // gmdate() is intentional here: it formats a duration, not a moment in time
        $time = gmdate('H:i:s', (time() - $start));
        $this->getLogger()->info(sprintf('Time taken: %s', $time));

        return $groups;
    }

    /**
     * Set up the requirements for this task
     *
     * Reads the GET parameters, forces the debug flag, and resolves the group
     * to start at (the larger of the `group` and `start` parameters).
     *
     * @param HTTPRequest $request Current request
     * @return array [array $vars, int $group, bool $isGroup]
     */
    protected function taskSetup(HTTPRequest $request): array
    {
        $vars = $request->getVars();
        // Forcefully set the debugging to whatever the outcome of the check is
        $this->setDebug($this->isDebug() || isset($vars['debug']), true);
        $isGroup = isset($vars['group']);
        // GET values arrive as strings; the indexing methods are typed to int,
        // so normalise here instead of relying on implicit coercion.
        $group = max((int)($vars['group'] ?? 0), (int)($vars['start'] ?? 0));

        return [$vars, $group, $isGroup];
    }

    /**
     * Get the classes to run for this task execution
     *
     * @param array $vars URL GET Parameters
     * @param array $classes Classes to index
     * @return array The given classes, narrowed to `$vars['class']` when set
     */
    protected function getClasses(array $vars, array $classes): array
    {
        if (!isset($vars['class'])) {
            return $classes;
        }

        return array_intersect($classes, [$vars['class']]);
    }

    /**
     * Clear the given index if a full re-index is needed
     *
     * Only acts when the `clear` GET parameter is non-empty.
     *
     * @param array $vars URL GET Parameters
     * @throws Exception
     */
    public function clearIndex(array $vars)
    {
        if (empty($vars['clear'])) {
            return;
        }

        $this->getLogger()->info(sprintf('Clearing index %s', $this->index->getIndexName()));
        $this->service->doManipulate(ArrayList::create([]), SolrCoreService::DELETE_TYPE_ALL, $this->index);
    }

    /**
     * Index the classes for a specific index
     *
     * @param array $classes Classes that need indexing
     * @param bool $isGroup Indexing a specific group?
     * @param int $group Group to index
     * @return int|bool Result for the last class processed (0 when $classes is empty)
     * @throws Exception
     * @throws HttpException
     */
    protected function indexClassForIndex(array $classes, bool $isGroup, int $group)
    {
        $groups = 0;
        foreach ($classes as $class) {
            $groups = $this->indexClass($isGroup, $class, $group);
        }

        return $groups;
    }

    /**
     * Index a single class for a given index. {@link static::indexClassForIndex()}
     *
     * @param bool $isGroup Is a specific group indexed
     * @param string $class Class to index
     * @param int $group Group to index
     * @return int|bool Total amount of groups for the class
     * @throws HttpException
     * @throws ValidationException
     */
    private function indexClass(bool $isGroup, string $class, int $group)
    {
        $this->getLogger()->info(sprintf('Indexing %s for %s', $class, $this->getIndex()->getIndexName()));
        [$totalGroups, $groups] = $this->getGroupSettings($isGroup, $class, $group);
        $this->getLogger()->info(sprintf('Total groups %s', $totalGroups));
        do {
            try {
                if ($this->hasPCNTL()) {
                    // @codeCoverageIgnoreStart
                    $group = $this->spawnChildren($class, $group, $groups);
                    // @codeCoverageIgnoreEnd
                } else {
                    $this->doReindex($group, $class);
                }
            } catch (Exception $error) {
                // @codeCoverageIgnoreStart
                $this->logException($this->index->getIndexName(), $group, $error);
                // @codeCoverageIgnoreEnd
            }
            // Always advance, also after a failure. Incrementing only on
            // success would make a failing group be retried forever.
            $group++;
        } while ($group <= $groups);

        return $totalGroups;
    }

    /**
     * Check the amount of groups and the total against the isGroup check.
     *
     * @param bool $isGroup Is it a specific group
     * @param string $class Class to check
     * @param int $group Current group to index
     * @return array [int $totalGroups, int $groups] where $groups is the last group to run
     */
    private function getGroupSettings(bool $isGroup, string $class, int $group): array
    {
        $totalGroups = (int)ceil($class::get()->count() / $this->getBatchLength());
        // For a specific group, only run as many groups as there are cores
        $groups = $isGroup ? ($group + $this->getCores() - 1) : $totalGroups;

        return [$totalGroups, $groups];
    }

    /**
     * Check if PCNTL is available and/or useable.
     * The unittest param is from phpunit.xml.dist, meant to bypass the exit(0) call
     * The pcntl parameter check is for unit tests, but PHPUnit does not support PCNTL (yet)
     *
     * @return bool
     */
    private function hasPCNTL(): bool
    {
        // Guard order matters: the current controller is only consulted on CLI
        // with the PCNTL extension present.
        if (!Director::is_cli() || !function_exists('pcntl_fork')) {
            return false;
        }

        return Controller::curr()->getRequest()->getVar('unittest') === 'pcntl' ||
            !Controller::curr()->getRequest()->getVar('unittest');
    }

    /**
     * For each core, spawn a child process that will handle a separate group.
     * This speeds up indexing through CLI massively.
     *
     * @codeCoverageIgnore Can't be tested because PCNTL is not available
     * @param string $class Class to index
     * @param int $group Group to index
     * @param int $groups Total amount of groups
     * @return int Last group number that was considered (spawned or not)
     * @throws Exception
     * @throws HttpException
     */
    private function spawnChildren(string $class, int $group, int $groups): int
    {
        $start = $group;
        $pids = [];
        $cores = $this->getCores();
        // for each core, start a grouped indexing
        for ($i = 0; $i < $cores; $i++) {
            $start = $group + $i;
            if ($start < $groups) {
                $this->runForkedChild($class, $pids, $start);
            }
        }
        // Wait for each child to finish
        // It needs to wait for them independently,
        // or it runs out of memory for some reason
        foreach ($pids as $pid) {
            pcntl_waitpid($pid, $status);
        }
        // Commit once all children are done
        $client = $this->index->getClient();
        $commit = $client->createUpdate();
        $commit->addCommit();

        $client->update($commit);

        return $start;
    }

    /**
     * Create a fork and run the child
     *
     * @codeCoverageIgnore Can't be tested because PCNTL is not available
     * @param string $class Class to index
     * @param array $pids Array of all the child Process IDs
     * @param int $start Start point for the objects
     * @return void
     * @throws HttpException
     * @throws ValidationException
     */
    private function runForkedChild(string $class, array &$pids, int $start): void
    {
        $pid = pcntl_fork();
        // PID needs to be pushed before anything else, for some reason
        // NOTE(review): pcntl_fork() returns -1 on failure; that value is pushed
        // and later handed to pcntl_waitpid() as well - confirm this is intended.
        $pids[] = $pid;
        // Re-establish the database connection after forking (runs in parent and child)
        DB::connect(DB::getConfig());
        $this->runChild($class, $pid, $start);
    }

    /**
     * Run a single child index operation
     *
     * Does nothing unless called from the forked child (PID 0).
     *
     * @codeCoverageIgnore Can't be tested because PCNTL is not available
     * @param string $class Class to index
     * @param int $pid PID of the child
     * @param int $start Position to start
     * @throws HttpException
     * @throws ValidationException
     * @throws Exception
     */
    private function runChild(string $class, int $pid, int $start): void
    {
        if ($pid !== 0) {
            // Parent process (or failed fork): nothing to index here
            return;
        }
        try {
            $this->doReindex($start, $class, $pid);
        } catch (Exception $error) {
            SolrLogger::logMessage('ERROR', $error);
            $msg = sprintf(
                'Something went wrong while indexing %s on %s, see the logs for details',
                $start,
                $this->index->getIndexName()
            );
            // Keep the original exception attached for debugging.
            // NOTE(review): this is thrown inside the forked child, which then
            // unwinds into the caller's loop - verify the child terminates.
            throw new Exception($msg, 0, $error);
        }
    }

    /**
     * Reindex the given group, for each state
     *
     * Exits the process when a PID is given, i.e. when running as a forked child.
     *
     * @param int $group Group to index
     * @param string $class Class to index
     * @param bool|int $pid Are we a child process or not
     * @throws Exception
     */
    private function doReindex(int $group, string $class, $pid = false): void
    {
        $start = time();
        foreach (SiteState::getStates() as $state) {
            if ($state !== SiteState::DEFAULT_STATE && !empty($state)) {
                SiteState::withState($state);
            }
            $this->indexStateClass($group, $class);
        }

        // Switch back to the default state once every state has been indexed
        SiteState::withState(SiteState::DEFAULT_STATE);
        $end = gmdate('i:s', time() - $start);
        $this->getLogger()->info(sprintf('Indexed group %s in %s', $group, $end));

        // @codeCoverageIgnoreStart
        if ($pid !== false) {
            exit(0);
        }
        // @codeCoverageIgnoreEnd
    }

    /**
     * Index a group of a class for a specific state and index
     *
     * @param int $group Group to index
     * @param string $class Class to index
     * @throws Exception
     */
    private function indexStateClass(int $group, string $class): void
    {
        // Generate filtered list of local records
        $baseClass = DataObject::getSchema()->baseDataClass($class);
        $batchLength = $this->getBatchLength();
        /** @var DataList|DataObject[] $items */
        $items = DataObject::get($baseClass)
            ->sort('ID ASC')
            ->limit($batchLength, ($group * $batchLength));
        if ($items->count()) {
            $this->updateIndex($items);
        }
    }

    /**
     * Execute the update on the client
     *
     * @param SS_List $items Items to index
     * @throws Exception
     */
    private function updateIndex(SS_List $items): void
    {
        $index = $this->getIndex();
        $client = $index->getClient();
        $update = $client->createUpdate();
        $this->service->setDebug($this->debug);
        $this->service->updateIndex($index, $items, $update);
        $client->update($update);
    }

    /**
     * Log an exception if it happens. Most are caught, these logs are for the developers
     * to identify problems and fix them.
     *
     * @codeCoverageIgnore This is actually tested through reflection
     * @param string $index Index that is currently running
     * @param int $group Group currently attempted to index
     * @param Exception $exception Exception that's been thrown
     * @throws HttpException
     * @throws ValidationException
     */
    private function logException($index, int $group, Exception $exception): void
    {
        $this->getLogger()->error($exception->getMessage());
        $msg = sprintf(
            'Error indexing core %s on group %s,' . PHP_EOL .
            'Please log in to the CMS to find out more about Indexing errors' . PHP_EOL,
            $index,
            $group
        );
        SolrLogger::logMessage('ERROR', $msg);
    }
}
||
448 |
In the issue above, the returned value is violating the contract defined by the mentioned interface.
Let's take a look at an example: