Completed
Push — master ( 8b30ba...c06433 )
by Karel
04:39
created

PopulateCommand::initialize()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
dl 0
loc 11
ccs 0
cts 8
cp 0
rs 9.9
c 0
b 0
f 0
cc 2
nc 2
nop 2
crap 6
1
<?php
2
3
/*
4
 * This file is part of the FOSElasticaBundle package.
5
 *
6
 * (c) FriendsOfSymfony <http://friendsofsymfony.github.com/>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace FOS\ElasticaBundle\Command;
13
14
use Elastica\Exception\Bulk\ResponseException as BulkResponseException;
use FOS\ElasticaBundle\Event\IndexPopulateEvent;
use FOS\ElasticaBundle\Event\TypePopulateEvent;
use FOS\ElasticaBundle\Index\IndexManager;
use FOS\ElasticaBundle\Index\Resetter;
use FOS\ElasticaBundle\Persister\Event\Events;
use FOS\ElasticaBundle\Persister\Event\OnExceptionEvent;
use FOS\ElasticaBundle\Persister\Event\PostAsyncInsertObjectsEvent;
use FOS\ElasticaBundle\Persister\Event\PostInsertObjectsEvent;
use FOS\ElasticaBundle\Persister\InPlacePagerPersister;
use FOS\ElasticaBundle\Persister\PagerPersisterInterface;
use FOS\ElasticaBundle\Persister\PagerPersisterRegistry;
use FOS\ElasticaBundle\Provider\PagerProviderRegistry;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Helper\ProgressBar;
use Symfony\Component\Console\Helper\QuestionHelper;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Question\ConfirmationQuestion;
use Symfony\Component\Console\Question\Question;
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
35
36
/**
 * Populate the search index.
 *
 * Console command "fos:elastica:populate". Depending on the --index and
 * --type options it populates a single type, a single index, or every index
 * known to the index manager, optionally resetting each target first.
 */
class PopulateCommand extends Command
{
    protected static $defaultName = 'fos:elastica:populate';

    /**
     * Dispatcher used for the populate events and for the temporary
     * progress listeners registered while a type is being populated.
     *
     * @var EventDispatcherInterface
     */
    private $dispatcher;

    /**
     * @var IndexManager
     */
    private $indexManager;

    /**
     * @var PagerProviderRegistry
     */
    private $pagerProviderRegistry;

    /**
     * @var PagerPersisterRegistry
     */
    private $pagerPersisterRegistry;

    /**
     * Persister selected in initialize() from the --pager-persister option.
     *
     * @var PagerPersisterInterface
     */
    private $pagerPersister;

    /**
     * @var Resetter
     */
    private $resetter;

    /**
     * @param EventDispatcherInterface $dispatcher
     * @param IndexManager             $indexManager
     * @param PagerProviderRegistry    $pagerProviderRegistry
     * @param PagerPersisterRegistry   $pagerPersisterRegistry
     * @param Resetter                 $resetter
     */
    public function __construct(
        EventDispatcherInterface $dispatcher,
        IndexManager $indexManager,
        PagerProviderRegistry $pagerProviderRegistry,
        PagerPersisterRegistry $pagerPersisterRegistry,
        Resetter $resetter
    ) {
        parent::__construct();

        $this->dispatcher = $dispatcher;
        $this->indexManager = $indexManager;
        $this->pagerProviderRegistry = $pagerProviderRegistry;
        $this->pagerPersisterRegistry = $pagerPersisterRegistry;
        $this->resetter = $resetter;
    }

    /**
     * Declares the command name, its options and its description.
     */
    protected function configure()
    {
        $this
            ->setName('fos:elastica:populate')
            ->addOption('index', null, InputOption::VALUE_OPTIONAL, 'The index to repopulate')
            ->addOption('type', null, InputOption::VALUE_OPTIONAL, 'The type to repopulate')
            ->addOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset index before populating')
            ->addOption('no-delete', null, InputOption::VALUE_NONE, 'Do not delete index after populate')
            ->addOption('sleep', null, InputOption::VALUE_REQUIRED, 'Sleep time between persisting iterations (microseconds)', 0)
            ->addOption('ignore-errors', null, InputOption::VALUE_NONE, 'Do not stop on errors')
            ->addOption('no-overwrite-format', null, InputOption::VALUE_NONE, 'Prevent this command from overwriting ProgressBar\'s formats')

            ->addOption('first-page', null, InputOption::VALUE_REQUIRED, 'The pager\'s page to start population from. Including the given page.', 1)
            ->addOption('last-page', null, InputOption::VALUE_REQUIRED, 'The pager\'s page to end population on. Including the given page.', null)
            ->addOption('max-per-page', null, InputOption::VALUE_REQUIRED, 'The pager\'s page size', 100)
            ->addOption('pager-persister', null, InputOption::VALUE_REQUIRED, 'The pager persister to be used to populate the index', InPlacePagerPersister::NAME)

            ->setDescription('Populates search indexes from providers')
        ;
    }

    /**
     * Resolves the pager persister named by --pager-persister and, unless
     * --no-overwrite-format is given, replaces the ProgressBar format
     * definitions with variants that print %message% on a second line.
     *
     * @param InputInterface  $input
     * @param OutputInterface $output
     */
    protected function initialize(InputInterface $input, OutputInterface $output)
    {
        $this->pagerPersister = $this->pagerPersisterRegistry->getPagerPersister($input->getOption('pager-persister'));

        if (!$input->getOption('no-overwrite-format')) {
            ProgressBar::setFormatDefinition('normal', " %current%/%max% [%bar%] %percent:3s%%\n%message%");
            ProgressBar::setFormatDefinition('verbose', " %current%/%max% [%bar%] %percent:3s%% %elapsed:6s%\n%message%");
            ProgressBar::setFormatDefinition('very_verbose', " %current%/%max% [%bar%] %percent:3s%% %elapsed:6s%/%estimated:-6s%\n%message%");
            ProgressBar::setFormatDefinition('debug', " %current%/%max% [%bar%] %percent:3s%% %elapsed:6s%/%estimated:-6s% %memory:6s%\n%message%");
        }
    }

    /**
     * Populates the requested type, the requested index, or all indexes.
     *
     * @param InputInterface  $input
     * @param OutputInterface $output
     *
     * @return int Exit code; 0 both on success and when the user declines the
     *             confirmation prompt (same effective exit code as before,
     *             when nothing was returned)
     *
     * @throws \InvalidArgumentException When --type is given without --index
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $index = $input->getOption('index');
        $type = $input->getOption('type');
        $reset = !$input->getOption('no-reset');
        $delete = !$input->getOption('no-delete');

        $options = [
            'delete' => $delete,
            'reset' => $reset,
            'ignore_errors' => $input->getOption('ignore-errors'),
            'sleep' => $input->getOption('sleep'),
            'first_page' => $input->getOption('first-page'),
            'max_per_page' => $input->getOption('max-per-page'),
        ];

        // Only forward last_page when the user supplied a (truthy) value.
        if ($input->getOption('last-page')) {
            $options['last_page'] = $input->getOption('last-page');
        }

        // Resetting the index while skipping the first pages drops the documents
        // of the skipped pages, so ask for an explicit confirmation.
        if ($input->isInteractive() && $reset && 1 < $options['first_page']) {
            /** @var QuestionHelper $dialog */
            $dialog = $this->getHelperSet()->get('question');

            // A plain Question returns the raw answer, so a typed "no" used to be
            // truthy and continued. ConfirmationQuestion parses yes/no and the
            // `false` default keeps an empty answer meaning "abort".
            $question = new ConfirmationQuestion(
                '<question>You chose to reset the index and start indexing with an offset. Do you really want to do that?</question> [y/N] ',
                false
            );

