Completed
Push — EZP-28176-return ( 6687e2 )
by André
12:16
created

ReindexCommand::runParallelProcess()   D

Complexity

Conditions 9
Paths 16

Size

Total Lines 37
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 9
eloc 20
nc 16
nop 5
dl 0
loc 37
rs 4.909
c 0
b 0
f 0
1
<?php
2
3
/**
4
 * This file is part of the eZ Publish Kernel package.
5
 *
6
 * @copyright Copyright (C) eZ Systems AS. All rights reserved.
7
 * @license For full copyright and license information view LICENSE file distributed with this source code.
8
 */
9
namespace eZ\Bundle\EzPublishCoreBundle\Command;
10
11
use eZ\Publish\SPI\Persistence\Content\ContentInfo;
12
use eZ\Publish\Core\Search\Common\Indexer;
13
use eZ\Publish\Core\Search\Common\IncrementalIndexer;
14
use Doctrine\DBAL\Driver\Statement;
15
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
16
use Symfony\Component\Console\Helper\ProgressBar;
17
use Symfony\Component\Console\Input\InputInterface;
18
use Symfony\Component\Console\Input\InputOption;
19
use Symfony\Component\Console\Output\OutputInterface;
20
use Symfony\Component\Process\PhpExecutableFinder;
21
use Symfony\Component\Process\ProcessBuilder;
22
use RuntimeException;
23
use DateTime;
24
use PDO;
25
26
class ReindexCommand extends ContainerAwareCommand
{
    /**
     * @var \eZ\Publish\Core\Search\Common\Indexer|\eZ\Publish\Core\Search\Common\IncrementalIndexer
     */
    private $searchIndexer;

    /**
     * @var \Doctrine\DBAL\Connection
     */
    private $connection;

    /**
     * Cached path to the PHP binary used to spawn child indexing processes.
     *
     * @var string|null
     */
    private $phpPath;

    /**
     * @var \Psr\Log\LoggerInterface
     */
    private $logger;

    /**
     * Initialize objects required by {@see execute()}.
     *
     * @param InputInterface $input
     * @param OutputInterface $output
     *
     * @throws \RuntimeException If the configured search indexer service is not an Indexer.
     */
    public function initialize(InputInterface $input, OutputInterface $output)
    {
        parent::initialize($input, $output);
        $this->searchIndexer = $this->getContainer()->get('ezpublish.spi.search.indexer');
        $this->connection = $this->getContainer()->get('ezpublish.api.storage_engine.legacy.connection');
        $this->logger = $this->getContainer()->get('logger');
        if (!$this->searchIndexer instanceof Indexer) {
            throw new RuntimeException(
                sprintf(
                    'Expected to find Search Engine Indexer but found "%s" instead',
                    // Report the class actually configured (get_parent_class() gave its parent, or false).
                    is_object($this->searchIndexer) ? get_class($this->searchIndexer) : gettype($this->searchIndexer)
                )
            );
        }
    }

    /**
     * {@inheritdoc}
     */
    protected function configure()
    {
        $this
            ->setName('ezplatform:reindex')
            ->setDescription('Recreate or Refresh search engine index')
            ->addOption(
                'iteration-count',
                'c',
                InputOption::VALUE_OPTIONAL,
                'Number of objects to be indexed in a single iteration, for avoiding using too much memory',
                50
            )->addOption(
                'no-commit',
                null,
                InputOption::VALUE_NONE,
                'Do not commit after each iteration'
            )->addOption(
                'no-purge',
                null,
                InputOption::VALUE_NONE,
                'Do not purge before indexing'
            )->addOption(
                'since',
                null,
                InputOption::VALUE_OPTIONAL,
                'Refresh changes since a given time, any format understood by DateTime. Implies "no-purge", can not be combined with "content-ids" or "subtree"'
            )->addOption(
                'content-ids',
                null,
                InputOption::VALUE_OPTIONAL,
                'Comma separated list of content id\'s to refresh (deleted/updated/added). Implies "no-purge", can not be combined with "since" or "subtree"'
            )->addOption(
                'subtree',
                null,
                InputOption::VALUE_OPTIONAL,
                'Location Id to index subtree of (incl self). Implies "no-purge", can not be combined with "since" or "content-ids"'
            )->addOption(
                'processes',
                null,
                InputOption::VALUE_OPTIONAL,
                'Number of sub processes to spawn in parallel handling iterations, default number is number of CPU cores -1, set to 1 or 0 to disable',
                // Match the documented default of "CPU cores - 1"; a single-core machine ends up at 1 (disabled).
                max(1, $this->getNumberOfCPUCores() - 1)
            )->setHelp(
                <<<EOT
The command <info>%command.name%</info> indexes current configured database in configured search engine index.


Example usage:
- Refresh (add/update) index changes since yesterday:
  <comment>ezplatform:reindex --since=yesterday</comment>
  See: http://php.net/manual/en/datetime.formats.php

- Refresh (add/update/remove) index on a set of content id's:
  <comment>ezplatform:reindex --content-ids=2,34,68</comment>

- Refresh (add/update) index of a subtree:
  <comment>ezplatform:reindex --subtree=45</comment>

 - Refresh (add/update) the whole index using 3 processes, & let search engine handle commits itself using auto commit:
   <comment>ezplatform:reindex --no-purge --no-commit --processes=3</comment>

EOT
            );
    }

