Completed
Push — non_purge_indexer ( 31b501...720488 )
by André
12:42
created

ReindexCommand::executeParallel()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 14
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 8
nc 2
nop 4
dl 0
loc 14
rs 9.4285
c 0
b 0
f 0
1
<?php
2
3
/**
4
 * This file is part of the eZ Publish Kernel package.
5
 *
6
 * @copyright Copyright (C) eZ Systems AS. All rights reserved.
7
 * @license For full copyright and license information view LICENSE file distributed with this source code.
8
 */
9
namespace eZ\Bundle\EzPublishCoreBundle\Command;
10
11
use eZ\Publish\Core\Search\Common\IterativelyIndexer;
12
use eZ\Publish\SPI\Persistence\Content\ContentInfo;
13
use eZ\Publish\Core\Search\Common\Indexer;
14
use Doctrine\DBAL\Driver\Statement;
15
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
16
use Symfony\Component\Console\Helper\ProgressBar;
17
use Symfony\Component\Console\Input\InputInterface;
18
use Symfony\Component\Console\Input\InputOption;
19
use Symfony\Component\Console\Output\OutputInterface;
20
use Symfony\Component\Process\Process;
21
use Symfony\Component\Process\PhpExecutableFinder;
22
use RuntimeException;
23
use DateTime;
24
use PDO;
25
26
class ReindexCommand extends ContainerAwareCommand
27
{
28
    /**
29
     * @var \eZ\Publish\Core\Search\Common\Indexer|\eZ\Publish\Core\Search\Common\IterativelyIndexer
30
     */
31
    private $searchIndexer;
32
33
    /**
34
     * @var \Doctrine\DBAL\Connection
35
     */
36
    private $connection;
37
38
    /**
     * Initialize objects required by {@see execute()}.
     *
     * Fetches the search indexer and the legacy storage connection from the container and
     * verifies that the indexer service is an {@see Indexer} before the command body runs.
     *
     * @param InputInterface $input
     * @param OutputInterface $output
     *
     * @throws \RuntimeException If the configured search indexer service is not an Indexer.
     */
    public function initialize(InputInterface $input, OutputInterface $output)
    {
        parent::initialize($input, $output);
        $this->searchIndexer = $this->getContainer()->get('ezpublish.spi.search.indexer');
        $this->connection = $this->getContainer()->get('ezpublish.api.storage_engine.legacy.connection');
        if (!$this->searchIndexer instanceof Indexer) {
            // Report the type that was actually found; get_parent_class() would name the parent
            // (or yield an empty string when there is none), which hides the offending service.
            $found = is_object($this->searchIndexer)
                ? get_class($this->searchIndexer)
                : gettype($this->searchIndexer);

            throw new RuntimeException(
                sprintf('Expected to find Search Engine Indexer but found "%s" instead', $found)
            );
        }
    }
55
56
    /**
     * {@inheritdoc}
     *
     * Declares the command name, description, help text and options.
     */
    protected function configure()
    {
        // "since", "content-ids" and "processes" all carry a value, so they must not be declared
        // as VALUE_NONE flags: Symfony rejects a default value on a VALUE_NONE option and refuses
        // "--option=value" input for such options.
        $this
            ->setName('ezplatform:reindex')
            ->setDescription('Recreate or Refresh search engine index')
            ->addOption('iteration-count', 'c', InputOption::VALUE_OPTIONAL, 'Number of objects to be indexed in a single iteration, for avoiding using to much memory', 50)
            ->addOption('no-commit', null, InputOption::VALUE_NONE, 'Do not commit after each iteration')
            ->addOption('no-purge', null, InputOption::VALUE_NONE, 'Do not purge before indexing. BC NOTE: Should this be default as of 2.0?')
            ->addOption('since', null, InputOption::VALUE_OPTIONAL, 'Index changes since a given time, any format understood by DateTime. Implies "no-purge", can not be combined with "content-ids".')
            ->addOption('content-ids', null, InputOption::VALUE_OPTIONAL, 'Comma separated list of content id\'s to refresh (deleted or updated/added). Implies "no-purge", can not be combined with "since".')
            ->addOption('processes', null, InputOption::VALUE_OPTIONAL, 'Number of sub processes to spawn in parallel, by default: (number of cpu cores)-1, disable by setting to "0"', $this->getNumberOfCPUCores() - 1)
            ->setHelp(
                <<<EOT
The command <info>%command.name%</info> indexes current configured database in configured search engine index.


TODO: ADD EXAMPLES OF ADVANCE USAGE!

EOT
            );
    }
80
81
    /**
     * {@inheritdoc}
     *
     * Validates the "iteration-count" option, then either delegates to the legacy
     * createSearchIndex() of a non-iterative indexer or runs iterative indexing.
     *
     * @throws \RuntimeException If "iteration-count" is not a number greater than zero.
     *
     * @return int Exit code, 0 on success.
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $commit = !$input->getOption('no-commit');
        $iterationCount = $input->getOption('iteration-count');
        if (!is_numeric($iterationCount) || (int) $iterationCount < 1) {
            throw new RuntimeException("'--iteration-count' option should be > 0, got '{$iterationCount}'");
        }

        // Options arrive as strings from the command line; normalize once so every consumer gets an int.
        $iterationCount = (int) $iterationCount;

        if (!$this->searchIndexer instanceof IterativelyIndexer) {
            $output->writeln(<<<EOT
DEPRECATED:
Running indexing against an Indexer that has not been updated to use IterativelyIndexer abstract.

Options that won't be taken into account:
- since
- content-ids
- processes
- no-purge
EOT
            );
            $this->searchIndexer->createSearchIndex($output, $iterationCount, !$commit);

            return 0;
        }

        // NOTE(review): getName() is the command name, not the search engine name - confirm the intended value.
        $output->writeln('Re-indexing started for search engine: ' . $this->getName());
        $this->indexIteratively($input, $output, $iterationCount, $commit);
        $output->writeln('');
        $output->writeln('Finished re-indexing');

        // Return an explicit exit code here as well, consistent with the deprecated branch above.
        return 0;
    }
115
116
    /**
     * Index content iteratively: an explicit list of content ids, content modified since a given
     * time, or all published content.
     *
     * When "content-ids" is given only those ids are refreshed and nothing else is done. Otherwise
     * content ids are read in batches of $iterationCount and indexed either inline or by parallel
     * sub processes, depending on the "processes" option and the number of batches.
     *
     * @param InputInterface $input
     * @param OutputInterface $output
     * @param int $iterationCount Number of content items per batch, expected to be greater than zero.
     * @param bool $commit Passed on to the indexer's purge() and updateSearchIndex() calls.
     *
     * @throws \RuntimeException If "content-ids" is given but does not contain any usable id.
     *
     * @return mixed Whatever updateSearchIndex() returns when "content-ids" is used, otherwise null.
     */
    protected function indexIteratively(InputInterface $input, OutputInterface $output, $iterationCount, $commit)
    {
        if ($rawContentIds = $input->getOption('content-ids')) {
            // The option is a comma separated string; the indexer is handed a list of ids.
            if (is_string($rawContentIds)) {
                $rawContentIds = explode(',', $rawContentIds);
            }

            $contentIds = is_array($rawContentIds)
                ? array_values(array_filter(array_map('intval', $rawContentIds)))
                : [];

            if (empty($contentIds)) {
                throw new RuntimeException("'--content-ids' option expects a comma separated list of content id's");
            }

            $output->writeln('Indexing list of content id\'s');

            return $this->searchIndexer->updateSearchIndex($contentIds, $commit);
        }

        if ($since = $input->getOption('since')) {
            // Build the date once so the id query and the count query use the exact same timestamp.
            $sinceDate = new DateTime($since);
            $stmt = $this->getStatementContentSince($sinceDate);
            $count = (int) $this->getStatementContentSince($sinceDate, 'count(c.id)')->fetchColumn();
            $purge = false;
        } else {
            $stmt = $this->getStatementContentAll();
            $count = (int) $this->getStatementContentAll('count(c.id)')->fetchColumn();
            $purge = !$input->getOption('no-purge');
        }

        if ($purge) {
            $output->writeln('Purging index before starting to re-create it');
            $this->searchIndexer->purge($commit);
            $output->writeln("Re-Creating Search Engine Index for {$count} content items..");
        } else {
            $output->writeln("Refresh Search Engine Index for {$count} content items..");
        }

        $iterations = (int) ceil($count / $iterationCount);
        $progress = new ProgressBar($output);

        $processCount = (int) $input->getOption('processes');
        if ($processCount > 1 && $processCount <= $iterations) {
            $output->writeln("Parallel indexing using $processCount processes sharing $iterations iterations");
            $progress->start($iterations);

            return $this->runParallelProcess($progress, $stmt, $processCount, (int) $iterationCount, $commit);
        }

        // if we only have one process, or less iterations to warrant running several, we index it all inline
        $progress->start($iterations);

        // fetchIteration() is a generator: it has to be iterated, testing it with empty() never ends the loop.
        foreach ($this->fetchIteration($stmt, $iterationCount) as $contentIds) {
            if (empty($contentIds)) {
                // An empty batch carries nothing to index.
                continue;
            }

            $this->searchIndexer->updateSearchIndex($contentIds, $commit);
            $progress->advance(1);
        }

        $progress->finish();
    }
161
162
163
    /**
     * Index batches of content ids using up to $processCount sub processes at a time.
     *
     * Every free slot gets a new sub process for the next batch until there are no batches left;
     * the method returns once all started sub processes have stopped running.
     *
     * @param ProgressBar $progress Progress bar, advanced once per finished sub process.
     * @param \Doctrine\DBAL\Driver\Statement $stmt Executed statement the content ids are read from.
     * @param int $processCount Maximum number of sub processes running at the same time.
     * @param int $iterationCount Number of content ids handed to each sub process.
     * @param bool $commit When false, sub processes are started with "--no-commit".
     */
    private function runParallelProcess(ProgressBar $progress, Statement $stmt, $processCount, $iterationCount, $commit)
    {
        /**
         * @var \Symfony\Component\Process\Process[]|null[] $processes
         */
        $processes = array_fill(0, $processCount, null);

        // Create the generator once and advance it manually; each call to fetchIteration()
        // returns a new generator object, never a plain array of ids.
        $batches = $this->fetchIteration($stmt, $iterationCount);

        do {
            foreach ($processes as $key => $process) {
                if ($process !== null && $process->isRunning()) {
                    continue;
                }

                if ($process !== null) {
                    // One of the processes just finished, so we increment progress bar
                    $progress->advance(1);
                }

                // Skip empty batches so no sub process is started without content ids.
                while ($batches->valid() && empty($batches->current())) {
                    $batches->next();
                }

                if (!$batches->valid()) {
                    // Nothing left to hand out: retire this slot.
                    unset($processes[$key]);
                    continue;
                }

                $processes[$key] = $this->getPhpProcess($batches->current(), $commit);
                $processes[$key]->start();
                $batches->next();
            }

            // Loop on the slot list itself; the foreach variable only holds a stale copy of the last slot.
            if (!empty($processes)) {
                sleep(1);
            }
        } while (!empty($processes));

        $progress->finish();
    }
200
201
    /**
     * Build and execute the query for published content modified at or after $since,
     * ordered by modification time.
     *
     * @param DateTime $since
     * @param string $select Field to select, one of  'c.id' and 'count(c.id)'
     *
     * @return \Doctrine\DBAL\Driver\Statement
     */
    private function getStatementContentSince(DateTime $since, $select = 'c.id')
    {
        // orderBy() takes the direction as a string; a boolean true would be rendered as
        // "ORDER BY c.modified 1", which is not valid SQL.
        // NOTE(review): ordering is also applied when selecting 'count(c.id)' - confirm all target databases accept that.
        $q = $this->connection->createQueryBuilder()
            ->select($select)
            ->from('ezcontentobject', 'c')
            ->where('c.status = :status')->andWhere('c.modified >= :since')
            ->orderBy('c.modified', 'ASC')
            ->setParameter('status', ContentInfo::STATUS_PUBLISHED, PDO::PARAM_INT)
            ->setParameter('since', $since->getTimestamp(), PDO::PARAM_INT);

        return $q->execute();
    }
219
220
    /**
     * Build and execute the query for all content with published status.
     *
     * @param string $select Field to select, one of  'c.id' and 'count(c.id)'
     *
     * @return \Doctrine\DBAL\Driver\Statement
     */
    private function getStatementContentAll($select = 'c.id')
    {
        $queryBuilder = $this->connection->createQueryBuilder();

        $queryBuilder->select($select);
        $queryBuilder->from('ezcontentobject', 'c');
        $queryBuilder->where('c.status = :status');
        $queryBuilder->setParameter('status', ContentInfo::STATUS_PUBLISHED, PDO::PARAM_INT);

        return $queryBuilder->execute();
    }
235
236
    /**
     * Lazily read content ids from $stmt in batches of at most $iterationCount.
     *
     * This is a generator: calling it returns a \Generator that has to be iterated, and each
     * yielded value is one non-empty array of content ids. Reading stops at the first fetch
     * that returns a falsy value.
     *
     * @param \Doctrine\DBAL\Driver\Statement $stmt
     * @param int $iterationCount
     *
     * @return \Generator Yields int[] arrays, each holding up to $iterationCount content id's.
     */
    private function fetchIteration(Statement $stmt, $iterationCount)
    {
        // Defined up front so the loop condition is well defined even if the inner loop never runs.
        $contentId = false;

        do {
            $contentIds = [];
            for ($i = 0; $i < $iterationCount; ++$i) {
                $contentId = $stmt->fetch(PDO::FETCH_COLUMN);
                if (!$contentId) {
                    break;
                }

                $contentIds[] = $contentId;
            }

            // Never hand out an empty batch: that happens when the statement has no rows at all
            // or the row count is an exact multiple of $iterationCount.
            if (!empty($contentIds)) {
                yield $contentIds;
            }
        } while (!empty($contentId));
    }
257
258
    /**
     * Create (but do not start) a sub process running "ezplatform:reindex" for the given content ids.
     *
     * The console script is looked up relative to the current working directory: "bin/console"
     * when that file exists, otherwise "app/console".
     *
     * @param array $contentIds
     * @param bool $commit When false, "--no-commit" is appended to the command.
     *
     * @throws \RuntimeException If the php executable cannot be found.
     *
     * @return \Symfony\Component\Process\Process
     */
    private static function getPhpProcess(array $contentIds, $commit)
    {
        $phpFinder = new PhpExecutableFinder();
        $phpPath = $phpFinder->find();
        if (!$phpPath) {
            throw new RuntimeException('The php executable could not be found, add it to your PATH environment variable and try again');
        }

        // Escape the resolved executable path, not the finder object itself.
        $php = escapeshellarg($phpPath);
        $console = escapeshellarg(file_exists('bin/console') ? 'bin/console' : 'app/console');
        $ids = escapeshellarg(implode(',', $contentIds));
        $cmd = 'ezplatform:reindex --content-ids=' . $ids . ($commit ? '' : ' --no-commit');

        return new Process($php . ' ' . $console . ' ' . $cmd, null, null, null, null);
    }
277
278
    /**
     * Best-effort detection of the number of CPU cores on the current machine.
     *
     * Reads /proc/cpuinfo when present, otherwise asks "wmic" on Windows or "sysctl" elsewhere.
     * Falls back to 1 when nothing could be detected.
     *
     * @return int Number of cores, never less than 1.
     */
    private function getNumberOfCPUCores()
    {
        $cores = 1;
        if (is_file('/proc/cpuinfo')) {
            // Linux (and potentially Windows with linux sub systems)
            $cpuinfo = (string) file_get_contents('/proc/cpuinfo');
            preg_match_all('/^processor/m', $cpuinfo, $matches);
            $cores = count($matches[0]);
        } elseif (DIRECTORY_SEPARATOR === '\\') {
            // Windows
            if (($process = @popen('wmic cpu get NumberOfCores', 'rb')) !== false) {
                // First line is the column header, the value follows on the second line
                fgets($process);
                $cores = (int) fgets($process);
                pclose($process);
            }
        } elseif (($process = @popen('sysctl -a', 'rb')) !== false) {
            // *nix (Linux, BSD and Mac)
            $output = stream_get_contents($process);
            if (preg_match('/hw\.ncpu: (\d+)/', $output, $matches)) {
                // $matches[1] is the whole captured number; indexing it further would keep only its first digit
                $cores = (int) $matches[1];
            }
            pclose($process);
        }

        // A failed or unparsable lookup must not report zero cores.
        return max(1, $cores);
    }
307
}
308