Completed
Push — non_purge_indexer ( c5ca78...13d609 )
by André
12:02
created

ReindexCommand::configure()   B

Complexity

Conditions 1
Paths 1

Size

Total Lines 30
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 14
nc 1
nop 0
dl 0
loc 30
rs 8.8571
c 0
b 0
f 0
1
<?php
2
3
/**
4
 * This file is part of the eZ Publish Kernel package.
5
 *
6
 * @copyright Copyright (C) eZ Systems AS. All rights reserved.
7
 * @license For full copyright and license information view LICENSE file distributed with this source code.
8
 */
9
namespace eZ\Bundle\EzPublishCoreBundle\Command;
10
11
use eZ\Publish\SPI\Persistence\Content\ContentInfo;
12
use eZ\Publish\Core\Search\Common\Indexer;
13
use eZ\Publish\Core\Search\Common\IncrementalIndexer;
14
use Doctrine\DBAL\Driver\Statement;
15
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
16
use Symfony\Component\Console\Helper\ProgressBar;
17
use Symfony\Component\Console\Input\InputInterface;
18
use Symfony\Component\Console\Input\InputOption;
19
use Symfony\Component\Console\Output\OutputInterface;
20
use Symfony\Component\Process\Process;
21
use Symfony\Component\Process\PhpExecutableFinder;
22
use RuntimeException;
23
use DateTime;
24
use PDO;
25
26
class ReindexCommand extends ContainerAwareCommand
27
{
28
    /**
29
     * @var \eZ\Publish\Core\Search\Common\Indexer|\eZ\Publish\Core\Search\Common\IncrementalIndexer
30
     */
31
    private $searchIndexer;
32
33
    /**
34
     * @var \Doctrine\DBAL\Connection
35
     */
36
    private $connection;
37
38
    /**
     * Initialize objects required by {@see execute()}.
     *
     * Fetches the search indexer and the legacy storage connection from the
     * service container and refuses to continue when the configured indexer
     * is not an instance of the common Indexer class.
     *
     * @param InputInterface $input
     * @param OutputInterface $output
     *
     * @throws \RuntimeException If the "ezpublish.spi.search.indexer" service is not an Indexer.
     */
    public function initialize(InputInterface $input, OutputInterface $output)
    {
        parent::initialize($input, $output);

        $container = $this->getContainer();
        $this->searchIndexer = $container->get('ezpublish.spi.search.indexer');
        $this->connection = $container->get('ezpublish.api.storage_engine.legacy.connection');

        if (!$this->searchIndexer instanceof Indexer) {
            // Report what was actually found: get_parent_class() would name the parent
            // class instead, or return false (printed as "") when there is no parent.
            $found = is_object($this->searchIndexer)
                ? get_class($this->searchIndexer)
                : gettype($this->searchIndexer);

            throw new RuntimeException(
                sprintf('Expected to find Search Engine Indexer but found "%s" instead', $found)
            );
        }
    }
55
56
    /**
     * {@inheritdoc}
     *
     * Declares the "ezplatform:reindex" command, its options and its help text.
     * The default for "processes" is one less than the detected number of CPU
     * cores (never below 1), which is what the option description promises.
     */
    protected function configure()
    {
        // Leave one core free as documented in the option description; a result of 1 disables
        // parallel indexing, which is also what the description says about "1 or 0".
        $defaultProcesses = max(1, $this->getNumberOfCPUCores() - 1);

        $this
            ->setName('ezplatform:reindex')
            ->setDescription('Recreate or Refresh search engine index')
            ->addOption('iteration-count', 'c', InputOption::VALUE_OPTIONAL, 'Number of objects to be indexed in a single iteration, for avoiding using too much memory', 50)
            ->addOption('no-commit', null, InputOption::VALUE_NONE, 'Do not commit after each iteration')
            ->addOption('no-purge', null, InputOption::VALUE_NONE, 'Do not purge before indexing')
            ->addOption('since', null, InputOption::VALUE_OPTIONAL, 'Index changes since a given time, any format understood by DateTime. Implies "no-purge", can not be combined with "content-ids"')
            ->addOption('content-ids', null, InputOption::VALUE_OPTIONAL, 'Comma separated list of content id\'s to refresh (deleted or updated/added). Implies "no-purge", can not be combined with "since"')
            ->addOption('processes', null, InputOption::VALUE_OPTIONAL, "Number of sub processes to spawn in parallel, default number is number of CPU cores -1, set to 1 or 0 to disable", $defaultProcesses)
            ->setHelp(
                <<<EOT
The command <info>%command.name%</info> indexes current configured database in configured search engine index.


Example usage:
- Refresh (add/update) index changes since yesterday:
  <comment>ezplatform:reindex --since=yesterday</comment>
  See: http://php.net/manual/en/datetime.formats.php

- Refresh (add/update/remove) index on a set of content id's:
  <comment>ezplatform:reindex --content-ids=2,34,68</comment>

 - Refresh (add/update) the whole index using 3 processes, & let search engine handle commits itself using auto commit:
   <comment>ezplatform:reindex --no-purge --no-commit --processes=3</comment>

EOT
            );
    }
89
90
    /**
     * {@inheritdoc}
     *
     * Validates "iteration-count", then either delegates to the deprecated
     * createSearchIndex() of an indexer that is not an IncrementalIndexer, or
     * runs the incremental indexing via {@see indexIteratively()}.
     *
     * @return int Always 0; failures are reported by exceptions.
     *
     * @throws \RuntimeException If "iteration-count" is not a number greater than 0.
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $commit = !$input->getOption('no-commit');
        $iterationCount = $input->getOption('iteration-count');
        if (!is_numeric($iterationCount) || (int) $iterationCount < 1) {
            throw new RuntimeException("'--iteration-count' option should be > 0, got '{$iterationCount}'");
        }

        // Normalize once so both code paths below work with the same validated integer.
        $iterationCount = (int) $iterationCount;

        if (!$this->searchIndexer instanceof IncrementalIndexer) {
            $output->writeln(<<<EOT
DEPRECATED:
Running indexing against an Indexer that has not been updated to use IncrementalIndexer abstract.

Options that won't be taken into account:
- since
- content-ids
- processes
- no-purge
EOT
            );
            $this->searchIndexer->createSearchIndex($output, $iterationCount, !$commit);

            return 0;
        }

        $output->writeln('Re-indexing started for search engine: ' . $this->searchIndexer->getName());
        $output->writeln('');
        $this->indexIteratively($input, $output, $iterationCount, $commit);
        $output->writeln('');
        $output->writeln('Finished re-indexing');

        // Explicit exit code, consistent with the deprecated branch above.
        return 0;
    }
124
125
    /**
     * Index content in batches, either inline or by spawning sub processes.
     *
     * Order of precedence: an explicit "content-ids" list is indexed directly;
     * otherwise content modified "since" a given time is refreshed without
     * purging; otherwise all published content is indexed, purging the index
     * first unless "no-purge" is set.
     *
     * @param InputInterface $input
     * @param OutputInterface $output
     * @param int $iterationCount Number of content items per batch.
     * @param bool $commit Passed on to the indexer / sub processes for each batch.
     *
     * @return mixed Whatever the delegated call returns on the early-exit paths, otherwise null.
     */
    protected function indexIteratively(InputInterface $input, OutputInterface $output, $iterationCount, $commit)
    {
        if ($contentIds = $input->getOption('content-ids')) {
            $output->writeln('Indexing list of content id\'s');

            return $this->searchIndexer->updateSearchIndex(explode(',', $contentIds), $commit);
        }

        if ($since = $input->getOption('since')) {
            // Resolve the cut-off once: relative formats ("-1 hour", "now") would otherwise
            // be evaluated twice, letting the id query and the count query disagree.
            $sinceDate = new DateTime($since);
            $stmt = $this->getStatementContentSince($sinceDate);
            $count = (int) $this->getStatementContentSince($sinceDate, true)->fetchColumn();
            $purge = false;
        } else {
            $stmt = $this->getStatementContentAll();
            $count = (int) $this->getStatementContentAll(true)->fetchColumn();
            $purge = !$input->getOption('no-purge');
        }

        if (!$count) {
            return $output->writeln('<error>Could not find any items to index, aborting.</error>');
        }

        // ceil() returns a float; the value is used as a step/process count, so make it an int.
        $iterations = (int) ceil($count / $iterationCount);
        if ($purge) {
            $output->writeln('Purging index...');
            $this->searchIndexer->purge();
            $message = "<info>Re-Creating index for {$count} content items across $iterations iteration(s)";
        } else {
            $message = "<info>Refreshing index for {$count} content items across $iterations iteration(s)";
        }

        $progress = new ProgressBar($output);

        if ($iterations > 1 && ($processCount = (int) $input->getOption('processes')) > 1) {
            // Don't run more processes than iterations
            $processCount = min($processCount, $iterations);
            $output->writeln($message . ", using $processCount parallel processes:</info>");
            $progress->start($iterations);

            return $this->runParallelProcess($progress, $stmt, $processCount, (int) $iterationCount, $commit);
        }

        $output->writeln($message . ', using single process:</info>');

        // if we only have one process, or less iterations to warrant running several, we index it all inline
        $progress->start($iterations);
        foreach ($this->fetchIteration($stmt, $iterationCount) as $contentIds) {
            if (empty($contentIds)) {
                // Defensive: never hand an empty batch to the indexer or advance past the planned steps.
                continue;
            }

            $this->searchIndexer->updateSearchIndex($contentIds, $commit);
            $progress->advance(1);
        }

        $progress->finish();
    }
178
179
    /**
     * Index all batches using a fixed-size pool of sub processes.
     *
     * Each pool slot holds a running process or null. The pool is polled once
     * per second; a slot whose process has finished advances the progress bar
     * and is given the next batch, or is removed once no batches remain.
     *
     * @param ProgressBar $progress Already started progress bar, advanced once per finished process.
     * @param \Doctrine\DBAL\Driver\Statement $stmt Executed statement returning content ids.
     * @param int $processCount Number of sub processes to keep running.
     * @param int $iterationCount Number of content ids per batch.
     * @param bool $commit Whether sub processes should commit (omits "--no-commit").
     */
    private function runParallelProcess(ProgressBar $progress, Statement $stmt, $processCount, $iterationCount, $commit)
    {
        /**
         * @var \Symfony\Component\Process\Process[]|null[]
         */
        $processes = array_fill(0, $processCount, null);
        $generator = $this->fetchIteration($stmt, $iterationCount);
        do {
            foreach ($processes as $key => $process) {
                if ($process !== null && $process->isRunning()) {
                    continue;
                }

                if ($process !== null) {
                    // One of the processes just finished, so we increment progress bar
                    $progress->advance(1);
                }

                // Never spawn a child for an empty batch: "--content-ids=" with no ids is read
                // as "no ids given" by the child, which would then purge and reindex everything.
                while ($generator->valid() && empty($generator->current())) {
                    $generator->next();
                }

                if (!$generator->valid()) {
                    // Nothing left to hand out, retire this slot.
                    unset($processes[$key]);
                    continue;
                }

                $processes[$key] = $this->getPhpProcess($generator->current(), $commit);
                $processes[$key]->start();
                $generator->next();
            }

            if (!empty($processes)) {
                sleep(1);
            }
        } while (!empty($processes));

        $progress->finish();
    }
214
215
    /**
     * Query published content modified at or after the given time.
     *
     * @param DateTime $since Lower bound; compared as a unix timestamp against "modified".
     * @param bool $count When true, select count(c.id) instead of the ids themselves.
     *
     * @return \Doctrine\DBAL\Driver\Statement Executed statement, ids ordered by modification time.
     */
    private function getStatementContentSince(DateTime $since, $count = false)
    {
        $q = $this->connection->createQueryBuilder()
            ->select($count ? 'count(c.id)' : 'c.id')
            ->from('ezcontentobject', 'c')
            ->where('c.status = :status')
            ->andWhere('c.modified >= :since')
            ->setParameter('status', ContentInfo::STATUS_PUBLISHED, PDO::PARAM_INT)
            ->setParameter('since', $since->getTimestamp(), PDO::PARAM_INT);

        if (!$count) {
            // Ordering is only meaningful for the id list. On the aggregate query it is
            // useless and rejected by strict engines (the column is neither grouped nor aggregated).
            $q->orderBy('c.modified');
        }

        return $q->execute();
    }
233
234
    /**
     * Query all published content.
     *
     * @param bool $count When true, select count(c.id) instead of the ids themselves.
     *
     * @return \Doctrine\DBAL\Driver\Statement Executed statement.
     */
    private function getStatementContentAll($count = false)
    {
        $column = $count ? 'count(c.id)' : 'c.id';

        $queryBuilder = $this->connection->createQueryBuilder();
        $queryBuilder->select($column);
        $queryBuilder->from('ezcontentobject', 'c');
        $queryBuilder->where('c.status = :status');
        $queryBuilder->setParameter('status', ContentInfo::STATUS_PUBLISHED, PDO::PARAM_INT);

        return $queryBuilder->execute();
    }
249
250
    /**
     * Lazily split the rows of a statement into batches of content ids.
     *
     * Only non-empty batches are yielded: when the number of rows is an exact
     * multiple of $iterationCount (or there are no rows at all), the final
     * fetch finds nothing and the generator simply ends.
     *
     * @param \Doctrine\DBAL\Driver\Statement $stmt
     * @param int $iterationCount
     *
     * @return \Generator Yields arrays of at most $iterationCount content ids each.
     */
    private function fetchIteration(Statement $stmt, $iterationCount)
    {
        do {
            $contentId = null;
            $contentIds = [];
            for ($i = 0; $i < $iterationCount; ++$i) {
                $contentId = $stmt->fetch(PDO::FETCH_COLUMN);
                if (!$contentId) {
                    // No more rows.
                    break;
                }

                $contentIds[] = $contentId;
            }

            if (empty($contentIds)) {
                // An empty batch must never reach consumers: passed on as "--content-ids=" it
                // would be read as "no ids given" instead of "nothing to do".
                return;
            }

            yield $contentIds;
        } while (!empty($contentId));
    }
271
272
    /**
     * Build (without starting) a sub process that reindexes the given content ids.
     *
     * The command line runs the located php binary against "bin/console" when
     * that file exists relative to the working directory, otherwise "app/console".
     * The process is created with its timeout argument set to null.
     *
     * @param array $contentIds
     * @param bool $commit When false, "--no-commit" is appended to the command.
     *
     * @return \Symfony\Component\Process\Process
     *
     * @throws \RuntimeException If the php executable can not be located.
     */
    private static function getPhpProcess(array $contentIds, $commit)
    {
        $phpPath = (new PhpExecutableFinder())->find();
        if (!$phpPath) {
            throw new \RuntimeException('The php executable could not be found, add it to your PATH environment variable and try again');
        }

        $consolePath = file_exists('bin/console') ? 'bin/console' : 'app/console';

        $parts = [
            escapeshellarg($phpPath),
            escapeshellarg($consolePath),
            'ezplatform:reindex',
            '--content-ids=' . implode(',', $contentIds),
        ];
        if (!$commit) {
            $parts[] = '--no-commit';
        }

        return new Process(implode(' ', $parts), null, null, null, null);
    }
291
292
    /**
     * Best-effort detection of the number of CPU cores on the current machine.
     *
     * Tries, in order: counting "processor" entries in /proc/cpuinfo, asking
     * "wmic" on Windows, and reading "hw.ncpu" from "sysctl -a". Falls back to
     * 1 when none of them gives a usable answer.
     *
     * @return int Number of cores, never less than 1.
     */
    private function getNumberOfCPUCores()
    {
        $cores = 1;
        if (is_file('/proc/cpuinfo')) {
            // Linux (and potentially Windows with linux sub systems)
            $cpuinfo = (string) file_get_contents('/proc/cpuinfo');
            preg_match_all('/^processor/m', $cpuinfo, $matches);
            $cores = count($matches[0]);
        } elseif (DIRECTORY_SEPARATOR === '\\') {
            // Windows
            if (($process = @popen('wmic cpu get NumberOfCores', 'rb')) !== false) {
                // First line is the column header, the value is on the second line.
                fgets($process);
                $cores = (int) fgets($process);
                pclose($process);
            }
        } elseif (($process = @popen('sysctl -a', 'rb')) !== false) {
            // *nix (Linux, BSD and Mac)
            $output = stream_get_contents($process);
            if (preg_match('/hw\.ncpu: (\d+)/', $output, $matches)) {
                // preg_match() stores the captured group as a string in $matches[1]; indexing it
                // again with [0] would keep only the first digit (e.g. 16 cores read as 1).
                $cores = (int) $matches[1];
            }
            pclose($process);
        }

        // Detection can come back empty (no "processor" lines, unparsable wmic output).
        return max(1, $cores);
    }
321
}
322