Firesphere /
silverstripe-solr-search
| 1 | <?php |
||
| 2 | /** |
||
| 3 | * Class SolrIndexTask|Firesphere\SolrSearch\Tasks\SolrIndexTask Index Solr cores |
||
| 4 | * |
||
| 5 | * @package Firesphere\Solr\Search |
||
| 6 | * @author Simon `Firesphere` Erkelens; Marco `Sheepy` Hermo |
||
| 7 | * @copyright Copyright (c) 2018 - now() Firesphere & Sheepy |
||
| 8 | */ |
||
| 9 | |||
| 10 | namespace Firesphere\SolrSearch\Tasks; |
||
| 11 | |||
| 12 | use Exception; |
||
| 13 | use Firesphere\SolrSearch\Factories\DocumentFactory; |
||
| 14 | use Firesphere\SolrSearch\Helpers\SolrLogger; |
||
| 15 | use Firesphere\SolrSearch\Indexes\BaseIndex; |
||
| 16 | use Firesphere\SolrSearch\Services\SolrCoreService; |
||
| 17 | use Firesphere\SolrSearch\States\SiteState; |
||
| 18 | use Firesphere\SolrSearch\Traits\LoggerTrait; |
||
| 19 | use Firesphere\SolrSearch\Traits\SolrIndexTrait; |
||
| 20 | use Psr\Log\LoggerInterface; |
||
| 21 | use ReflectionException; |
||
| 22 | use SilverStripe\Control\Controller; |
||
| 23 | use SilverStripe\Control\Director; |
||
| 24 | use SilverStripe\Control\HTTPRequest; |
||
| 25 | use SilverStripe\Core\Injector\Injector; |
||
| 26 | use SilverStripe\Dev\BuildTask; |
||
| 27 | use SilverStripe\ORM\ArrayList; |
||
| 28 | use SilverStripe\ORM\DataList; |
||
| 29 | use SilverStripe\ORM\DataObject; |
||
| 30 | use SilverStripe\ORM\DB; |
||
| 31 | use SilverStripe\ORM\SS_List; |
||
| 32 | use SilverStripe\ORM\ValidationException; |
||
| 33 | use SilverStripe\Versioned\Versioned; |
||
| 34 | use Solarium\Exception\HttpException; |
||
| 35 | |||
| 36 | /** |
||
| 37 | * Class SolrIndexTask |
||
| 38 | * |
||
| 39 | * @description Index items to Solr through a task |
||
| 40 | * @package Firesphere\Solr\Search |
||
| 41 | */ |
||
| 42 | class SolrIndexTask extends BuildTask |
||
| 43 | { |
||
| 44 | use LoggerTrait; |
||
| 45 | use SolrIndexTrait; |
||
| 46 | |||
| 47 | /** |
||
| 48 | * URLSegment of this task |
||
| 49 | * |
||
| 50 | * @var string |
||
| 51 | */ |
||
| 52 | private static $segment = 'SolrIndexTask'; |
||
| 53 | /** |
||
| 54 | * Store the current states for all instances of SiteState |
||
| 55 | * |
||
| 56 | * @var array |
||
| 57 | */ |
||
| 58 | public $currentStates; |
||
| 59 | /** |
||
| 60 | * My name |
||
| 61 | * |
||
| 62 | * @var string |
||
| 63 | */ |
||
| 64 | protected $title = 'Solr Index update'; |
||
| 65 | /** |
||
| 66 | * What do I do? |
||
| 67 | * |
||
| 68 | * @var string |
||
| 69 | */ |
||
| 70 | protected $description = 'Add or update documents to an existing Solr core.'; |
||
| 71 | |||
/**
 * SolrIndexTask constructor.
 *
 * Forces the default Versioned reading mode, pulls the Solr core service and
 * the logger from the Injector, and reads the debug flag, batch length and
 * CPU core count that the rest of the task works with.
 *
 * @throws ReflectionException
 */
public function __construct()
{
    parent::__construct();
    // Only index live items.
    // The old FTS module also indexed Draft items. This is unnecessary
    // If versioned is needed, a separate Versioned Search module is required
    Versioned::set_reading_mode(Versioned::DEFAULT_MODE);

    $injector = Injector::inst();
    $this->setService($injector->get(SolrCoreService::class));
    $this->setLogger($injector->get(LoggerInterface::class));

    // Debugging is switched on for dev environments and for command-line runs
    $this->setDebug(Director::isDev() || Director::is_cli());
    $this->setBatchLength(DocumentFactory::config()->get('batchLength'));

    // Fall back to a single core when 'cpucores' is not configured (or is falsy)
    $configuredCores = SolrCoreService::config()->get('cpucores');
    $this->setCores($configuredCores ?: 1);

    // Register the states that are current at construction time as the default states
    SiteState::setDefaultStates(SiteState::currentStates());
}
|
| 93 | |||
/**
 * Run the index task: for every valid index, index the requested classes.
 *
 * The `index` GET parameter is handed to the service to select the indexes;
 * the remaining GET parameters (`class`, `clear`, `group`, `start`, `debug`)
 * are interpreted by the helper methods called from here.
 *
 * @param HTTPRequest $request Current request
 * @return int|bool The value returned for the last index that had classes to process,
 *                  or 0 when no index had any
 * @throws Exception
 * @throws HttpException
 */
public function run($request)
{
    $start = time();
    $this->getLogger()->info(date('Y-m-d H:i:s'));
    [$vars, $group, $isGroup] = $this->taskSetup($request);
    $groups = 0;
    $indexes = $this->service->getValidIndexes($request->getVar('index'));

    foreach ($indexes as $indexName) {
        /** @var BaseIndex $index */
        $index = Injector::inst()->get($indexName, false);
        $this->setIndex($index);

        $indexClasses = $this->index->getClasses();
        $classes = $this->getClasses($vars, $indexClasses);
        // Nothing to do for this index when the class filter leaves no classes
        if (!count($classes)) {
            continue;
        }

        $this->clearIndex($vars);

        $groups = $this->indexClassForIndex($classes, $isGroup, $group);
    }

    // Log the end timestamp with the same clock as the start timestamp (date(), not
    // gmdate()), so the two log lines can be compared directly
    $this->getLogger()->info(date('Y-m-d H:i:s'));
    // gmdate() is only used here to format the elapsed seconds as H:i:s
    $time = gmdate('H:i:s', (time() - $start));
    $this->getLogger()->info(sprintf('Time taken: %s', $time));

    return $groups;
}
||
| 133 | |||
/**
 * Set up the requirements for this task from the request's GET parameters.
 *
 * Debugging is forced on when it is already enabled or when a `debug`
 * parameter is present. The group to start from is the larger of the `group`
 * and `start` parameters, both defaulting to 0.
 *
 * @param HTTPRequest $request Current request
 * @return array [array $vars, int $group, bool $isGroup]
 */
protected function taskSetup(HTTPRequest $request): array
{
    $vars = $request->getVars();
    $debug = $this->isDebug() || isset($vars['debug']);
    // Forcefully set the debugging to whatever the outcome of the above is
    $this->setDebug($debug, true);

    // GET parameters arrive as strings; cast them so the int-typed methods
    // further down receive real integers instead of relying on coercion
    // (which raises a TypeError for non-numeric input)
    $group = (int)($vars['group'] ?? 0);
    $start = (int)($vars['start'] ?? 0);
    $group = max($group, $start);
    $isGroup = isset($vars['group']);

    return [$vars, $group, $isGroup];
}
||
| 153 | |||
/**
 * Determine the classes to run for this task execution.
 *
 * Without a `class` GET parameter every class of the index is returned.
 * With one, only the matching entry of $classes is kept (original keys are
 * preserved), which yields an empty array when the class is not in the index.
 *
 * @param array $vars URL GET Parameters
 * @param array $classes Classes to index
 * @return array
 */
protected function getClasses(array $vars, array $classes): array
{
    if (!isset($vars['class'])) {
        return $classes;
    }

    $requested = [$vars['class']];

    return array_intersect($classes, $requested);
}
||
| 169 | |||
/**
 * Clear the current index when the `clear` GET parameter is set to a non-empty value.
 *
 * @param array $vars URL GET Parameters
 * @throws Exception
 */
public function clearIndex(array $vars)
{
    // No (or an empty) 'clear' parameter: leave the index untouched
    if (empty($vars['clear'])) {
        return;
    }

    $indexName = $this->index->getIndexName();
    $this->getLogger()->info(sprintf('Clearing index %s', $indexName));

    // An empty list combined with DELETE_TYPE_ALL is passed to the service for this index
    $noItems = ArrayList::create([]);
    $this->service->doManipulate($noItems, SolrCoreService::DELETE_TYPE_ALL, $this->index);
}
|
| 183 | |||
/**
 * Index every given class for the index that is currently set.
 *
 * Each class is started from the same $group.
 *
 * @param array $classes Classes that need indexing
 * @param bool $isGroup Indexing a specific group?
 * @param int $group Group to index
 * @return int|bool Result for the last class in $classes, or 0 when $classes is empty
 * @throws Exception
 * @throws HttpException
 */
protected function indexClassForIndex(array $classes, bool $isGroup, int $group)
{
    $lastResult = 0;
    foreach ($classes as $className) {
        // Only the result of the most recently indexed class is kept
        $lastResult = $this->indexClass($isGroup, $className, $group);
    }

    return $lastResult;
}
||
| 203 | |||
/**
 * Index a single class for a given index. {@link static::indexClassForIndex()}
 *
 * Walks the groups from $group up to and including the upper bound from
 * {@link static::getGroupSettings()}. With PCNTL available each iteration
 * spawns child processes and continues after the last group they were
 * assigned; otherwise one group is indexed per iteration in this process.
 *
 * @param bool $isGroup Is a specific group indexed
 * @param string $class Class to index
 * @param int $group Group to index
 * @return int|bool Total amount of groups for this class
 * @throws HttpException
 * @throws ValidationException
 */
private function indexClass(bool $isGroup, string $class, int $group)
{
    $this->getLogger()->info(sprintf('Indexing %s for %s', $class, $this->getIndex()->getIndexName()));
    [$totalGroups, $groups] = $this->getGroupSettings($isGroup, $class, $group);
    $this->getLogger()->info(sprintf('Total groups %s', $totalGroups));
    do {
        try {
            if ($this->hasPCNTL()) {
                // @codeCoverageIgnoreStart
                $group = $this->spawnChildren($class, $group, $groups);
                // @codeCoverageIgnoreEnd
            } else {
                $this->doReindex($group, $class);
            }
        } catch (Exception $error) {
            // @codeCoverageIgnoreStart
            $this->logException($this->index->getIndexName(), $group, $error);
            // @codeCoverageIgnoreEnd
        }
        // Always advance, also after a failure: retrying the same group on a
        // persistent error would otherwise never let this loop terminate
        $group++;
    } while ($group <= $groups);

    return $totalGroups;
}
||
| 239 | |||
/**
 * Work out how many groups exist for a class and up to which group to index.
 *
 * The total is the record count of $class divided by the batch length,
 * rounded up. When a specific group was requested, the upper bound is
 * $group plus the amount of cores minus one; otherwise it is the total.
 *
 * @param bool $isGroup Is it a specific group
 * @param string $class Class to check
 * @param int $group Current group to index
 * @return array [int $totalGroups, int $groups]
 */
private function getGroupSettings(bool $isGroup, string $class, int $group): array
{
    $recordCount = $class::get()->count();
    $totalGroups = (int)ceil($recordCount / $this->getBatchLength());

    if ($isGroup) {
        $upperBound = $group + $this->getCores() - 1;
    } else {
        $upperBound = $totalGroups;
    }

    return [$totalGroups, $upperBound];
}
||
| 255 | |||
/**
 * Check if PCNTL is available and/or useable.
 *
 * Forking is only used on the command line, when pcntl_fork() exists, and
 * when the `unittest` request variable is either absent/falsy or exactly 'pcntl'.
 * The unittest param is from phpunit.xml.dist, meant to bypass the exit(0) call.
 *
 * @return bool
 */
private function hasPCNTL(): bool
{
    if (!Director::is_cli() || !function_exists('pcntl_fork')) {
        return false;
    }

    $unitTest = Controller::curr()->getRequest()->getVar('unittest');

    return $unitTest === 'pcntl' || !$unitTest;
}
||
| 270 | |||
/**
 * For each core, spawn a child process that will handle a separate group,
 * wait for all of them and then send a commit to the index client.
 *
 * @codeCoverageIgnore Can't be tested because PCNTL is not available
 * @param string $class Class to index
 * @param int $group Group to index
 * @param int $groups Total amount of groups
 * @return int Last group number that was assigned to a core
 * @throws Exception
 * @throws HttpException
 */
private function spawnChildren(string $class, int $group, int $groups): int
{
    $cores = $this->getCores();
    $pids = [];
    // Stays at $group when there are no cores to loop over
    $start = $group;
    // for each core, start a grouped indexing
    for ($offset = 0; $offset < $cores; $offset++) {
        $start = $group + $offset;
        // Groups at or beyond the upper bound are not forked
        if ($start >= $groups) {
            continue;
        }
        $this->runForkedChild($class, $pids, $start);
    }
    // Wait for each child to finish
    // It needs to wait for them independently,
    // or it runs out of memory for some reason
    foreach ($pids as $childPid) {
        pcntl_waitpid($childPid, $status);
    }

    // Commit once all children are done
    $client = $this->index->getClient();
    $commit = $client->createUpdate();
    $commit->addCommit();
    $client->update($commit);

    return $start;
}
||
| 308 | |||
/**
 * Create a fork and run the child.
 *
 * When forking fails, the failure is logged and the group is indexed in the
 * current process instead, so the group is not silently skipped.
 *
 * @codeCoverageIgnore Can't be tested because PCNTL is not available
 * @param string $class Class to index
 * @param array $pids Array of all the child Process IDs, the new PID is appended
 * @param int $start Start point for the objects
 * @return void
 * @throws HttpException
 * @throws ValidationException
 */
private function runForkedChild(string $class, array &$pids, int $start): void
{
    $pid = pcntl_fork();
    if ($pid === -1) {
        // pcntl_fork() returns -1 when no child could be created. Recording -1 would
        // make the later pcntl_waitpid() call wait on "any child", so it is not stored
        $this->getLogger()->error(
            sprintf('Could not fork a child process for group %s, indexing it in the current process', $start)
        );
        $this->doReindex($start, $class);

        return;
    }
    // PID needs to be pushed before anything else, for some reason
    $pids[] = $pid;
    // Runs in both the parent and the child: each reconnects to the database after the fork
    $config = DB::getConfig();
    DB::connect($config);
    $this->runChild($class, $pid, $start);
}
||
| 329 | |||
/**
 * Run a single child index operation.
 *
 * Does nothing unless $pid is 0, the value pcntl_fork() returns inside the
 * child process. Any exception raised while indexing is logged and re-thrown
 * as a generic Exception that keeps the original as its previous exception.
 *
 * @codeCoverageIgnore Can't be tested because PCNTL is not available
 * @param string $class Class to index
 * @param int $pid PID of the child
 * @param int $start Position to start
 * @throws HttpException
 * @throws ValidationException
 * @throws Exception
 */
private function runChild(string $class, int $pid, int $start): void
{
    // Only the forked child (pid 0) does the indexing
    if ($pid !== 0) {
        return;
    }
    try {
        $this->doReindex($start, $class, $pid);
    } catch (Exception $error) {
        SolrLogger::logMessage('ERROR', $error);
        $msg = sprintf(
            'Something went wrong while indexing %s on %s, see the logs for details',
            $start,
            $this->index->getIndexName()
        );
        // Chain the original exception so its message and trace are not lost
        throw new Exception($msg, 0, $error);
    }
}
||
| 358 | |||
/**
 * Reindex the given group, for each state.
 *
 * Every non-default, non-empty state is activated before indexing; the
 * default state is indexed without switching. The state is always put back
 * to the default afterwards, also when indexing throws, so a failed group
 * cannot leave a leftover state active for the next group.
 *
 * @param int $group Group to index
 * @param string $class Class to index
 * @param bool|int $pid Anything other than false ends the process with exit(0) when done
 * @throws Exception
 */
private function doReindex(int $group, string $class, $pid = false)
{
    $start = time();
    try {
        foreach (SiteState::getStates() as $state) {
            if ($state !== SiteState::DEFAULT_STATE && !empty($state)) {
                SiteState::withState($state);
            }
            $this->indexStateClass($group, $class);
        }
    } finally {
        // Reset even on failure; the default state pass above never switches state itself
        SiteState::withState(SiteState::DEFAULT_STATE);
    }

    $end = gmdate('i:s', time() - $start);
    $this->getLogger()->info(sprintf('Indexed group %s in %s', $group, $end));

    // @codeCoverageIgnoreStart
    if ($pid !== false) {
        exit(0);
    }
    // @codeCoverageIgnoreEnd
}
|
| 388 | |||
/**
 * Index a group of a class for the currently active state and index.
 *
 * Records are read from the base data class of $class, ordered by ID, one
 * batch per group. Nothing is sent to the index when the batch is empty.
 *
 * @param string $group Group to index (numeric; used as a multiplier for the batch offset)
 * @param string $class Class to index
 * @throws Exception
 */
private function indexStateClass(string $group, string $class): void
{
    // Generate filtered list of local records
    $baseClass = DataObject::getSchema()->baseDataClass($class);
    $batchLength = $this->getBatchLength();
    $offset = $group * $this->getBatchLength();

    /** @var DataList|DataObject[] $items */
    $items = DataObject::get($baseClass)
        ->sort('ID ASC')
        ->limit($batchLength, $offset);

    if (!$items->count()) {
        return;
    }

    $this->updateIndex($items);
}
|
| 408 | |||
/**
 * Execute the update on the client.
 *
 * Creates an update query on the index client, lets the service fill it
 * with the given items and then sends it through that client.
 *
 * @param SS_List $items Items to index
 * @throws Exception
 */
private function updateIndex($items): void
{
    $index = $this->getIndex();
    $solrClient = $index->getClient();
    $updateQuery = $solrClient->createUpdate();

    // Hand this task's debug flag to the service before it builds the update
    $service = $this->service;
    $service->setDebug($this->debug);
    $service->updateIndex($index, $items, $updateQuery);

    $solrClient->update($updateQuery);
}
|
| 424 | |||
/**
 * Log an exception if it happens. Most are caught; these logs are for the developers
 * to identify problems and fix them.
 *
 * The exception message goes to the task logger as an error, and a summary
 * naming the index and group is passed to SolrLogger.
 *
 * @codeCoverageIgnore This is actually tested through reflection
 * @param string $index Index that is currently running
 * @param int $group Group currently attempted to index
 * @param Exception $exception Exception that's been thrown
 * @throws HttpException
 * @throws ValidationException
 */
private function logException($index, int $group, Exception $exception): void
{
    $this->getLogger()->error($exception->getMessage());

    $format = 'Error indexing core %s on group %s,' . PHP_EOL .
        'Please log in to the CMS to find out more about Indexing errors' . PHP_EOL;
    $summary = sprintf($format, $index, $group);

    SolrLogger::logMessage('ERROR', $summary);
}
||
| 447 | } |
||
| 448 |
In the issue above, the returned value is violating the contract defined by the mentioned interface.
Let's take a look at an example: