1 | <?php |
||
38 | class PopulateCommand extends Command |
||
39 | { |
||
40 | protected static $defaultName = 'fos:elastica:populate'; |
||
41 | |||
42 | /** |
||
43 | * @var EventDispatcherInterface |
||
44 | */ |
||
45 | private $dispatcher; |
||
46 | |||
47 | /** |
||
48 | * @var IndexManager |
||
49 | */ |
||
50 | private $indexManager; |
||
51 | |||
52 | /** |
||
53 | * @var PagerProviderRegistry |
||
54 | */ |
||
55 | private $pagerProviderRegistry; |
||
56 | |||
57 | /** |
||
58 | * @var PagerPersisterRegistry |
||
59 | */ |
||
60 | private $pagerPersisterRegistry; |
||
61 | |||
62 | /** |
||
63 | * @var PagerPersisterInterface |
||
64 | */ |
||
65 | private $pagerPersister; |
||
66 | |||
67 | /** |
||
68 | * @var Resetter |
||
69 | */ |
||
70 | private $resetter; |
||
71 | |||
/**
 * Stores the collaborators used while populating indexes.
 *
 * Only the pager persister registry is kept here; the concrete pager
 * persister property is not assigned by this constructor.
 */
public function __construct(
    EventDispatcherInterface $dispatcher,
    IndexManager $indexManager,
    PagerProviderRegistry $pagerProviderRegistry,
    PagerPersisterRegistry $pagerPersisterRegistry,
    Resetter $resetter
) {
    // Let the base command initialise itself before any state is attached.
    parent::__construct();

    $this->resetter = $resetter;
    $this->pagerPersisterRegistry = $pagerPersisterRegistry;
    $this->pagerProviderRegistry = $pagerProviderRegistry;
    $this->indexManager = $indexManager;
    $this->dispatcher = $dispatcher;
}
|
87 | |||
88 | 4 | protected function configure() |
|
107 | |||
108 | protected function initialize(InputInterface $input, OutputInterface $output) |
||
119 | |||
120 | protected function execute(InputInterface $input, OutputInterface $output) |
||
153 | |||
/**
 * Recreates an index, populates it, and refreshes it.
 *
 * Flow: dispatch PreIndexPopulateEvent, optionally reset the index, register
 * temporary progress/error listeners, insert the pager through the pager
 * persister, dispatch PostIndexPopulateEvent, then refresh the index.
 *
 * The listeners registered here are scoped to this single call: they are
 * removed again once the insert finishes (or throws), so that populating a
 * further index does not trigger the progress closures or the ignore-errors
 * behaviour left over from an earlier call.
 *
 * @param OutputInterface $output  console output used for status and progress messages
 * @param string          $index   name of the index to populate
 * @param bool            $reset   requested reset flag; the value actually used is read back
 *                                 from the PreIndexPopulateEvent after dispatch
 * @param array           $options populate options; this method reads 'first_page',
 *                                 'max_per_page' and 'ignore_errors' and forwards the whole
 *                                 set to the provider, the persister and the events
 */
private function populateIndex(OutputInterface $output, string $index, bool $reset, $options): void
{
    $this->dispatcher->dispatch($event = new PreIndexPopulateEvent($index, $reset, $options));

    // The event decides the final reset flag, which may differ from the one passed in.
    if ($reset = $event->isReset()) {
        $output->writeln(sprintf('<info>Resetting</info> <comment>%s</comment>', $index));
        // NOTE(review): second argument presumably marks the reset as part of a populate run - confirm against Resetter.
        $this->resetter->resetIndex($index, true);
    }

    // Number of objects on the pages that precede 'first_page'; zero when starting at page 1 or lower.
    $offset = 1 < $options['first_page'] ? ($options['first_page'] - 1) * $options['max_per_page'] : 0;
    $loggerClosure = ProgressClosureBuilder::build($output, 'Populating', $index, $offset);

    // Collected as [event name, listener] pairs, in registration order, so the
    // exact same closure instances can be removed afterwards.
    $listeners = [
        [
            OnExceptionEvent::class,
            function (OnExceptionEvent $event) use ($loggerClosure) {
                $loggerClosure(
                    count($event->getObjects()),
                    $event->getPager()->getNbResults(),
                    sprintf('<error>%s</error>', $event->getException()->getMessage())
                );
            },
        ],
        [
            PostInsertObjectsEvent::class,
            function (PostInsertObjectsEvent $event) use ($loggerClosure) {
                $loggerClosure(count($event->getObjects()), $event->getPager()->getNbResults());
            },
        ],
        [
            PostAsyncInsertObjectsEvent::class,
            function (PostAsyncInsertObjectsEvent $event) use ($loggerClosure) {
                $loggerClosure($event->getObjectsCount(), $event->getPager()->getNbResults(), $event->getErrorMessage());
            },
        ],
    ];

    if ($options['ignore_errors']) {
        $listeners[] = [
            OnExceptionEvent::class,
            function (OnExceptionEvent $event) {
                // Only bulk response failures are ignored; other exceptions are left untouched.
                if ($event->getException() instanceof BulkResponseException) {
                    $event->setIgnore(true);
                }
            },
        ];
    }

    foreach ($listeners as [$eventName, $listener]) {
        $this->dispatcher->addListener($eventName, $listener);
    }

    try {
        $provider = $this->pagerProviderRegistry->getProvider($index);
        $pager = $provider->provide($options);

        $this->pagerPersister->insert($pager, array_merge($options, ['indexName' => $index]));
    } finally {
        // Always detach the per-call listeners, even when the insert throws,
        // so they cannot leak into a later populateIndex() call.
        foreach ($listeners as [$eventName, $listener]) {
            $this->dispatcher->removeListener($eventName, $listener);
        }
    }

    $this->dispatcher->dispatch(new PostIndexPopulateEvent($index, $reset, $options));

    $this->refreshIndex($output, $index);
}
||
214 | |||
215 | /** |
||
216 | * Refreshes an index. |
||
217 | */ |
||
218 | private function refreshIndex(OutputInterface $output, string $index): void |
||
224 | } |
||
225 |