            if (!$dialog->ask($input, $output, $question)) {
                return 0;
            }
        }

        if (null === $index && null !== $type) {
            throw new \InvalidArgumentException('Cannot specify type option without an index.');
        }

        if (null === $index) {
            // No index requested: populate every index the manager knows about.
            foreach (array_keys($this->indexManager->getAllIndexes()) as $indexName) {
                $this->populateIndex($output, $indexName, $reset, $options);
            }

            return 0;
        }

        if (null !== $type) {
            $this->populateIndexType($output, $index, $type, $reset, $options);
        } else {
            $this->populateIndex($output, $index, $reset, $options);
        }

        return 0;
    }

    /**
     * Recreates an index, populates its types, and refreshes the index.
     *
     * Dispatches PRE_INDEX_POPULATE before and POST_INDEX_POPULATE after the
     * types are populated. The reset flag and the options are read back from
     * the event after the PRE dispatch.
     *
     * @param OutputInterface $output
     * @param string          $index
     * @param bool            $reset
     * @param array           $options
     */
    private function populateIndex(OutputInterface $output, $index, $reset, $options)
    {
        $event = new IndexPopulateEvent($index, $reset, $options);
        $this->dispatcher->dispatch(IndexPopulateEvent::PRE_INDEX_POPULATE, $event);

        if ($event->isReset()) {
            $output->writeln(sprintf('<info>Resetting</info> <comment>%s</comment>', $index));
            $this->resetter->resetIndex($index, true);
        }

        $types = array_keys($this->pagerProviderRegistry->getIndexProviders($index));
        foreach ($types as $type) {
            // The whole index was handled above, so individual types are not reset again.
            $this->populateIndexType($output, $index, $type, false, $event->getOptions());
        }

        $this->dispatcher->dispatch(IndexPopulateEvent::POST_INDEX_POPULATE, $event);

        $this->refreshIndex($output, $index);
    }

    /**
     * Deletes/remaps an index type, populates it, and refreshes the index.
     *
     * Progress listeners are attached to the dispatcher only for the duration
     * of the insert and are always detached afterwards, so populating several
     * types in one run does not accumulate listeners that report progress on
     * behalf of types that are already finished.
     *
     * @param OutputInterface $output
     * @param string          $index
     * @param string          $type
     * @param bool            $reset
     * @param array           $options Must contain the 'first_page', 'max_per_page'
     *                                 and 'ignore_errors' keys
     */
    private function populateIndexType(OutputInterface $output, $index, $type, $reset, $options)
    {
        $event = new TypePopulateEvent($index, $type, $reset, $options);
        $this->dispatcher->dispatch(TypePopulateEvent::PRE_TYPE_POPULATE, $event);

        if ($event->isReset()) {
            $output->writeln(sprintf('<info>Resetting</info> <comment>%s/%s</comment>', $index, $type));
            $this->resetter->resetIndexType($index, $type);
        }

        // Number of objects on the pages skipped via first_page.
        $offset = 1 < $options['first_page'] ? ($options['first_page'] - 1) * $options['max_per_page'] : 0;
        $loggerClosure = ProgressClosureBuilder::build($output, 'Populating', $index, $type, $offset);

        $listeners = $this->createProgressListeners($loggerClosure, $options['ignore_errors']);
        foreach ($listeners as $pair) {
            $this->dispatcher->addListener($pair[0], $pair[1]);
        }

        try {
            $provider = $this->pagerProviderRegistry->getProvider($index, $type);

            $pager = $provider->provide($options);

            $options['indexName'] = $index;
            $options['typeName'] = $type;

            $this->pagerPersister->insert($pager, $options);
        } finally {
            // Detach even when the insert throws; the listeners are bound to this
            // type's progress closure and must not observe later populations.
            foreach ($listeners as $pair) {
                $this->dispatcher->removeListener($pair[0], $pair[1]);
            }
        }

        $this->dispatcher->dispatch(TypePopulateEvent::POST_TYPE_POPULATE, $event);

        $this->refreshIndex($output, $index);
    }

    /**
     * Builds the listeners that forward persister events to a progress closure.
     *
     * The pairs are returned in the order they have to be registered.
     *
     * @param callable $loggerClosure Progress callback, invoked with the number of
     *                                processed objects, the pager's total number of
     *                                results and an optional message
     * @param bool     $ignoreErrors  Whether bulk response exceptions are marked as ignored
     *
     * @return array List of [event name, listener] pairs
     */
    private function createProgressListeners($loggerClosure, $ignoreErrors)
    {
        $listeners = [
            [
                Events::ON_EXCEPTION,
                function (OnExceptionEvent $event) use ($loggerClosure) {
                    $loggerClosure(
                        count($event->getObjects()),
                        $event->getPager()->getNbResults(),
                        sprintf('<error>%s</error>', $event->getException()->getMessage())
                    );
                },
            ],
            [
                Events::POST_INSERT_OBJECTS,
                function (PostInsertObjectsEvent $event) use ($loggerClosure) {
                    $loggerClosure(count($event->getObjects()), $event->getPager()->getNbResults());
                },
            ],
            [
                Events::POST_ASYNC_INSERT_OBJECTS,
                function (PostAsyncInsertObjectsEvent $event) use ($loggerClosure) {
                    $loggerClosure($event->getObjectsCount(), $event->getPager()->getNbResults(), $event->getErrorMessage());
                },
            ],
        ];

        if ($ignoreErrors) {
            $listeners[] = [
                Events::ON_EXCEPTION,
                function (OnExceptionEvent $event) {
                    if ($event->getException() instanceof BulkResponseException) {
                        $event->setIgnore(true);
                    }
                },
            ];
        }

        return $listeners;
    }

    /**
     * Refreshes an index.
     *
     * @param OutputInterface $output
     * @param string          $index
     */
    private function refreshIndex(OutputInterface $output, $index)
    {
        $output->writeln(sprintf('<info>Refreshing</info> <comment>%s</comment>', $index));
        $this->indexManager->getIndex($index)->refresh();
    }
}
278