    /**
     * {@inheritdoc}
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $commit = !$input->getOption('no-commit');
        $iterationCount = $input->getOption('iteration-count');
        if (!is_numeric($iterationCount) || (int) $iterationCount < 1) {
            throw new RuntimeException("'--iteration-count' option should be > 0, got '{$iterationCount}'");
        }

        if (!$this->searchIndexer instanceof IncrementalIndexer) {
            $output->writeln(<<<EOT
DEPRECATED:
Running indexing against an Indexer that has not been updated to use IncrementalIndexer abstract.

Options that won't be taken into account:
- since
- content-ids
- subtree
- processes
- no-purge
EOT
            );
            $this->searchIndexer->createSearchIndex($output, (int) $iterationCount, !$commit);
        } else {
            $output->writeln('Re-indexing started for search engine: ' . $this->searchIndexer->getName());
            $output->writeln('');

            $return = $this->indexIncrementally($input, $output, $iterationCount, $commit);

            $output->writeln('');
            $output->writeln('Finished re-indexing');

            return $return;
        }
    }

    /**
     * Index using the IncrementalIndexer API.
     *
     * Exactly one selection mode is used, in this priority order: "content-ids", "since",
     * "subtree", otherwise all published content (purging first unless "no-purge" is given).
     *
     * @param InputInterface $input
     * @param OutputInterface $output
     * @param int|string $iterationCount Number of content items per batch; validated > 0 by execute().
     * @param bool $commit Whether to commit after each batch.
     *
     * @return int|mixed 1 when nothing was found to index, the indexer's return value for
     *                   "content-ids", otherwise 0.
     */
    protected function indexIncrementally(InputInterface $input, OutputInterface $output, $iterationCount, $commit)
    {
        if ($contentIds = $input->getOption('content-ids')) {
            // Tolerate whitespace and stray commas, e.g. "2, 34,68,".
            $contentIds = array_values(array_filter(array_map('trim', explode(',', $contentIds)), 'strlen'));
            if (empty($contentIds)) {
                $output->writeln('<error>No valid content id\'s given, aborting.</error>');

                return 1;
            }

            $output->writeln(sprintf(
                // The ternary must be parenthesised: `.` binds tighter than `?:` in PHP.
                'Indexing list of content id\'s (%s)' . ($commit ? ', with commit' : ''),
                count($contentIds)
            ));

            return $this->searchIndexer->updateSearchIndex($contentIds, $commit);
        }

        if ($since = $input->getOption('since')) {
            $sinceDate = new DateTime($since);
            $stmt = $this->getStatementContentSince($sinceDate);
            $count = (int) $this->getStatementContentSince($sinceDate, true)->fetchColumn();
            $purge = false;
        } elseif ($locationId = (int) $input->getOption('subtree')) {
            $stmt = $this->getStatementSubtree($locationId);
            $count = (int) $this->getStatementSubtree($locationId, true)->fetchColumn();
            $purge = false;
        } else {
            $stmt = $this->getStatementContentAll();
            $count = (int) $this->getStatementContentAll(true)->fetchColumn();
            $purge = !$input->getOption('no-purge');
        }

        if (!$count) {
            $output->writeln('<error>Could not find any items to index, aborting.</error>');

            return 1;
        }

        $iterationCount = (int) $iterationCount;
        $iterations = (int) ceil($count / $iterationCount);
        // Never spawn more processes than there are batches to hand out.
        $processCount = min((int) $input->getOption('processes'), $iterations);
        $processMessage = $processCount > 1 ? "using $processCount parallel processes" : 'using single process';

        if ($purge) {
            $output->writeln('Purging index...');
            $this->searchIndexer->purge();

            $output->writeln(
                "<info>Re-Creating index for {$count} items across $iterations iteration(s), $processMessage:</info>"
            );
        } else {
            $output->writeln(
                "<info>Refreshing index for {$count} items across $iterations iteration(s), $processMessage:</info>"
            );
        }

        $progress = new ProgressBar($output);
        $progress->start($iterations);

        if ($processCount > 1) {
            $this->runParallelProcess($progress, $stmt, $processCount, $iterationCount, $commit);
        } else {
            // if we only have one process, or less iterations to warrant running several, we index it all inline
            foreach ($this->fetchIteration($stmt, $iterationCount) as $contentIds) {
                $this->searchIndexer->updateSearchIndex($contentIds, $commit);
                $progress->advance(1);
            }
        }

        $progress->finish();

        return 0;
    }

