Completed
Push — non_purge_indexer ( 7d17c7...81b184 )
by André
12:56
created

ReindexCommand::runParallelProcess()   C

Complexity

Conditions 8
Paths 12

Size

Total Lines 36
Code Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 8
eloc 19
nc 12
nop 5
dl 0
loc 36
rs 5.3846
c 0
b 0
f 0
1
<?php
2
3
/**
4
 * This file is part of the eZ Publish Kernel package.
5
 *
6
 * @copyright Copyright (C) eZ Systems AS. All rights reserved.
7
 * @license For full copyright and license information view LICENSE file distributed with this source code.
8
 */
9
namespace eZ\Bundle\EzPublishCoreBundle\Command;
10
11
use eZ\Publish\Core\Search\Common\IterativelyIndexer;
12
use eZ\Publish\SPI\Persistence\Content\ContentInfo;
13
use eZ\Publish\Core\Search\Common\Indexer;
14
use Doctrine\DBAL\Driver\Statement;
15
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
16
use Symfony\Component\Console\Helper\ProgressBar;
17
use Symfony\Component\Console\Input\InputInterface;
18
use Symfony\Component\Console\Input\InputOption;
19
use Symfony\Component\Console\Output\OutputInterface;
20
use Symfony\Component\Process\Process;
21
use Symfony\Component\Process\PhpExecutableFinder;
22
use RuntimeException;
23
use DateTime;
24
use PDO;
25
26
/**
 * Console command (re)indexing published content into the configured search engine.
 *
 * Indexers implementing IterativelyIndexer are fed batches of content ids (optionally spread over
 * several sub processes); other Indexer implementations fall back to the deprecated
 * createSearchIndex() call, which ignores the iterative options.
 */
class ReindexCommand extends ContainerAwareCommand
{
    /**
     * @var \eZ\Publish\Core\Search\Common\Indexer|\eZ\Publish\Core\Search\Common\IterativelyIndexer
     */
    private $searchIndexer;

    /**
     * @var \Doctrine\DBAL\Connection
     */
    private $connection;

    /**
     * Initialize objects required by {@see execute()}.
     *
     * Resolves the search engine indexer and the legacy storage connection from the container.
     *
     * @param InputInterface $input
     * @param OutputInterface $output
     *
     * @throws \RuntimeException If the configured indexer service is not an Indexer.
     */
    public function initialize(InputInterface $input, OutputInterface $output)
    {
        parent::initialize($input, $output);
        $this->searchIndexer = $this->getContainer()->get('ezpublish.spi.search.indexer');
        $this->connection = $this->getContainer()->get('ezpublish.api.storage_engine.legacy.connection');
        if (!$this->searchIndexer instanceof Indexer) {
            // Report the service's own class; get_parent_class() reported its parent (or false).
            throw new RuntimeException(
                sprintf(
                    'Expected to find Search Engine Indexer but found "%s" instead',
                    is_object($this->searchIndexer) ? get_class($this->searchIndexer) : gettype($this->searchIndexer)
                )
            );
        }
    }

    /**
     * {@inheritdoc}
     */
    protected function configure()
    {
        $cpuCores = $this->getNumberOfCPUCores();
        $this
            ->setName('ezplatform:reindex')
            ->setDescription('Recreate or Refresh search engine index')
            ->addOption('iteration-count', 'c', InputOption::VALUE_OPTIONAL, 'Number of objects to be indexed in a single iteration, for avoiding using too much memory', 50)
            ->addOption('no-commit', null, InputOption::VALUE_NONE, 'Do not commit after each iteration')
            ->addOption('no-purge', null, InputOption::VALUE_NONE, 'Do not purge before indexing. BC NOTE: Should this be default as of 2.0?')
            ->addOption('since', null, InputOption::VALUE_OPTIONAL, 'Index changes since a given time, any format understood by DateTime. Implies "no-purge", can not be combined with "content-ids".')
            ->addOption('content-ids', null, InputOption::VALUE_IS_ARRAY | InputOption::VALUE_OPTIONAL, 'Comma separated list of content id\'s to refresh (deleted or updated/added). Implies "no-purge", can not be combined with "since".')
            ->addOption('processes', null, InputOption::VALUE_OPTIONAL, "Number of sub processes to spawn in parallel, suggested number is number of CPU cores ($cpuCores) -1")
            ->setHelp(
                <<<EOT
The command <info>%command.name%</info> indexes current configured database in configured search engine index.

Examples:

  Re-create the whole index (purges the index first), indexing 100 content items per iteration:
    <info>php %command.full_name% --iteration-count=100</info>

  Refresh the whole index without purging it first:
    <info>php %command.full_name% --no-purge</info>

  Refresh content modified within the last 24 hours (implies --no-purge):
    <info>php %command.full_name% --since="-24 hours"</info>

  Refresh specific content items, e.g. after they were deleted, updated or added (implies --no-purge):
    <info>php %command.full_name% --content-ids=12,57,1230</info>

  Split the indexing over 3 parallel sub processes:
    <info>php %command.full_name% --processes=3</info>

EOT
            );
    }

    /**
     * {@inheritdoc}
     *
     * @throws \RuntimeException On an invalid "--iteration-count", or when "--since" and
     *                           "--content-ids" are combined.
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $commit = !$input->getOption('no-commit');
        $iterationCount = $input->getOption('iteration-count');
        if (!is_numeric($iterationCount) || (int) $iterationCount < 1) {
            throw new RuntimeException("'--iteration-count' option should be > 0, got '{$iterationCount}'");
        }
        $iterationCount = (int) $iterationCount;

        if (!$this->searchIndexer instanceof IterativelyIndexer) {
            $output->writeln(<<<EOT
DEPRECATED:
Running indexing against an Indexer that has not been updated to use IterativelyIndexer abstract.

Options that won't be taken into account:
- since
- content-ids
- processes
- no-purge
EOT
            );
            $this->searchIndexer->createSearchIndex($output, $iterationCount, !$commit);

            return 0;
        }

        // Documented on both options; previously "--content-ids" silently won over "--since".
        if ($input->getOption('since') && $input->getOption('content-ids')) {
            throw new RuntimeException("'--since' and '--content-ids' options can not be combined");
        }

        $output->writeln('Re-indexing started for search engine: ' . get_class($this->searchIndexer));
        $this->indexIteratively($input, $output, $iterationCount, $commit);
        $output->writeln('');
        $output->writeln('Finished re-indexing');

        return 0;
    }

    /**
     * Index either an explicit list of content ids, content modified since a given time, or all
     * published content (purging the index first unless "--no-purge" is given).
     *
     * @param InputInterface $input
     * @param OutputInterface $output
     * @param int $iterationCount Number of content items per batch.
     * @param bool $commit Whether the indexer should commit after each batch.
     *
     * @return mixed Result of the indexer's updateSearchIndex() when "--content-ids" is used, null otherwise.
     */
    protected function indexIteratively(InputInterface $input, OutputInterface $output, $iterationCount, $commit)
    {
        $iterationCount = (int) $iterationCount;

        $rawContentIds = $input->getOption('content-ids');
        if (!empty($rawContentIds)) {
            // Checked on the raw option so that an empty "--content-ids=" never falls through to a
            // full (purging) re-index below.
            $output->writeln('Indexing list of content id\'s');
            $contentIds = $this->parseContentIds($rawContentIds);
            if (empty($contentIds)) {
                return null;
            }

            return $this->searchIndexer->updateSearchIndex($contentIds, $commit);
        }

        if ($since = $input->getOption('since')) {
            // Build the date once so relative formats ("-2 hours") give the same instant to both queries.
            $sinceDate = new DateTime($since);
            $stmt = $this->getStatementContentSince($sinceDate);
            $count = (int) $this->getStatementContentSince($sinceDate, 'count(c.id)')->fetchColumn();
            $purge = false;
        } else {
            $stmt = $this->getStatementContentAll();
            $count = (int) $this->getStatementContentAll('count(c.id)')->fetchColumn();
            $purge = !$input->getOption('no-purge');
        }

        if ($purge) {
            $output->writeln('Purging index before starting to re-create it');
            $this->searchIndexer->purge($commit);
            $output->writeln("Re-Creating Search Engine Index for {$count} content items..");
        } else {
            $output->writeln("Refresh Search Engine Index for {$count} content items..");
        }

        // ProgressBar::start() expects an int, ceil() returns a float.
        $iterations = (int) ceil($count / $iterationCount);
        $progress = new ProgressBar($output);
        $processCount = (int) $input->getOption('processes');

        if ($processCount > 1 && $processCount <= $iterations) {
            $output->writeln("Parallel indexing using $processCount processes sharing $iterations iterations");
            $progress->start($iterations);

            return $this->runParallelProcess($progress, $stmt, $processCount, $iterationCount, $commit);
        }

        // if we only have one process, or less iterations to warrant running several, we index it all inline
        $progress->start($iterations);
        foreach ($this->fetchIteration($stmt, $iterationCount) as $contentIds) {
            $this->searchIndexer->updateSearchIndex($contentIds, $commit);
            $progress->advance(1);
        }

        $progress->finish();

        return null;
    }

