Completed
Push — master ( 7b9575...49c7e6 )
by Maksim
17s
created

PopulateCommand::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 15
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 15
ccs 0
cts 8
cp 0
rs 9.4285
cc 1
eloc 12
nc 1
nop 5
crap 2
1
<?php
2
3
/*
4
 * This file is part of the FOSElasticaBundle package.
5
 *
6
 * (c) FriendsOfSymfony <http://friendsofsymfony.github.com/>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace FOS\ElasticaBundle\Command;
13
14
use Elastica\Exception\Bulk\ResponseException as BulkResponseException;
15
use FOS\ElasticaBundle\Event\IndexPopulateEvent;
16
use FOS\ElasticaBundle\Event\TypePopulateEvent;
17
use FOS\ElasticaBundle\Index\IndexManager;
18
use FOS\ElasticaBundle\Index\Resetter;
19
use FOS\ElasticaBundle\Persister\Event\Events;
20
use FOS\ElasticaBundle\Persister\Event\OnExceptionEvent;
21
use FOS\ElasticaBundle\Persister\Event\PostAsyncInsertObjectsEvent;
22
use FOS\ElasticaBundle\Persister\Event\PostInsertObjectsEvent;
23
use FOS\ElasticaBundle\Persister\InPlacePagerPersister;
24
use FOS\ElasticaBundle\Persister\PagerPersisterInterface;
25
use FOS\ElasticaBundle\Persister\PagerPersisterRegistry;
26
use FOS\ElasticaBundle\Provider\PagerProviderRegistry;
27
use Symfony\Component\Console\Command\Command;
28
use Symfony\Component\Console\Helper\ProgressBar;
29
use Symfony\Component\Console\Helper\QuestionHelper;
30
use Symfony\Component\Console\Input\InputInterface;
31
use Symfony\Component\Console\Input\InputOption;
32
use Symfony\Component\Console\Output\OutputInterface;
33
use Symfony\Component\Console\Question\Question;
34
use Symfony\Component\EventDispatcher\EventDispatcherInterface;
35
36
/**
 * Console command that populates the search indexes from the registered
 * pager providers.
 *
 * Either every index known to the index manager is populated, or - when the
 * --index (and optionally --type) option is given - a single index or a
 * single type of that index.
 */
class PopulateCommand extends Command
{
    protected static $defaultName = 'fos:elastica:populate';

    /**
     * Dispatcher used for the populate events and for attaching the
     * progress/error listeners.
     *
     * @var EventDispatcherInterface
     */
    private $dispatcher;

    /**
     * Source of the index list and of the index objects that get refreshed.
     *
     * @var IndexManager
     */
    private $indexManager;

    /**
     * Registry the pager providers are looked up in (per index and type).
     *
     * @var PagerProviderRegistry
     */
    private $pagerProviderRegistry;

    /**
     * Registry the pager persister is resolved from in initialize().
     *
     * @var PagerPersisterRegistry
     */
    private $pagerPersisterRegistry;

    /**
     * Persister selected through the --pager-persister option; set in
     * initialize() and used to insert the provided pager.
     *
     * @var PagerPersisterInterface
     */
    private $pagerPersister;

    /**
     * Used to reset an index or an index type before populating it.
     *
     * @var Resetter
     */
    private $resetter;

    /**
     * @param EventDispatcherInterface $dispatcher
     * @param IndexManager             $indexManager
     * @param PagerProviderRegistry    $pagerProviderRegistry
     * @param PagerPersisterRegistry   $pagerPersisterRegistry
     * @param Resetter                 $resetter
     */
    public function __construct(
        EventDispatcherInterface $dispatcher,
        IndexManager $indexManager,
        PagerProviderRegistry $pagerProviderRegistry,
        PagerPersisterRegistry $pagerPersisterRegistry,
        Resetter $resetter
    ) {
        parent::__construct();

        $this->dispatcher = $dispatcher;
        $this->indexManager = $indexManager;
        $this->pagerProviderRegistry = $pagerProviderRegistry;
        $this->pagerPersisterRegistry = $pagerPersisterRegistry;
        $this->resetter = $resetter;
    }