    /**
     * Index all batches from $stmt using at most $processCount concurrent child processes.
     *
     * Each batch is handed to a child "reindex --content-ids=..." process. The progress bar
     * advances whenever a child finishes. A failing child is logged and does not stop the run.
     *
     * @param ProgressBar $progress
     * @param Statement $stmt Statement yielding one content id per row.
     * @param int $processCount Maximum number of children running at the same time.
     * @param int $iterationCount Batch size.
     * @param bool $commit Whether children should commit after indexing.
     */
    private function runParallelProcess(ProgressBar $progress, Statement $stmt, $processCount, $iterationCount, $commit)
    {
        /**
         * @var \Symfony\Component\Process\Process[]|null[]
         */
        $processes = array_fill(0, $processCount, null);
        $generator = $this->fetchIteration($stmt, $iterationCount);
        do {
            foreach ($processes as $key => $process) {
                if ($process !== null && $process->isRunning()) {
                    continue;
                }

                if ($process !== null) {
                    // One of the processes just finished, so we increment progress bar
                    $progress->advance(1);

                    if (!$process->isSuccessful()) {
                        $this->logger->error(
                            'Child indexer process returned: ' . $process->getExitCodeText(),
                            ['error_output' => $process->getErrorOutput()]
                        );
                    }
                }

                // No more batches: retire this slot once its child has finished.
                if (!$generator->valid()) {
                    unset($processes[$key]);
                    continue;
                }

                $processes[$key] = $this->getPhpProcess($generator->current(), $commit);
                $processes[$key]->start();
                $generator->next();
            }

            if (!empty($processes)) {
                sleep(1);
            }
        } while (!empty($processes));
    }

    /**
     * @param DateTime $since
     * @param bool $count Select the row count instead of the content ids.
     *
     * @return \Doctrine\DBAL\Driver\Statement
     */
    private function getStatementContentSince(DateTime $since, $count = false)
    {
        $q = $this->connection->createQueryBuilder()
            ->select($count ? 'count(c.id)' : 'c.id')
            ->from('ezcontentobject', 'c')
            ->where('c.status = :status')->andWhere('c.modified >= :since')
            ->orderBy('c.modified')
            ->setParameter('status', ContentInfo::STATUS_PUBLISHED, PDO::PARAM_INT)
            ->setParameter('since', $since->getTimestamp(), PDO::PARAM_INT);

        return $q->execute();
    }

    /**
     * @param mixed $locationId
     * @param bool $count Select the row count instead of the content ids.
     *
     * @return \Doctrine\DBAL\Driver\Statement
     */
    private function getStatementSubtree($locationId, $count = false)
    {
        /**
         * @var \eZ\Publish\SPI\Persistence\Content\Location\Handler
         */
        $locationHandler = $this->getContainer()->get('ezpublish.spi.persistence.location_handler');
        $location = $locationHandler->load($locationId);
        $q = $this->connection->createQueryBuilder()
            ->select($count ? 'count(DISTINCT c.id)' : 'DISTINCT c.id')
            ->from('ezcontentobject', 'c')
            ->innerJoin('c', 'ezcontentobject_tree', 't', 't.contentobject_id = c.id')
            ->where('c.status = :status')
            ->andWhere('t.path_string LIKE :path')
            ->setParameter('status', ContentInfo::STATUS_PUBLISHED, PDO::PARAM_INT)
            ->setParameter('path', $location->pathString . '%', PDO::PARAM_STR);

        return $q->execute();
    }

