Completed
Pull Request — master (#1566)
by
unknown
04:24
created

PopulateCommand::initialize()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
dl 0
loc 11
ccs 0
cts 8
cp 0
rs 9.9
c 0
b 0
f 0
cc 2
nc 2
nop 2
crap 6
1
<?php
2
3
/*
4
 * This file is part of the FOSElasticaBundle package.
5
 *
6
 * (c) FriendsOfSymfony <http://friendsofsymfony.github.com/>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace FOS\ElasticaBundle\Command;
13
14
use Elastica\Exception\Bulk\ResponseException as BulkResponseException;
15
use FOS\ElasticaBundle\Event\IndexPopulateEvent;
16
use FOS\ElasticaBundle\Event\TypePopulateEvent;
17
use FOS\ElasticaBundle\Index\IndexManager;
18
use FOS\ElasticaBundle\Index\Resetter;
19
use FOS\ElasticaBundle\Persister\Event\Events;
20
use FOS\ElasticaBundle\Persister\Event\OnExceptionEvent;
21
use FOS\ElasticaBundle\Persister\Event\PostAsyncInsertObjectsEvent;
22
use FOS\ElasticaBundle\Persister\Event\PostInsertObjectsEvent;
23
use FOS\ElasticaBundle\Persister\InPlacePagerPersister;
24
use FOS\ElasticaBundle\Persister\PagerPersisterInterface;
25
use FOS\ElasticaBundle\Persister\PagerPersisterRegistry;
26
use FOS\ElasticaBundle\Provider\PagerProviderRegistry;
27
use Symfony\Component\Console\Command\Command;
28
use Symfony\Component\Console\Helper\ProgressBar;
29
use Symfony\Component\Console\Helper\QuestionHelper;
30
use Symfony\Component\Console\Input\InputInterface;
31
use Symfony\Component\Console\Input\InputOption;
32
use Symfony\Component\Console\Output\OutputInterface;
33
use Symfony\Component\Console\Question\Question;
34
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
35
use Symfony\Component\EventDispatcher\LegacyEventDispatcherProxy;
36
37
/**
38
 * Populate the search index.
39
 */
40
class PopulateCommand extends Command
41
{
42
    protected static $defaultName = 'fos:elastica:populate';
43
44
    /**
45
     * @var EventDispatcherInterface
46
     */
47
    private $dispatcher;
48
49
    /**
50
     * @var IndexManager
51
     */
52
    private $indexManager;
53
54
    /**
55
     * @var PagerProviderRegistry
56
     */
57
    private $pagerProviderRegistry;
58
59
    /**
60
     * @var PagerPersisterRegistry
61
     */
62
    private $pagerPersisterRegistry;
63
64
    /**
65
     * @var PagerPersisterInterface
66
     */
67
    private $pagerPersister;
68
69
    /**
70
     * @var Resetter
71
     */
72
    private $resetter;
73
74 4
    /**
     * Stores the collaborators used while populating indexes.
     *
     * The dispatcher is not stored as given: it is first passed through
     * LegacyEventDispatcherProxy::decorate(), and the returned instance is the
     * one this command dispatches on and registers listeners with.
     *
     * @param EventDispatcherInterface $dispatcher
     * @param IndexManager             $indexManager
     * @param PagerProviderRegistry    $pagerProviderRegistry
     * @param PagerPersisterRegistry   $pagerPersisterRegistry
     * @param Resetter                 $resetter
     */
    public function __construct(
        EventDispatcherInterface $dispatcher,
        IndexManager $indexManager,
        PagerProviderRegistry $pagerProviderRegistry,
        PagerPersisterRegistry $pagerPersisterRegistry,
        Resetter $resetter
    ) {
        parent::__construct();

        // Plain collaborators are kept exactly as injected.
        $this->resetter = $resetter;
        $this->indexManager = $indexManager;
        $this->pagerPersisterRegistry = $pagerPersisterRegistry;
        $this->pagerProviderRegistry = $pagerProviderRegistry;

        // Only the dispatcher is wrapped before being stored.
        $this->dispatcher = LegacyEventDispatcherProxy::decorate($dispatcher);
    }
89
90 4
    /**
     * Declares the command name, description and every supported option.
     *
     * Options are registered in the same order as before so the generated
     * help output keeps its layout.
     */
    protected function configure()
    {
        $this->setName('fos:elastica:populate');

        // Target selection and reset/delete behaviour.
        $this->addOption('index', null, InputOption::VALUE_OPTIONAL, 'The index to repopulate');
        $this->addOption('type', null, InputOption::VALUE_OPTIONAL, 'The type to repopulate');
        $this->addOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset index before populating');
        $this->addOption('no-delete', null, InputOption::VALUE_NONE, 'Do not delete index after populate');

        // Runtime behaviour of the population loop and of its output.
        $this->addOption('sleep', null, InputOption::VALUE_REQUIRED, 'Sleep time between persisting iterations (microseconds)', 0);
        $this->addOption('ignore-errors', null, InputOption::VALUE_NONE, 'Do not stop on errors');
        $this->addOption('no-overwrite-format', null, InputOption::VALUE_NONE, 'Prevent this command from overwriting ProgressBar\'s formats');

        // Pager window and the persister that consumes it.
        $this->addOption('first-page', null, InputOption::VALUE_REQUIRED, 'The pager\'s page to start population from. Including the given page.', 1);
        $this->addOption('last-page', null, InputOption::VALUE_REQUIRED, 'The pager\'s page to end population on. Including the given page.', null);
        $this->addOption('max-per-page', null, InputOption::VALUE_REQUIRED, 'The pager\'s page size', 100);
        $this->addOption('pager-persister', null, InputOption::VALUE_REQUIRED, 'The pager persister to be used to populate the index', InPlacePagerPersister::NAME);

        $this->setDescription('Populates search indexes from providers');
    }
110
111
    /**
     * Resolves the pager persister named by --pager-persister and, unless
     * --no-overwrite-format is set, installs this command's ProgressBar formats.
     *
     * @param InputInterface  $input
     * @param OutputInterface $output
     */
    protected function initialize(InputInterface $input, OutputInterface $output)
    {
        $persisterName = $input->getOption('pager-persister');
        $this->pagerPersister = $this->pagerPersisterRegistry->getPagerPersister($persisterName);

        if ($input->getOption('no-overwrite-format')) {
            return;
        }

        // Each verbosity level adds one more column; all of them end with the message line.
        $formatDefinitions = [
            'normal' => " %current%/%max% [%bar%] %percent:3s%%\n%message%",
            'verbose' => " %current%/%max% [%bar%] %percent:3s%% %elapsed:6s%\n%message%",
            'very_verbose' => " %current%/%max% [%bar%] %percent:3s%% %elapsed:6s%/%estimated:-6s%\n%message%",
            'debug' => " %current%/%max% [%bar%] %percent:3s%% %elapsed:6s%/%estimated:-6s% %memory:6s%\n%message%",
        ];

        foreach ($formatDefinitions as $formatName => $formatDefinition) {
            ProgressBar::setFormatDefinition($formatName, $formatDefinition);
        }
    }
122
123
    /**
     * Populates the requested index (optionally a single type of it), or every
     * index known to the index manager when no --index is given.
     *
     * @param InputInterface  $input
     * @param OutputInterface $output
     *
     * @return int the command exit status; 0 both on completion and when the
     *             user declines the confirmation (failures surface as exceptions)
     *
     * @throws \InvalidArgumentException when --type is given without --index
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $index = $input->getOption('index');
        $type = $input->getOption('type');

        // Reject an inconsistent option set before prompting the user or touching any index.
        if (null === $index && null !== $type) {
            throw new \InvalidArgumentException('Cannot specify type option without an index.');
        }

        $reset = !$input->getOption('no-reset');
        $delete = !$input->getOption('no-delete');

        $options = [
            'delete' => $delete,
            'reset' => $reset,
            'ignore_errors' => $input->getOption('ignore-errors'),
            'sleep' => $input->getOption('sleep'),
            'first_page' => $input->getOption('first-page'),
            'max_per_page' => $input->getOption('max-per-page'),
        ];

        // Only forward last_page when it was actually supplied.
        if ($input->getOption('last-page')) {
            $options['last_page'] = $input->getOption('last-page');
        }

        if ($input->isInteractive() && $reset && 1 < $options['first_page']) {
            /** @var QuestionHelper $dialog */
            $dialog = $this->getHelperSet()->get('question');

            // A confirmation question is required here: with a free-text question any
            // non-empty answer (including "no") is truthy and would go on to reset the
            // index. The default is false so that a bare <Enter> still aborts.
            $confirmation = new \Symfony\Component\Console\Question\ConfirmationQuestion(
                '<question>You chose to reset the index and start indexing with an offset. Do you really want to do that?</question>',
                false
            );

            if (!$dialog->ask($input, $output, $confirmation)) {
                return 0;
            }
        }

        if (null !== $index) {
            if (null !== $type) {
                $this->populateIndexType($output, $index, $type, $reset, $options);
            } else {
                $this->populateIndex($output, $index, $reset, $options);
            }
        } else {
            $indexes = array_keys($this->indexManager->getAllIndexes());

            foreach ($indexes as $index) {
                $this->populateIndex($output, $index, $reset, $options);
            }
        }

        // Always hand an explicit integer status back to the console component.
        return 0;
    }
169
170
    /**
     * Populates every type that has a pager provider registered for an index.
     *
     * Dispatches PRE_INDEX_POPULATE, resets the index when the event still
     * asks for it, populates each type with the options carried by the event,
     * dispatches POST_INDEX_POPULATE and finally refreshes the index.
     *
     * @param OutputInterface $output
     * @param string          $index
     * @param bool            $reset
     * @param array           $options
     */
    private function populateIndex(OutputInterface $output, $index, $reset, $options)
    {
        $populateEvent = new IndexPopulateEvent($index, $reset, $options);
        $this->dispatcher->dispatch($populateEvent, IndexPopulateEvent::PRE_INDEX_POPULATE);

        // The reset decision is read back from the event rather than taken from $reset directly.
        if ($populateEvent->isReset()) {
            $output->writeln('<info>Resetting</info> <comment>'.$index.'</comment>');
            $this->resetter->resetIndex($index, true);
        }

        $providers = $this->pagerProviderRegistry->getIndexProviders($index);
        foreach (array_keys($providers) as $typeName) {
            // Types are never reset individually here; the index-level reset above covers them.
            $this->populateIndexType($output, $index, $typeName, false, $populateEvent->getOptions());
        }

        $this->dispatcher->dispatch($populateEvent, IndexPopulateEvent::POST_INDEX_POPULATE);

        $this->refreshIndex($output, $index);
    }
197
198
    /**
     * Optionally resets an index type, populates it, and refreshes the index.
     *
     * Progress reporting is driven by listeners registered on the dispatcher
     * for the duration of this call only: they are removed again once the
     * persister has finished (or failed), so that populating several types or
     * indexes in one run does not leave earlier listeners firing for later ones.
     *
     * @param OutputInterface $output
     * @param string          $index
     * @param string          $type
     * @param bool            $reset
     * @param array           $options
     */
    private function populateIndexType(OutputInterface $output, $index, $type, $reset, $options)
    {
        $event = new TypePopulateEvent($index, $type, $reset, $options);
        $this->dispatcher->dispatch($event, TypePopulateEvent::PRE_TYPE_POPULATE);

        if ($event->isReset()) {
            $output->writeln(sprintf('<info>Resetting</info> <comment>%s/%s</comment>', $index, $type));
            $this->resetter->resetIndexType($index, $type);
        }

        // NOTE(review): the options used below are the ones passed in, not ones read back
        // from $event after PRE_TYPE_POPULATE; populateIndex() reads them back from its
        // event. Confirm whether that difference is intentional.

        // Number of objects skipped when population starts on a later page.
        $offset = 1 < $options['first_page'] ? ($options['first_page'] - 1) * $options['max_per_page'] : 0;
        $loggerClosure = ProgressClosureBuilder::build($output, 'Populating', $index, $type, $offset);

        // [event name, listener] pairs, kept so the exact same closures can be removed afterwards.
        $listeners = [
            [
                Events::ON_EXCEPTION,
                function (OnExceptionEvent $event) use ($loggerClosure) {
                    $loggerClosure(
                        count($event->getObjects()),
                        $event->getPager()->getNbResults(),
                        sprintf('<error>%s</error>', $event->getException()->getMessage())
                    );
                },
            ],
            [
                Events::POST_INSERT_OBJECTS,
                function (PostInsertObjectsEvent $event) use ($loggerClosure) {
                    $loggerClosure(count($event->getObjects()), $event->getPager()->getNbResults());
                },
            ],
            [
                Events::POST_ASYNC_INSERT_OBJECTS,
                function (PostAsyncInsertObjectsEvent $event) use ($loggerClosure) {
                    $loggerClosure($event->getObjectsCount(), $event->getPager()->getNbResults(), $event->getErrorMessage());
                },
            ],
        ];

        if ($options['ignore_errors']) {
            // Only bulk response failures are ignored; any other exception is left untouched.
            $listeners[] = [
                Events::ON_EXCEPTION,
                function (OnExceptionEvent $event) {
                    if ($event->getException() instanceof BulkResponseException) {
                        $event->setIgnore(true);
                    }
                },
            ];
        }

        foreach ($listeners as list($eventName, $listener)) {
            $this->dispatcher->addListener($eventName, $listener);
        }

        try {
            $provider = $this->pagerProviderRegistry->getProvider($index, $type);

            $pager = $provider->provide($options);

            $options['indexName'] = $index;
            $options['typeName'] = $type;

            $this->pagerPersister->insert($pager, $options);
        } finally {
            // Detach this call's listeners so they cannot react to a later type or index.
            foreach ($listeners as list($eventName, $listener)) {
                $this->dispatcher->removeListener($eventName, $listener);
            }
        }

        $this->dispatcher->dispatch($event, TypePopulateEvent::POST_TYPE_POPULATE);

        $this->refreshIndex($output, $index);
    }
266
267
    /**
     * Announces the refresh on the output, refreshes the index obtained from
     * the index manager, then writes an empty line.
     *
     * @param OutputInterface $output
     * @param string          $index
     */
    private function refreshIndex(OutputInterface $output, $index)
    {
        $notice = '<info>Refreshing</info> <comment>'.$index.'</comment>';
        $output->writeln($notice);

        $elasticaIndex = $this->indexManager->getIndex($index);
        $elasticaIndex->refresh();

        // Blank line separating this index's output from whatever follows.
        $output->writeln('');
    }
279
}
280