    /**
     * Flatten the raw "--content-ids" option values into a list of integer content ids.
     *
     * The option is an array option, so each occurrence yields one element, and each element may
     * itself be a comma separated list ("1,2,3"), which is the form passed to sub processes.
     *
     * @param string[] $values
     *
     * @return int[]
     *
     * @throws \RuntimeException If a value is not a positive integer.
     */
    private function parseContentIds(array $values)
    {
        $contentIds = [];
        foreach ($values as $value) {
            foreach (explode(',', (string) $value) as $contentId) {
                $contentId = trim($contentId);
                if ($contentId === '') {
                    continue;
                }
                if (!preg_match('/^\d+$/', $contentId)) {
                    throw new RuntimeException("'--content-ids' option expects integer ids, got '{$contentId}'");
                }
                $contentIds[] = (int) $contentId;
            }
        }

        return $contentIds;
    }

    /**
     * Spread the batches of $stmt over at most $processCount concurrently running sub processes,
     * advancing $progress once per finished batch, and return when all of them are done.
     *
     * @param ProgressBar $progress
     * @param Statement $stmt Statement selecting the content ids to index.
     * @param int $processCount
     * @param int $iterationCount Number of content ids per sub process.
     * @param bool $commit
     *
     * @throws \RuntimeException If a sub process exits unsuccessfully.
     */
    private function runParallelProcess(ProgressBar $progress, Statement $stmt, $processCount, $iterationCount, $commit)
    {
        /**
         * @var \Symfony\Component\Process\Process[]|null[] $processes
         */
        $processes = array_fill(0, $processCount, null);
        $generator = $this->fetchIteration($stmt, $iterationCount);
        do {
            foreach ($processes as $key => $process) {
                if ($process !== null && $process->isRunning()) {
                    continue;
                }

                if ($process !== null) {
                    // One of the processes just finished; don't silently lose a failed batch.
                    if (!$process->isSuccessful()) {
                        throw new RuntimeException(sprintf(
                            'Sub process "%s" failed with exit code %s: %s',
                            $process->getCommandLine(),
                            $process->getExitCode(),
                            $process->getErrorOutput()
                        ));
                    }
                    $progress->advance(1);
                }

                if (!$generator->valid()) {
                    // Nothing left to hand out, this slot is done.
                    unset($processes[$key]);
                    continue;
                }

                $processes[$key] = $this->getPhpProcess($generator->current(), $commit);
                $processes[$key]->start();
                $generator->next();
            }

            // Must test the whole pool: testing the foreach variable ended the loop after the first
            // round, and the still running processes were then killed on destruction.
            if (!empty($processes)) {
                usleep(100000);
            }
        } while (!empty($processes));

        $progress->finish();
    }

    /**
     * @param DateTime $since
     * @param string $select Field to select, one of  'c.id' and 'count(c.id)'
     *
     * @return \Doctrine\DBAL\Driver\Statement
     */
    private function getStatementContentSince(DateTime $since, $select = 'c.id')
    {
        $q = $this->connection->createQueryBuilder()
            ->select($select)
            ->from('ezcontentobject', 'c')
            ->where('c.status = :status')->andWhere('c.modified >= :since')
            ->setParameter('status', ContentInfo::STATUS_PUBLISHED, PDO::PARAM_INT)
            ->setParameter('since', $since->getTimestamp(), PDO::PARAM_INT);

        // Only order plain selects: orderBy() takes a direction string (passing `true` rendered
        // "ORDER BY c.modified 1"), and ordering an aggregate count by a non-grouped column is
        // rejected by some databases.
        if (stripos($select, 'count(') === false) {
            $q->orderBy('c.modified', 'ASC');
        }

        return $q->execute();
    }

