Completed
Push — non_purge_indexer ( e506c0...6716dd )
by André
16:19
created

ReindexCommand::indexIncrementally()   C

Complexity

Conditions 10
Paths 52

Size

Total Lines 61
Code Lines 40

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 10
eloc 40
nc 52
nop 4
dl 0
loc 61
rs 6.4757
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include: Extract Method.

1
<?php
2
3
/**
4
 * This file is part of the eZ Publish Kernel package.
5
 *
6
 * @copyright Copyright (C) eZ Systems AS. All rights reserved.
7
 * @license For full copyright and license information view LICENSE file distributed with this source code.
8
 */
9
namespace eZ\Bundle\EzPublishCoreBundle\Command;
10
11
use eZ\Publish\SPI\Persistence\Content\ContentInfo;
12
use eZ\Publish\Core\Search\Common\Indexer;
13
use eZ\Publish\Core\Search\Common\IncrementalIndexer;
14
use Doctrine\DBAL\Driver\Statement;
15
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
16
use Symfony\Component\Console\Helper\ProgressBar;
17
use Symfony\Component\Console\Input\InputInterface;
18
use Symfony\Component\Console\Input\InputOption;
19
use Symfony\Component\Console\Output\OutputInterface;
20
use Symfony\Component\Process\PhpExecutableFinder;
21
use Symfony\Component\Process\ProcessBuilder;
22
use RuntimeException;
23
use DateTime;
24
use PDO;
25
26
class ReindexCommand extends ContainerAwareCommand
{
    /**
     * @var \eZ\Publish\Core\Search\Common\Indexer|\eZ\Publish\Core\Search\Common\IncrementalIndexer
     */
    private $searchIndexer;

    /**
     * @var \Doctrine\DBAL\Connection
     */
    private $connection;

    /**
     * Initialize objects required by {@see execute()}.
     *
     * @param InputInterface $input
     * @param OutputInterface $output
     *
     * @throws \RuntimeException If the configured search indexer service is not an Indexer.
     */
    public function initialize(InputInterface $input, OutputInterface $output)
    {
        parent::initialize($input, $output);
        $this->searchIndexer = $this->getContainer()->get('ezpublish.spi.search.indexer');
        $this->connection = $this->getContainer()->get('ezpublish.api.storage_engine.legacy.connection');
        if (!$this->searchIndexer instanceof Indexer) {
            throw new RuntimeException(
                sprintf(
                    'Expected to find Search Engine Indexer but found "%s" instead',
                    // Report the service's own class; get_parent_class() named its parent (or false).
                    is_object($this->searchIndexer) ? get_class($this->searchIndexer) : gettype($this->searchIndexer)
                )
            );
        }
    }