    /**
     * Declares the command name, its options and its description.
     */
    protected function configure()
    {
        $this
            ->setName('fos:elastica:populate')
            ->addOption('index', null, InputOption::VALUE_OPTIONAL, 'The index to repopulate')
            ->addOption('type', null, InputOption::VALUE_OPTIONAL, 'The type to repopulate')
            ->addOption('no-reset', null, InputOption::VALUE_NONE, 'Do not reset index before populating')
            ->addOption('no-delete', null, InputOption::VALUE_NONE, 'Do not delete index after populate')
            ->addOption('offset', null, InputOption::VALUE_REQUIRED, '[DEPRECATED] Start indexing at offset', 0)
            ->addOption('sleep', null, InputOption::VALUE_REQUIRED, 'Sleep time between persisting iterations (microseconds)', 0)
            ->addOption('batch-size', null, InputOption::VALUE_REQUIRED, '[DEPRECATED] Index packet size (overrides provider config option)')
            ->addOption('ignore-errors', null, InputOption::VALUE_NONE, 'Do not stop on errors')
            ->addOption('no-overwrite-format', null, InputOption::VALUE_NONE, 'Prevent this command from overwriting ProgressBar\'s formats')

            ->addOption('first-page', null, InputOption::VALUE_REQUIRED, 'The pager\'s page to start population from. Including the given page.', 1)
            ->addOption('last-page', null, InputOption::VALUE_REQUIRED, 'The pager\'s page to end population on. Including the given page.', null)
            ->addOption('max-per-page', null, InputOption::VALUE_REQUIRED, 'The pager\'s page size', 100)
            ->addOption('pager-persister', null, InputOption::VALUE_REQUIRED, 'The pager persister to be used to populate the index', InPlacePagerPersister::NAME)

            ->setDescription('Populates search indexes from providers')
        ;
    }

    /**
     * Resolves the pager persister named by --pager-persister and, unless
     * --no-overwrite-format is given, installs this command's ProgressBar
     * format definitions.
     *
     * @param InputInterface  $input
     * @param OutputInterface $output
     */
    protected function initialize(InputInterface $input, OutputInterface $output)
    {
        $this->pagerPersister = $this->pagerPersisterRegistry->getPagerPersister($input->getOption('pager-persister'));

        if (!$input->getOption('no-overwrite-format')) {
            // Every format ends with "\n%message%" so a message is rendered
            // on its own line below the bar.
            ProgressBar::setFormatDefinition('normal', " %current%/%max% [%bar%] %percent:3s%%\n%message%");
            ProgressBar::setFormatDefinition('verbose', " %current%/%max% [%bar%] %percent:3s%% %elapsed:6s%\n%message%");
            ProgressBar::setFormatDefinition('very_verbose', " %current%/%max% [%bar%] %percent:3s%% %elapsed:6s%/%estimated:-6s%\n%message%");
            ProgressBar::setFormatDefinition('debug', " %current%/%max% [%bar%] %percent:3s%% %elapsed:6s%/%estimated:-6s% %memory:6s%\n%message%");
        }
    }

    /**
     * Builds the populate options from the command line and populates the
     * requested index/type, or every index when no index is given.
     *
     * @param InputInterface  $input
     * @param OutputInterface $output
     *
     * @throws \InvalidArgumentException when --type is given without --index
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $index = $input->getOption('index');
        $type = $input->getOption('type');
        $reset = !$input->getOption('no-reset');
        $delete = !$input->getOption('no-delete');

        $options = [
            'delete' => $delete,
            'reset' => $reset,
            'ignore_errors' => $input->getOption('ignore-errors'),
            'offset' => $input->getOption('offset'),
            'sleep' => $input->getOption('sleep'),
            'first_page' => $input->getOption('first-page'),
            'max_per_page' => $input->getOption('max-per-page'),
        ];

        // 'last_page' and 'batch_size' are only added when the matching
        // option carries a truthy value; consumers must not assume the keys.
        if ($input->getOption('last-page')) {
            $options['last_page'] = $input->getOption('last-page');
        }

        if ($input->getOption('batch-size')) {
            $options['batch_size'] = (int) $input->getOption('batch-size');
        }

        if ($input->isInteractive() && $reset && $input->getOption('offset')) {
            /** @var QuestionHelper $dialog */
            $dialog = $this->getHelperSet()->get('question');

            // A yes/no confirmation is required here: with a free-text
            // question any non-empty answer (including "no") is truthy and
            // would let the reset proceed. The default is "false", so an
            // empty answer still aborts.
            $confirmation = new \Symfony\Component\Console\Question\ConfirmationQuestion(
                '<question>You chose to reset the index and start indexing with an offset. Do you really want to do that?</question>',
                false
            );

