Issues (16)

src/Pipelines/Stages/ReIndexStage.php (2 issues)

1
<?php
2
3
namespace Nord\Lumen\Elasticsearch\Pipelines\Stages;
4
5
use Elasticsearch\Common\Exceptions\ServerErrorResponseException;
6
use League\Pipeline\StageInterface;
7
use Nord\Lumen\Elasticsearch\Contracts\ElasticsearchServiceContract;
8
use Nord\Lumen\Elasticsearch\Pipelines\Payloads\ApplyMigrationPayload;
9
use Symfony\Component\Console\Helper\ProgressBar;
10
use Symfony\Component\Console\Output\ConsoleOutput;
11
12
/**
 * Pipeline stage that re-indexes the documents of the current index into the target version index.
 *
 * The re-indexing is started as an asynchronous Elasticsearch task, and a console progress bar is
 * rendered while the stage polls that task until it reports completion.
 *
 * @package Nord\Lumen\Elasticsearch\Pipelines\Stages
 */
class ReIndexStage implements StageInterface
{

    /**
     * @var ElasticsearchServiceContract
     */
    private $elasticsearchService;

    /**
     * ReIndexStage constructor.
     *
     * @param ElasticsearchServiceContract $elasticsearchService
     */
    public function __construct(ElasticsearchServiceContract $elasticsearchService)
    {
        $this->elasticsearchService = $elasticsearchService;
    }

    /**
     * Re-indexes data from the old index into the new one and returns the payload unchanged.
     *
     * Nothing is done when the old index does not exist (which is the case on brand new setups).
     *
     * @inheritdoc
     */
    public function __invoke($payload)
    {
        /** @var ApplyMigrationPayload $payload */
        $oldIndex = $payload->getPrefixedIndexName();
        $newIndex = $payload->getPrefixedTargetVersionName();

        if ($this->elasticsearchService->indices()->exists(['index' => $oldIndex])) {
            // Temporarily change some index settings to speed up the process.
            // NOTE(review): these settings are not restored in this stage - presumably a later
            // pipeline stage re-applies the real settings; confirm.
            $this->elasticsearchService->indices()->putSettings([
                'index' => $newIndex,
                'body'  => [
                    'refresh_interval'   => -1,
                    'number_of_replicas' => 0,
                ],
            ]);

            // Start the re-indexing as a background task so that its progress can be polled
            $task = $this->elasticsearchService->reindex([
                'wait_for_completion' => false,
                'body'                => [
                    'source' => [
                        'index' => $oldIndex,
                        'size'  => $payload->getBatchSize(),
                    ],
                    'dest'   => [
                        'index' => $newIndex,
                    ],
                ],
            ]);

            // Use a progress bar to indicate how far along the re-indexing has come
            $this->renderProgressBar($task);
        }

        return $payload;
    }

    /**
     * Renders a progress bar until the specified re-indexing task is completed
     *
     * The task is polled once per second. Polls that fail with a ServerErrorResponseException are
     * ignored and simply retried on the next iteration.
     *
     * @param array $task the response of the re-index request, must contain the task ID under "task"
     */
    protected function renderProgressBar(array $task): void
    {
        $progressBar = null;

        do {
            // Assume the task is still running. This default is what the loop condition sees when
            // the request below throws, so the assignment must be repeated on every iteration.
            $response = ['completed' => false];

            try {
                $response = $this->elasticsearchService->tasks()->get([
                    'task_id' => $task['task'],
                ]);

                // Guard against responses that do not (yet) carry any status information
                $status  = $response['task']['status'] ?? [];
                $total   = (int)($status['total'] ?? 0);
                $created = (int)($status['created'] ?? 0);

                // Initialize the progress bar once Elasticsearch knows the total amount of items
                if ($progressBar === null && $total > 0) {
                    $progressBar = new ProgressBar(new ConsoleOutput(), $total);
                }

                // Update the bar right away, including on the iteration that created it, so that
                // tasks finishing between two polls still display their actual progress
                if ($progressBar !== null) {
                    $progressBar->setProgress($created);
                }
            } catch (ServerErrorResponseException $e) {
                // Deliberately ignored: re-indexing can make these requests time out. The task
                // keeps running on the server, so we just poll again on the next iteration.
            }

            $completed = (bool)($response['completed'] ?? false);

            // Only wait when another poll is actually needed
            if (!$completed) {
                sleep(1);
            }
        } while (!$completed);

        // For very short migrations we may never get a progress bar, because the task finishes too quickly
        if ($progressBar !== null) {
            $progressBar->finish();
        }

        echo PHP_EOL;
    }
}
116