    /**
     * {@inheritdoc}
     */
    protected function configure()
    {
        $this
            ->setName('ezplatform:reindex')
            ->setDescription('Recreate or Refresh search engine index')
            ->addOption(
                'iteration-count',
                'c',
                InputOption::VALUE_OPTIONAL,
                'Number of objects to be indexed in a single iteration, for avoiding using too much memory',
                50
            )->addOption(
                'no-commit',
                null,
                InputOption::VALUE_NONE,
                'Do not commit after each iteration'
            )->addOption(
                'no-purge',
                null,
                InputOption::VALUE_NONE,
                'Do not purge before indexing'
            )->addOption(
                'since',
                null,
                InputOption::VALUE_OPTIONAL,
                'Refresh changes since a given time, any format understood by DateTime. Implies "no-purge", can not be combined with "content-ids" or "subtree"'
            )->addOption(
                'content-ids',
                null,
                InputOption::VALUE_OPTIONAL,
                'Comma separated list of content id\'s to refresh (deleted/updated/added). Implies "no-purge", can not be combined with "since" or "subtree"'
            )->addOption(
                'subtree',
                null,
                InputOption::VALUE_OPTIONAL,
                'Location Id to index subtree of (incl self). Implies "no-purge", can not be combined with "since" or "content-ids"'
            )->addOption(
                'processes',
                null,
                InputOption::VALUE_OPTIONAL,
                'Number of sub processes to spawn in parallel handling iterations, default number is number of CPU cores -1, set to 1 or 0 to disable',
                // As documented above: leave one core for the parent process (never less than 1).
                max(1, $this->getNumberOfCPUCores() - 1)
            )->setHelp(
                <<<EOT
The command <info>%command.name%</info> indexes current configured database in configured search engine index.


Example usage:
- Refresh (add/update) index changes since yesterday:
  <comment>ezplatform:reindex --since=yesterday</comment>
  See: http://php.net/manual/en/datetime.formats.php

- Refresh (add/update/remove) index on a set of content id's:
  <comment>ezplatform:reindex --content-ids=2,34,68</comment>

- Refresh (add/update) index of a subtree:
  <comment>ezplatform:reindex --subtree=45</comment>

 - Refresh (add/update) the whole index using 3 processes, & let search engine handle commits itself using auto commit:
   <comment>ezplatform:reindex --no-purge --no-commit --processes=3</comment>

EOT
            );
    }

    /**
     * {@inheritdoc}
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $commit = !$input->getOption('no-commit');
        $iterationCount = $input->getOption('iteration-count');
        if (!is_numeric($iterationCount) || (int) $iterationCount < 1) {
            throw new RuntimeException("'--iteration-count' option should be > 0, got '{$iterationCount}'");
        }
        $iterationCount = (int) $iterationCount;

        if (!$this->searchIndexer instanceof IncrementalIndexer) {
            $output->writeln(<<<EOT
DEPRECATED:
Running indexing against an Indexer that has not been updated to use IncrementalIndexer abstract.

Options that won't be taken into account:
- since
- content-ids
- subtree
- processes
- no-purge
EOT
            );
            $this->searchIndexer->createSearchIndex($output, $iterationCount, !$commit);

            return 0;
        }

        $output->writeln('Re-indexing started for search engine: ' . $this->searchIndexer->getName());
        $output->writeln('');

        $return = $this->indexIncrementally($input, $output, $iterationCount, $commit);

        $output->writeln('');
        $output->writeln('Finished re-indexing');

        return $return;
    }

    /**
     * Index content incrementally using an IncrementalIndexer.
     *
     * The scope is picked by the first matching option: --content-ids (indexed inline in this process, this is
     * also what parallel sub processes receive), --since, --subtree, otherwise all published content (purging
     * the index first unless --no-purge is given).
     *
     * @param InputInterface $input
     * @param OutputInterface $output
     * @param int $iterationCount Number of content items handled per iteration (batch).
     * @param bool $commit Whether the indexer should commit after each batch.
     *
     * @return mixed Exit code: 0 on success, 1 if nothing was found or a sub process failed. With
     *               --content-ids, the return value of updateSearchIndex() is passed through as before.
     */
    protected function indexIncrementally(InputInterface $input, OutputInterface $output, $iterationCount, $commit)
    {
        if ($contentIds = $input->getOption('content-ids')) {
            $output->writeln('Indexing list of content id\'s');

            return $this->searchIndexer->updateSearchIndex($this->parseContentIds($contentIds), $commit);
        }

        list($stmt, $count, $purge) = $this->resolveIndexingScope($input);

        if (!$count) {
            $output->writeln('<error>Could not find any items to index, aborting.</error>');

            return 1;
        }

        $iterationCount = (int) $iterationCount;
        $iterations = (int) ceil($count / $iterationCount);
        // No point in spawning more processes than there are iterations to hand out.
        $processCount = min((int) $input->getOption('processes'), $iterations);
        $processMessage = $processCount > 1 ? "using $processCount parallel processes" : 'using single process';

        if ($purge) {
            $output->writeln('Purging index...');
            $this->searchIndexer->purge();

            $output->writeln(
                "<info>Re-Creating index for {$count} items across $iterations iteration(s), $processMessage:</info>"
            );
        } else {
            $output->writeln(
                "<info>Refreshing index for {$count} items across $iterations iteration(s), $processMessage:</info>"
            );
        }

        $progress = new ProgressBar($output);
        $progress->start($iterations);

        $failures = [];
        if ($processCount > 1) {
            $failures = $this->runParallelProcess($progress, $stmt, $processCount, $iterationCount, $commit);
        } else {
            // if we only have one process, or less iterations to warrant running several, we index it all inline
            foreach ($this->fetchIteration($stmt, $iterationCount) as $batch) {
                $this->searchIndexer->updateSearchIndex($batch, $commit);
                $progress->advance(1);
            }
        }

        $progress->finish();

        return $this->reportSubProcessFailures($output, $failures);
    }

    /**
     * Resolve which content to index from --since / --subtree, falling back to all published content.
     *
     * @param InputInterface $input
     *
     * @return array List of [\Doctrine\DBAL\Driver\Statement $stmt, int $count, bool $purge].
     *
     * @throws \RuntimeException If --subtree is given but is not a positive integer.
     */
    private function resolveIndexingScope(InputInterface $input)
    {
        if ($since = $input->getOption('since')) {
            $sinceDate = new DateTime($since);

            return [
                $this->getStatementContentSince($sinceDate),
                (int) $this->getStatementContentSince($sinceDate, true)->fetchColumn(),
                false,
            ];
        }

        $subtree = $input->getOption('subtree');
        if ($subtree !== null && $subtree !== '') {
            // An invalid value used to be cast to 0 and silently fall through to a full (purging) reindex.
            if (!is_numeric($subtree) || (int) $subtree < 1) {
                throw new RuntimeException("'--subtree' option should be a Location id > 0, got '{$subtree}'");
            }
            $locationId = (int) $subtree;

            return [
                $this->getStatementSubtree($locationId),
                (int) $this->getStatementSubtree($locationId, true)->fetchColumn(),
                false,
            ];
        }

        return [
            $this->getStatementContentAll(),
            (int) $this->getStatementContentAll(true)->fetchColumn(),
            !$input->getOption('no-purge'),
        ];
    }

    /**
     * Split a comma separated --content-ids value, ignoring surrounding whitespace and empty entries.
     *
     * @param string $contentIds
     *
     * @return string[]
     */
    private function parseContentIds($contentIds)
    {
        return array_values(array_filter(array_map('trim', explode(',', $contentIds)), 'strlen'));
    }

    /**
     * Print failed sub processes (if any) and return the resulting exit code.
     *
     * @param OutputInterface $output
     * @param string[] $failures
     *
     * @return int 0 when there were no failures, 1 otherwise.
     */
    private function reportSubProcessFailures(OutputInterface $output, array $failures)
    {
        if (empty($failures)) {
            return 0;
        }

        $output->writeln('');
        $output->writeln(sprintf('<error>%d sub process(es) failed:</error>', count($failures)));
        foreach ($failures as $failure) {
            // Raw: sub process output may contain text that looks like formatter tags.
            $output->writeln($failure, OutputInterface::OUTPUT_RAW);
        }

        return 1;
    }