    /**
     * @param string $select Field to select, one of  'c.id' and 'count(c.id)'
     *
     * @return \Doctrine\DBAL\Driver\Statement
     */
    private function getStatementContentAll($select = 'c.id')
    {
        $q = $this->connection->createQueryBuilder()
            ->select($select)
            ->from('ezcontentobject', 'c')
            ->where('c.status = :status')
            ->setParameter('status', ContentInfo::STATUS_PUBLISHED, PDO::PARAM_INT);

        return $q->execute();
    }

    /**
     * Read content ids from $stmt in batches.
     *
     * Only non-empty batches are yielded; the previous implementation also yielded a trailing empty
     * batch when the row count was a multiple of $iterationCount (or zero).
     *
     * @param \Doctrine\DBAL\Driver\Statement $stmt
     * @param int $iterationCount
     *
     * @return \Generator Yields arrays of at most $iterationCount content ids.
     */
    private function fetchIteration(Statement $stmt, $iterationCount)
    {
        $contentIds = [];
        while (($contentId = $stmt->fetch(PDO::FETCH_COLUMN)) !== false) {
            $contentIds[] = $contentId;
            if (count($contentIds) >= $iterationCount) {
                yield $contentIds;
                $contentIds = [];
            }
        }

        if (!empty($contentIds)) {
            yield $contentIds;
        }
    }

    /**
     * Build (without starting) a sub process re-indexing the given content ids.
     *
     * The child inherits the kernel environment and debug mode of the current process, so it uses
     * the same configuration (and search engine) as the parent.
     *
     * @param array $contentIds
     * @param bool $commit
     *
     * @return \Symfony\Component\Process\Process
     *
     * @throws \RuntimeException If no php executable can be found.
     */
    private function getPhpProcess(array $contentIds, $commit)
    {
        $phpFinder = new PhpExecutableFinder();
        if (!$phpPath = $phpFinder->find()) {
            throw new RuntimeException('The php executable could not be found, add it to your PATH environment variable and try again');
        }

        $container = $this->getContainer();
        $php = escapeshellarg($phpPath);
        // NOTE(review): resolved relative to the current working directory.
        $console = escapeshellarg(file_exists('bin/console') ? 'bin/console' : 'app/console');
        $cmd = 'ezplatform:reindex'
            . ' --content-ids=' . escapeshellarg(implode(',', $contentIds))
            . ' --env=' . escapeshellarg($container->getParameter('kernel.environment'))
            . ($container->getParameter('kernel.debug') ? '' : ' --no-debug')
            . ($commit ? '' : ' --no-commit');

        // No timeout: a batch may legitimately take longer than Process' default of 60 seconds.
        return new Process($php . ' ' . $console . ' ' . $cmd, null, null, null, null);
    }

    /**
     * Best-effort detection of the number of CPU cores, used only for the "--processes" hint.
     *
     * @return int Always at least 1.
     */
    private function getNumberOfCPUCores()
    {
        $cores = 1;
        if (is_file('/proc/cpuinfo') && is_readable('/proc/cpuinfo')) {
            // Linux (and potentially Windows with linux sub systems)
            $cpuinfo = file_get_contents('/proc/cpuinfo');
            if ($cpuinfo !== false && preg_match_all('/^processor/m', $cpuinfo, $matches)) {
                $cores = count($matches[0]);
            }
        } elseif (DIRECTORY_SEPARATOR === '\\') {
            // Windows
            if (($process = @popen('wmic cpu get NumberOfCores', 'rb')) !== false) {
                fgets($process); // skip the header line
                $cores = (int) fgets($process);
                pclose($process);
            }
        } elseif (($process = @popen('sysctl -n hw.ncpu', 'rb')) !== false) {
            // *nix without /proc (BSD and Mac): "sysctl -n" prints only the value.
            $cores = (int) trim((string) stream_get_contents($process));
            pclose($process);
        }

        // A probe that produced no usable output must not yield 0 cores.
        return max(1, $cores);
    }
}
307