    /**
     * @param bool $count Select the row count instead of the content ids.
     *
     * @return \Doctrine\DBAL\Driver\Statement
     */
    private function getStatementContentAll($count = false)
    {
        $q = $this->connection->createQueryBuilder()
            ->select($count ? 'count(c.id)' : 'c.id')
            ->from('ezcontentobject', 'c')
            ->where('c.status = :status')
            ->setParameter('status', ContentInfo::STATUS_PUBLISHED, PDO::PARAM_INT);

        return $q->execute();
    }

    /**
     * Split the content ids returned by $stmt into batches of at most $iterationCount ids.
     *
     * Only non-empty batches are yielded. An empty batch would otherwise be handed to a child
     * process as "--content-ids=", which that child treats as "reindex everything, with purge".
     *
     * @param \Doctrine\DBAL\Driver\Statement $stmt
     * @param int $iterationCount
     *
     * @return \Generator Yields arrays of content ids, each holding at most $iterationCount ids.
     */
    private function fetchIteration(Statement $stmt, $iterationCount)
    {
        $iterationCount = (int) $iterationCount;
        do {
            $contentIds = [];
            while (count($contentIds) < $iterationCount && ($contentId = $stmt->fetch(PDO::FETCH_COLUMN))) {
                $contentIds[] = $contentId;
            }

            if (!empty($contentIds)) {
                yield $contentIds;
            }
            // A full batch means there may be more rows; a short (or empty) one means we are done.
        } while (count($contentIds) === $iterationCount);
    }

    /**
     * Build (but do not start) a child process indexing the given content ids.
     *
     * @param array $contentIds Must not be empty.
     * @param bool $commit
     *
     * @return \Symfony\Component\Process\Process
     *
     * @throws \RuntimeException If $contentIds is empty, or the php binary cannot be found.
     */
    private function getPhpProcess(array $contentIds, $commit)
    {
        if (empty($contentIds)) {
            // An empty --content-ids would make the child fall back to a full, purging reindex.
            throw new RuntimeException('Can not start a child indexing process without any content ids');
        }

        $process = new ProcessBuilder([
            file_exists('bin/console') ? 'bin/console' : 'app/console',
            $this->getName(),
            '--content-ids=' . implode(',', $contentIds),
            // Children must run in the same kernel environment (and thus configuration) as the parent.
            '--env=' . $this->getContainer()->getParameter('kernel.environment'),
        ]);
        $process->setTimeout(null);
        $process->setPrefix($this->getPhpPath());

        if (!$commit) {
            $process->add('--no-commit');
        }

        return $process->getProcess();
    }

    /**
     * Locate (once) the php binary used for child processes.
     *
     * @return string
     *
     * @throws \RuntimeException If no php executable can be found.
     */
    private function getPhpPath()
    {
        if ($this->phpPath) {
            return $this->phpPath;
        }

        $phpFinder = new PhpExecutableFinder();
        $phpPath = $phpFinder->find();
        // find() returns false when nothing is found; only cache a usable path.
        if ($phpPath === false || $phpPath === '') {
            throw new RuntimeException(
                'The php executable could not be found, it\'s needed for executing parallel sub processes, so add it to your PATH environment variable and try again'
            );
        }

        $this->phpPath = $phpPath;

        return $this->phpPath;
    }

    /**
     * Best-effort detection of the number of CPU cores; falls back to 1.
     *
     * @return int Always >= 1.
     */
    private function getNumberOfCPUCores()
    {
        $cores = 1;
        if (is_file('/proc/cpuinfo')) {
            // Linux (and potentially Windows with linux sub systems)
            $cpuinfo = file_get_contents('/proc/cpuinfo');
            if ($cpuinfo !== false) {
                preg_match_all('/^processor/m', $cpuinfo, $matches);
                $cores = count($matches[0]);
            }
        } elseif (DIRECTORY_SEPARATOR === '\\') {
            // Windows: first line of output is the "NumberOfCores" header, second is the value.
            if (($process = @popen('wmic cpu get NumberOfCores', 'rb')) !== false) {
                fgets($process);
                $cores = (int) fgets($process);
                pclose($process);
            }
        } elseif (($process = @popen('sysctl -a', 'rb')) !== false) {
            // *nix (Linux, BSD and Mac)
            $output = stream_get_contents($process);
            if (preg_match('/hw\.ncpu: (\d+)/', $output, $matches)) {
                // $matches[1] is the whole captured number; indexing [0] would keep only its first digit.
                $cores = (int) $matches[1];
            }
            pclose($process);
        }

        // Guard against 0 from unexpected /proc/cpuinfo or wmic output.
        return max(1, $cores);
    }
}
435