    /**
     * Index all batches from $stmt by keeping up to $processCount "--content-ids" sub processes running.
     *
     * @param ProgressBar $progress Advanced once per finished sub process.
     * @param Statement $stmt Statement yielding content ids (one column).
     * @param int $processCount Maximum number of concurrently running sub processes.
     * @param int $iterationCount Number of content ids handed to each sub process.
     * @param bool $commit Whether sub processes should commit after indexing.
     *
     * @return string[] Command line and output of every sub process that did not exit successfully.
     */
    private function runParallelProcess(ProgressBar $progress, Statement $stmt, $processCount, $iterationCount, $commit)
    {
        // Make sure sub processes run with the same kernel environment as this command.
        $environment = $this->getContainer()->getParameter('kernel.environment');

        /**
         * @var \Symfony\Component\Process\Process[]|null[]
         */
        $processes = array_fill(0, $processCount, null);
        $failures = [];
        $generator = $this->fetchIteration($stmt, $iterationCount);
        do {
            // foreach iterates over a copy of $processes, so replacing/unsetting slots below is safe.
            foreach ($processes as $key => $process) {
                if ($process !== null && $process->isRunning()) {
                    continue;
                }

                if ($process !== null) {
                    // One of the processes just finished, so we increment progress bar
                    $progress->advance(1);
                    if (!$process->isSuccessful()) {
                        $failures[] = trim(
                            $process->getCommandLine() . "\n" . $process->getErrorOutput() . $process->getOutput()
                        );
                    }
                }

                if (!$generator->valid()) {
                    unset($processes[$key]);
                    continue;
                }

                $processes[$key] = self::getPhpProcess($generator->current(), $commit, $environment);
                $processes[$key]->start();
                $generator->next();
            }

            if (!empty($processes)) {
                sleep(1);
            }
        } while (!empty($processes));

        return $failures;
    }

    /**
     * Select published content modified at or after $since.
     *
     * @param DateTime $since
     * @param bool $count Select count(c.id) instead of the ids themselves.
     *
     * @return \Doctrine\DBAL\Driver\Statement
     */
    private function getStatementContentSince(DateTime $since, $count = false)
    {
        $q = $this->connection->createQueryBuilder()
            ->select($count ? 'count(c.id)' : 'c.id')
            ->from('ezcontentobject', 'c')
            ->where('c.status = :status')->andWhere('c.modified >= :since')
            ->setParameter('status', ContentInfo::STATUS_PUBLISHED, PDO::PARAM_INT)
            ->setParameter('since', $since->getTimestamp(), PDO::PARAM_INT);

        // Ordering an aggregate by a non-aggregated column is rejected by e.g. PostgreSQL, so only order ids.
        if (!$count) {
            $q->orderBy('c.modified');
        }

        return $q->execute();
    }

    /**
     * Select published content having a location within the subtree of $locationId (including itself).
     *
     * @param mixed $locationId
     * @param bool $count Select the number of distinct content ids instead of the ids themselves.
     *
     * @return \Doctrine\DBAL\Driver\Statement Always a statement, as this is a SELECT query.
     */
    private function getStatementSubtree($locationId, $count = false)
    {
        /**
         * @var \eZ\Publish\SPI\Persistence\Content\Location\Handler $locationHandler
         */
        $locationHandler = $this->getContainer()->get('ezpublish.spi.persistence.location_handler');
        $location = $locationHandler->load($locationId);
        $q = $this->connection->createQueryBuilder()
            ->from('ezcontentobject', 'c')
            ->innerJoin('c', 'ezcontentobject_tree', 't', 't.contentobject_id = c.id')
            ->where('c.status = :status')
            ->andWhere('t.path_string LIKE :path')
            ->setParameter('status', ContentInfo::STATUS_PUBLISHED, PDO::PARAM_INT)
            ->setParameter('path', $location->pathString . '%', PDO::PARAM_STR);

        if ($count) {
            $q->select('count(DISTINCT c.id)');
        } else {
            // Content may have several locations in the subtree: group to get each id once, ordered by its
            // top-most path. "SELECT DISTINCT c.id ... ORDER BY t.path_string" is rejected by databases that
            // require ORDER BY columns to be in a DISTINCT select list (PostgreSQL, MySQL 5.7+).
            $q->select('c.id')
                ->groupBy('c.id')
                ->orderBy('MIN(t.path_string)');
        }

        return $q->execute();
    }