            if (!$dialog->ask($input, $output, $confirmation)) {
                return;
            }
        }

        if (null === $index && null !== $type) {
            throw new \InvalidArgumentException('Cannot specify type option without an index.');
        }

        if (null === $index) {
            // No index requested: populate every index the manager knows.
            foreach (array_keys($this->indexManager->getAllIndexes()) as $indexName) {
                $this->populateIndex($output, $indexName, $reset, $options);
            }

            return;
        }

        if (null !== $type) {
            $this->populateIndexType($output, $index, $type, $reset, $options);

            return;
        }

        $this->populateIndex($output, $index, $reset, $options);
    }

    /**
     * Optionally resets an index, populates each of its types, and
     * refreshes the index.
     *
     * Dispatches IndexPopulateEvent::PRE_INDEX_POPULATE before and
     * IndexPopulateEvent::POST_INDEX_POPULATE after the types are populated.
     * Whether the index is reset, and which options are handed to the types,
     * is read back from the event after the PRE dispatch.
     *
     * @param OutputInterface $output
     * @param string          $index
     * @param bool            $reset
     * @param array           $options
     */
    private function populateIndex(OutputInterface $output, $index, $reset, $options)
    {
        $event = new IndexPopulateEvent($index, $reset, $options);
        $this->dispatcher->dispatch(IndexPopulateEvent::PRE_INDEX_POPULATE, $event);

        if ($event->isReset()) {
            $output->writeln(sprintf('<info>Resetting</info> <comment>%s</comment>', $index));
            $this->resetter->resetIndex($index, true);
        }

        $types = array_keys($this->pagerProviderRegistry->getIndexProviders($index));
        foreach ($types as $type) {
            // The index as a whole was handled above, so the individual
            // types are populated without a per-type reset.
            $this->populateIndexType($output, $index, $type, false, $event->getOptions());
        }

        $this->dispatcher->dispatch(IndexPopulateEvent::POST_INDEX_POPULATE, $event);

        $this->refreshIndex($output, $index);
    }

    /**
     * Optionally resets an index type, populates it through the selected
     * pager persister, and refreshes the index.
     *
     * Dispatches TypePopulateEvent::PRE_TYPE_POPULATE before and
     * TypePopulateEvent::POST_TYPE_POPULATE after the insert.
     *
     * NOTE(review): the listeners registered below are never removed from
     * the dispatcher, so they accumulate when several types are populated in
     * one run - confirm whether that is intended.
     *
     * @param OutputInterface $output
     * @param string          $index
     * @param string          $type
     * @param bool            $reset
     * @param array           $options
     */
    private function populateIndexType(OutputInterface $output, $index, $type, $reset, $options)
    {
        $event = new TypePopulateEvent($index, $type, $reset, $options);
        $this->dispatcher->dispatch(TypePopulateEvent::PRE_TYPE_POPULATE, $event);

        if ($event->isReset()) {
            $output->writeln(sprintf('<info>Resetting</info> <comment>%s/%s</comment>', $index, $type));
            $this->resetter->resetIndexType($index, $type);
        }

        $offset = $options['offset'];
        $loggerClosure = ProgressClosureBuilder::build($output, 'Populating', $index, $type, $offset);

        if ($loggerClosure) {
            $this->dispatcher->addListener(
                Events::ON_EXCEPTION,
                function (OnExceptionEvent $exceptionEvent) use ($loggerClosure, $options) {
                    $objectCount = count($exceptionEvent->getObjects());

                    // 'batch_size' is only present when --batch-size was
                    // passed; fall back to the number of affected objects
                    // instead of reading an undefined index.
                    $increment = isset($options['batch_size']) ? $options['batch_size'] : $objectCount;

                    $loggerClosure(
                        $increment,
                        $objectCount,
                        sprintf('<error>%s</error>', $exceptionEvent->getException()->getMessage())
                    );
                }
            );

            $this->dispatcher->addListener(
                Events::POST_INSERT_OBJECTS,
                function (PostInsertObjectsEvent $insertEvent) use ($loggerClosure) {
                    $loggerClosure(count($insertEvent->getObjects()), $insertEvent->getPager()->getNbResults());
                }
            );

            $this->dispatcher->addListener(
                Events::POST_ASYNC_INSERT_OBJECTS,
                function (PostAsyncInsertObjectsEvent $asyncEvent) use ($loggerClosure) {
                    $loggerClosure(
                        $asyncEvent->getObjectsCount(),
                        $asyncEvent->getPager()->getNbResults(),
                        $asyncEvent->getErrorMessage()
                    );
                }
            );
        }

        if ($options['ignore_errors']) {
            // With --ignore-errors only bulk response failures are marked as
            // ignored; any other exception type is left untouched.
            $this->dispatcher->addListener(Events::ON_EXCEPTION, function (OnExceptionEvent $exceptionEvent) {
                if ($exceptionEvent->getException() instanceof BulkResponseException) {
                    $exceptionEvent->setIgnore(true);
                }
            });
        }

        $provider = $this->pagerProviderRegistry->getProvider($index, $type);

        // The pager is built from the options as received; the index/type
        // names are added afterwards, for the persister only.
        $pager = $provider->provide($options);

        $options['indexName'] = $index;
        $options['typeName'] = $type;

        // Insert through the persister resolved in initialize(). The
        // registry itself only hands out persisters and has no insert().
        $this->pagerPersister->insert($pager, $options);

        $this->dispatcher->dispatch(TypePopulateEvent::POST_TYPE_POPULATE, $event);

        $this->refreshIndex($output, $index);
    }

    /**
     * Announces the refresh on the output and refreshes the index.
     *
     * @param OutputInterface $output
     * @param string          $index
     */
    private function refreshIndex(OutputInterface $output, $index)
    {
        $output->writeln(sprintf('<info>Refreshing</info> <comment>%s</comment>', $index));
        $this->indexManager->getIndex($index)->refresh();
    }
}
287