Completed
Push — master ( 375d71...d02578 )
by André
75:03 queued 59:36
created

ReindexCommand::runParallelProcess()   D

Complexity

Conditions 9
Paths 16

Size

Total Lines 37
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 9
eloc 20
nc 16
nop 5
dl 0
loc 37
rs 4.909
c 0
b 0
f 0
1
<?php
2
3
/**
4
 * This file is part of the eZ Publish Kernel package.
5
 *
6
 * @copyright Copyright (C) eZ Systems AS. All rights reserved.
7
 * @license For full copyright and license information view LICENSE file distributed with this source code.
8
 */
9
namespace eZ\Bundle\EzPublishCoreBundle\Command;
10
11
use eZ\Publish\SPI\Persistence\Content\ContentInfo;
12
use eZ\Publish\Core\Search\Common\Indexer;
13
use eZ\Publish\Core\Search\Common\IncrementalIndexer;
14
use Doctrine\DBAL\Driver\Statement;
15
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
16
use Symfony\Component\Console\Helper\ProgressBar;
17
use Symfony\Component\Console\Input\InputInterface;
18
use Symfony\Component\Console\Input\InputOption;
19
use Symfony\Component\Console\Output\OutputInterface;
20
use Symfony\Component\Process\PhpExecutableFinder;
21
use Symfony\Component\Process\ProcessBuilder;
22
use RuntimeException;
23
use DateTime;
24
use PDO;
25
26
class ReindexCommand extends ContainerAwareCommand
27
{
28
    /**
29
     * @var \eZ\Publish\Core\Search\Common\Indexer|\eZ\Publish\Core\Search\Common\IncrementalIndexer
30
     */
31
    private $searchIndexer;
32
33
    /**
34
     * @var \Doctrine\DBAL\Connection
35
     */
36
    private $connection;
37
38
    /**
39
     * @var string
40
     */
41
    private $phpPath;
42
43
    /**
44
     * @var \Psr\Log\LoggerInterface
45
     */
46
    private $logger;
47
48
    /**
     * Initialize objects required by {@see execute()}.
     *
     * Pulls the search indexer, the legacy storage engine connection and the logger
     * from the container, then verifies that the indexer is an Indexer instance.
     *
     * @param InputInterface $input
     * @param OutputInterface $output
     *
     * @throws \RuntimeException If the configured search indexer is not an instance of Indexer.
     */
    public function initialize(InputInterface $input, OutputInterface $output)
    {
        parent::initialize($input, $output);

        $container = $this->getContainer();
        $this->searchIndexer = $container->get('ezpublish.spi.search.indexer');
        $this->connection = $container->get('ezpublish.api.storage_engine.legacy.connection');
        $this->logger = $container->get('logger');

        if (!$this->searchIndexer instanceof Indexer) {
            // Report the type that was actually found; get_parent_class() would name the
            // parent (or yield an empty string for a class without one), which is misleading.
            $foundType = is_object($this->searchIndexer)
                ? get_class($this->searchIndexer)
                : gettype($this->searchIndexer);

            throw new RuntimeException(
                sprintf(
                    'Expected to find Search Engine Indexer but found "%s" instead',
                    $foundType
                )
            );
        }
    }
69
70
    /**
     * {@inheritdoc}
     *
     * Declares the command name, its description, all supported options and the help text.
     */
    protected function configure()
    {
        $help = <<<EOT
The command <info>%command.name%</info> indexes current configured database in configured search engine index.


Example usage:
- Refresh (add/update) index changes since yesterday:
  <comment>ezplatform:reindex --since=yesterday</comment>
  See: http://php.net/manual/en/datetime.formats.php

- Refresh (add/update/remove) index on a set of content id's:
  <comment>ezplatform:reindex --content-ids=2,34,68</comment>

- Refresh (add/update) index of a subtree:
  <comment>ezplatform:reindex --subtree=45</comment>

- Refresh (add/update) index disabling use of child proccesses and initial purging,
  & let search engine handle commits using auto commit:
  <comment>ezplatform:reindex --no-purge --no-commit --processes=0</comment>

EOT;

        $this->setName('ezplatform:reindex');
        $this->setDescription('Recreate or Refresh search engine index');

        // Batch size option, defaults to 50 objects per iteration.
        $this->addOption(
            'iteration-count',
            'c',
            InputOption::VALUE_OPTIONAL,
            'Number of objects to be indexed in a single iteration, for avoiding using too much memory',
            50
        );

        // Flags that switch off committing / purging.
        $this->addOption(
            'no-commit',
            null,
            InputOption::VALUE_NONE,
            'Do not commit after each iteration'
        );
        $this->addOption(
            'no-purge',
            null,
            InputOption::VALUE_NONE,
            'Do not purge before indexing'
        );

        // Options selecting what to refresh; described as mutually exclusive.
        $this->addOption(
            'since',
            null,
            InputOption::VALUE_OPTIONAL,
            'Refresh changes since a given time, any format understood by DateTime. Implies "no-purge", can not be combined with "content-ids" or "subtree"'
        );
        $this->addOption(
            'content-ids',
            null,
            InputOption::VALUE_OPTIONAL,
            'Comma separated list of content id\'s to refresh (deleted/updated/added). Implies "no-purge", can not be combined with "since" or "subtree"'
        );
        $this->addOption(
            'subtree',
            null,
            InputOption::VALUE_OPTIONAL,
            'Location Id to index subtree of (incl self). Implies "no-purge", can not be combined with "since" or "content-ids"'
        );

        // Degree of parallelism, "auto" by default.
        $this->addOption(
            'processes',
            null,
            InputOption::VALUE_OPTIONAL,
            'Number of child processes to run in parallel for iterations, if set to "auto" it will set to number of CPU cores -1, set to "1" or "0" to disable',
            'auto'
        );

        $this->setHelp($help);
    }
138
139
    /**
     * {@inheritdoc}
     *
     * Validates the "iteration-count" option, then either runs the deprecated full
     * index creation (indexer is not an IncrementalIndexer) or delegates to
     * {@see indexIncrementally()}.
     *
     * @throws \RuntimeException If "iteration-count" is not numeric or is lower than 1.
     *
     * @return mixed Whatever indexIncrementally() returned; nothing for the deprecated code path.
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $commit = !$input->getOption('no-commit');
        $iterationCount = $input->getOption('iteration-count');
        if (!is_numeric($iterationCount) || (int) $iterationCount < 1) {
            throw new RuntimeException("'--iteration-count' option should be > 0, got '{$iterationCount}'");
        }

        // Normalize once: a value such as "1.5" passes the check above and would otherwise be
        // used as a fractional divisor when the number of iterations is computed.
        $iterationCount = (int) $iterationCount;

        if ($this->searchIndexer instanceof IncrementalIndexer) {
            $output->writeln('Re-indexing started for search engine: ' . $this->searchIndexer->getName());
            $output->writeln('');

            $return = $this->indexIncrementally($input, $output, $iterationCount, $commit);

            $output->writeln('');
            $output->writeln('Finished re-indexing');

            return $return;
        }

        // Legacy indexers only support re-creating the whole index.
        $output->writeln(<<<EOT
DEPRECATED:
Running indexing against an Indexer that has not been updated to use IncrementalIndexer abstract.

Options that won't be taken into account:
- since
- content-ids
- subtree
- processes
- no-purge
EOT
        );
        $this->searchIndexer->createSearchIndex($output, $iterationCount, !$commit);
    }
176
177
    /**
     * Refreshes or re-creates the index using the incremental indexer.
     *
     * With "content-ids" the given ids are indexed directly in the current process. Otherwise
     * the content to index is selected by "since", by "subtree" or, failing both, all published
     * content; it is then indexed in batches, inline or through parallel child processes.
     *
     * @param InputInterface $input
     * @param OutputInterface $output
     * @param int $iterationCount Number of content ids handled per iteration.
     * @param bool $commit Passed on to the indexer for every batch.
     *
     * @return mixed 1 when nothing was found to index, the result of updateSearchIndex() when
     *               "content-ids" is used, otherwise nothing.
     */
    protected function indexIncrementally(InputInterface $input, OutputInterface $output, $iterationCount, $commit)
    {
        if ($contentIds = $input->getOption('content-ids')) {
            $contentIds = explode(',', $contentIds);
            $output->writeln(sprintf(
                'Indexing list of content id\'s (%s)' . ($commit ? ', with commit' : ''),
                count($contentIds)
            ));

            return $this->searchIndexer->updateSearchIndex($contentIds, $commit);
        }

        if ($since = $input->getOption('since')) {
            // Parse the date once so that relative formats (e.g. "-1 hour") resolve to the same
            // timestamp for both the select and the count query.
            $sinceDate = new DateTime($since);
            $stmt = $this->getStatementContentSince($sinceDate);
            $count = (int) $this->getStatementContentSince($sinceDate, true)->fetchColumn();
            $purge = false;
        } elseif ($locationId = (int) $input->getOption('subtree')) {
            $stmt = $this->getStatementSubtree($locationId);
            $count = (int) $this->getStatementSubtree($locationId, true)->fetchColumn();
            $purge = false;
        } else {
            $stmt = $this->getStatementContentAll();
            $count = (int) $this->getStatementContentAll(true)->fetchColumn();
            $purge = !$input->getOption('no-purge');
        }

        if (!$count) {
            $output->writeln('<error>Could not find any items to index, aborting.</error>');

            return 1;
        }

        // ceil() returns a float; keep the iteration count an integer from here on.
        $iterations = (int) ceil($count / $iterationCount);
        $processes = $input->getOption('processes');
        // "auto" leaves one core free; any other value is taken as an explicit process count.
        $processCount = $processes === 'auto' ? $this->getNumberOfCPUCores() - 1 : (int) $processes;
        // Never use more processes than there are iterations to hand out.
        $processCount = min($iterations, $processCount);
        $processMessage = $processCount > 1 ? "using $processCount parallel child processes" : 'using single (current) process';

        if ($purge) {
            $output->writeln('Purging index...');
            $this->searchIndexer->purge();

            $output->writeln(
                "<info>Re-Creating index for {$count} items across $iterations iteration(s), $processMessage:</info>"
            );
        } else {
            $output->writeln(
                "<info>Refreshing index for {$count} items across $iterations iteration(s), $processMessage:</info>"
            );
        }

        $progress = new ProgressBar($output);
        $progress->start($iterations);

        if ($processCount > 1) {
            $this->runParallelProcess($progress, $stmt, (int) $processCount, (int) $iterationCount, $commit);
        } else {
            // if we only have one process, or less iterations to warrant running several, we index it all inline
            foreach ($this->fetchIteration($stmt, $iterationCount) as $contentIds) {
                $this->searchIndexer->updateSearchIndex($contentIds, $commit);
                $progress->advance(1);
            }
        }

        $progress->finish();
    }
243
244
    /**
     * Indexes all batches of the statement using up to $processCount concurrent child processes.
     *
     * Keeps a fixed set of process slots; whenever a slot is free or its process has ended,
     * the next batch is started in it. Polls once per second until every slot is drained.
     *
     * @param ProgressBar $progress Advanced by one for every child process that has ended.
     * @param \Doctrine\DBAL\Driver\Statement $stmt
     * @param int $processCount
     * @param int $iterationCount
     * @param bool $commit
     */
    private function runParallelProcess(ProgressBar $progress, Statement $stmt, $processCount, $iterationCount, $commit)
    {
        /**
         * @var \Symfony\Component\Process\Process[]|null[]
         */
        $slots = array_fill(0, $processCount, null);
        $batches = $this->fetchIteration($stmt, $iterationCount);

        while (true) {
            foreach ($slots as $slot => $child) {
                $hasChild = $child !== null;

                // Slot is still busy, check again on the next round
                if ($hasChild && $child->isRunning()) {
                    continue;
                }

                if ($hasChild) {
                    // The child in this slot has ended, reflect that in the progress bar
                    $progress->advance(1);

                    if (!$child->isSuccessful()) {
                        $this->logger->error('Child indexer process returned: ' . $child->getExitCodeText());
                    }
                }

                if ($batches->valid()) {
                    // Hand the next batch to this slot
                    $slots[$slot] = $this->getPhpProcess($batches->current(), $commit);
                    $slots[$slot]->start();
                    $batches->next();
                } else {
                    // Nothing left to hand out, retire the slot
                    unset($slots[$slot]);
                }
            }

            if (empty($slots)) {
                break;
            }

            sleep(1);
        }
    }
281
282
    /**
     * Builds and executes the query selecting published content modified at or after $since.
     *
     * @param DateTime $since Lower bound (inclusive), compared against c.modified as a timestamp.
     * @param bool $count If true the statement returns count(c.id) instead of the ids.
     *
     * @return \Doctrine\DBAL\Driver\Statement
     */
    private function getStatementContentSince(DateTime $since, $count = false)
    {
        $q = $this->connection->createQueryBuilder()
            ->select($count ? 'count(c.id)' : 'c.id')
            ->from('ezcontentobject', 'c')
            ->where('c.status = :status')->andWhere('c.modified >= :since')
            ->setParameter('status', ContentInfo::STATUS_PUBLISHED, PDO::PARAM_INT)
            ->setParameter('since', $since->getTimestamp(), PDO::PARAM_INT);

        if (!$count) {
            // Ordering is only meaningful for the id list. On the aggregate query it is rejected
            // by PostgreSQL (and MySQL with ONLY_FULL_GROUP_BY) as c.modified is not aggregated.
            $q->orderBy('c.modified');
        }

        return $q->execute();
    }
300
301
    /**
     * Builds and executes the query selecting published content that has a location
     * at or below the given location (matched by path string prefix).
     *
     * @param mixed $locationId Id of the location at the root of the subtree.
     * @param bool $count If true the statement returns the number of distinct ids instead of the ids.
     *
     * @return \Doctrine\DBAL\Driver\Statement
     */
    private function getStatementSubtree($locationId, $count = false)
    {
        /**
         * @var \eZ\Publish\SPI\Persistence\Content\Location\Handler
         */
        $handler = $this->getContainer()->get('ezpublish.spi.persistence.location_handler');
        // Every location in the subtree has a path string starting with the root's path string
        $pathPattern = $handler->load($locationId)->pathString . '%';

        $selection = 'DISTINCT c.id';
        if ($count) {
            $selection = 'count(DISTINCT c.id)';
        }

        $query = $this->connection->createQueryBuilder();
        $query->select($selection);
        $query->from('ezcontentobject', 'c');
        $query->innerJoin('c', 'ezcontentobject_tree', 't', 't.contentobject_id = c.id');
        $query->where('c.status = :status');
        $query->andWhere('t.path_string LIKE :path');
        $query->setParameter('status', ContentInfo::STATUS_PUBLISHED, PDO::PARAM_INT);
        $query->setParameter('path', $pathPattern, PDO::PARAM_STR);

        return $query->execute();
    }
325
326
    /**
     * Builds and executes the query selecting all published content.
     *
     * @param bool $count If true the statement returns count(c.id) instead of the ids.
     *
     * @return \Doctrine\DBAL\Driver\Statement
     */
    private function getStatementContentAll($count = false)
    {
        $selection = 'c.id';
        if ($count) {
            $selection = 'count(c.id)';
        }

        $query = $this->connection->createQueryBuilder();
        $query->select($selection);
        $query->from('ezcontentobject', 'c');
        $query->where('c.status = :status');
        $query->setParameter('status', ContentInfo::STATUS_PUBLISHED, PDO::PARAM_INT);

        return $query->execute();
    }
341
342
    /**
     * Lazily splits the rows of the statement into batches of content ids.
     *
     * @param \Doctrine\DBAL\Driver\Statement $stmt Statement whose first column holds content ids.
     * @param int $iterationCount Maximum number of content ids per batch.
     *
     * @return \Generator Yields non-empty arrays holding at most $iterationCount content ids each.
     */
    private function fetchIteration(Statement $stmt, $iterationCount)
    {
        do {
            $contentIds = [];
            for ($i = 0; $i < $iterationCount; ++$i) {
                $contentId = $stmt->fetch(PDO::FETCH_COLUMN);
                if (!$contentId) {
                    // Statement is exhausted
                    break;
                }

                $contentIds[] = $contentId;
            }

            if (empty($contentIds)) {
                // Never yield an empty batch: this happens when the number of rows is an exact
                // multiple of $iterationCount. An empty batch would spawn a child process with an
                // empty "--content-ids" option, which that process treats as "index everything".
                return;
            }

            yield $contentIds;
        } while (!empty($contentId));
    }
363
364
    /**
     * Creates the (not yet started) child process that re-runs this command for a batch of ids.
     *
     * The console script is resolved relative to the current working directory, preferring
     * "bin/console" over "app/console".
     *
     * @param array $contentIds Content ids passed to the child as "--content-ids".
     * @param bool $commit If false, "--no-commit" is appended to the child's arguments.
     *
     * @return \Symfony\Component\Process\Process
     */
    private function getPhpProcess(array $contentIds, $commit)
    {
        $consoleScript = 'app/console';
        if (file_exists('bin/console')) {
            $consoleScript = 'bin/console';
        }

        $arguments = [
            $consoleScript,
            'ezplatform:reindex',
            '--content-ids=' . implode(',', $contentIds),
        ];
        if (!$commit) {
            $arguments[] = '--no-commit';
        }

        $builder = new ProcessBuilder($arguments);
        // Indexing a batch may take arbitrarily long, so no timeout is applied
        $builder->setTimeout(null);
        $builder->setPrefix($this->getPhpPath());

        return $builder->getProcess();
    }
386
387
    /**
     * Returns the path to the php executable, looking it up on first use and caching it.
     *
     * @throws \RuntimeException If the php executable could not be found.
     *
     * @return string
     */
    private function getPhpPath()
    {
        if ($this->phpPath) {
            return $this->phpPath;
        }

        $phpFinder = new PhpExecutableFinder();
        // find() returns false when nothing is found, so check the result before it is
        // stored in the string typed property.
        $phpPath = $phpFinder->find();
        if (!is_string($phpPath) || $phpPath === '') {
            throw new RuntimeException(
                'The php executable could not be found, it\'s needed for executing parallel sub processes, so add it to your PATH environment variable and try again'
            );
        }

        $this->phpPath = $phpPath;

        return $this->phpPath;
    }
406
407
    /**
     * Detects the number of CPU cores on a best-effort basis.
     *
     * Tries /proc/cpuinfo first, then "wmic" when the directory separator is a backslash,
     * then "sysctl -a". Falls back to 1 whenever detection fails.
     *
     * @return int Number of cores, always at least 1.
     */
    private function getNumberOfCPUCores()
    {
        $cores = 1;
        if (is_file('/proc/cpuinfo')) {
            // Linux (and potentially Windows with linux sub systems)
            $cpuinfo = file_get_contents('/proc/cpuinfo');
            if ($cpuinfo !== false && preg_match_all('/^processor/m', $cpuinfo, $matches)) {
                $cores = count($matches[0]);
            }
        } elseif (DIRECTORY_SEPARATOR === '\\') {
            // Windows
            if (($process = @popen('wmic cpu get NumberOfCores', 'rb')) !== false) {
                // First line of the output is skipped, the value is read from the second line
                fgets($process);
                $cores = (int) fgets($process);
                pclose($process);
            }
        } elseif (($process = @popen('sysctl -a', 'rb')) !== false) {
            // *nix (Linux, BSD and Mac)
            $output = stream_get_contents($process);
            if (preg_match('/hw.ncpu: (\d+)/', $output, $matches)) {
                // $matches[1] is the captured string itself; indexing it with [0] would only
                // take its first character and turn e.g. "16" into 1.
                $cores = (int) $matches[1];
            }
            pclose($process);
        }

        // Guard against unparsable output resulting in 0 cores
        return max(1, $cores);
    }
436
}
437