    /**
     * Select all published content.
     *
     * @param bool $count Select count(c.id) instead of the ids themselves.
     *
     * @return \Doctrine\DBAL\Driver\Statement
     */
    private function getStatementContentAll($count = false)
    {
        $q = $this->connection->createQueryBuilder()
            ->select($count ? 'count(c.id)' : 'c.id')
            ->from('ezcontentobject', 'c')
            ->where('c.status = :status')
            ->setParameter('status', ContentInfo::STATUS_PUBLISHED, PDO::PARAM_INT);

        return $q->execute();
    }

    /**
     * Read content ids from $stmt in batches.
     *
     * Never yields an empty batch: an empty batch would reach a sub process as "--content-ids=", which is
     * falsy and would make that sub process run a full (purging) reindex instead.
     *
     * @param \Doctrine\DBAL\Driver\Statement $stmt
     * @param int $iterationCount
     *
     * @return \Generator Yields arrays of at most $iterationCount content id's each.
     */
    private function fetchIteration(Statement $stmt, $iterationCount)
    {
        $iterationCount = (int) $iterationCount;
        do {
            $contentIds = [];
            for ($i = 0; $i < $iterationCount; ++$i) {
                $contentId = $stmt->fetch(PDO::FETCH_COLUMN);
                if ($contentId === false) {
                    break;
                }
                $contentIds[] = $contentId;
            }

            if (!empty($contentIds)) {
                yield $contentIds;
            }
            // A short (or empty) batch means the statement is exhausted.
        } while (count($contentIds) === $iterationCount);
    }

    /**
     * Build a sub process running this command for the given content ids.
     *
     * @param array $contentIds Non empty list of content ids.
     * @param bool $commit
     * @param string|null $environment Kernel environment to pass on as --env, if known.
     *
     * @return \Symfony\Component\Process\Process
     *
     * @throws \RuntimeException If no php executable can be found.
     */
    private static function getPhpProcess(array $contentIds, $commit, $environment = null)
    {
        $phpFinder = new PhpExecutableFinder();
        if (!$phpPath = $phpFinder->find()) {
            throw new \RuntimeException(
                'The php executable could not be found, add it to your PATH environment variable and try again'
            );
        }

        $process = new ProcessBuilder([
            file_exists('bin/console') ? 'bin/console' : 'app/console',
            'ezplatform:reindex',
            '--content-ids=' . implode(',', $contentIds),
        ]);
        $process->setTimeout(null);
        $process->setPrefix($phpPath);

        if (!$commit) {
            $process->add('--no-commit');
        }

        if ($environment !== null) {
            $process->add('--env=' . $environment);
        }

        return $process->getProcess();
    }

    /**
     * Best-effort detection of the number of CPU cores.
     *
     * @return int Always at least 1.
     */
    private function getNumberOfCPUCores()
    {
        $cores = 1;
        if (is_file('/proc/cpuinfo')) {
            // Linux (and potentially Windows with linux sub systems)
            $cpuinfo = file_get_contents('/proc/cpuinfo');
            preg_match_all('/^processor/m', (string) $cpuinfo, $matches);
            $cores = count($matches[0]);
        } elseif (DIRECTORY_SEPARATOR === '\\') {
            // Windows
            if (($process = @popen('wmic cpu get NumberOfCores', 'rb')) !== false) {
                fgets($process);
                $cores = (int) fgets($process);
                pclose($process);
            }
        } elseif (($process = @popen('sysctl -a', 'rb')) !== false) {
            // *nix (Linux, BSD and Mac)
            $output = stream_get_contents($process);
            if (preg_match('/hw.ncpu: (\d+)/', $output, $matches)) {
                // $matches[1] is the whole captured number; "[1][0]" used to keep only its first digit.
                $cores = (int) $matches[1];
            }
            pclose($process);
        }

        // Detection can yield 0 (unreadable cpuinfo, unexpected tool output); never report less than one core.
        return max(1, $cores);
    }
}
403