ElasticsearchMigration::getMigrationStatus()   A
last analyzed

Complexity

Conditions 3
Paths 2

Size

Total Lines 32
Code Lines 21

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 21
CRAP Score 3

Importance

Changes 0
Metric Value
eloc 21
dl 0
loc 32
ccs 21
cts 21
cp 1
rs 9.584
c 0
b 0
f 0
cc 3
nc 2
nop 1
crap 3
1
<?php
2
namespace Triadev\EsMigration;
3
4
use Illuminate\Support\Carbon;
5
use Triadev\EsMigration\Business\Repository\ElasticsearchClients;
6
use Triadev\EsMigration\Business\Service\MigrationSteps;
7
use Triadev\EsMigration\Contract\ElasticsearchMigrationContract;
8
use Triadev\EsMigration\Exception\MigrationAlreadyDone;
9
use Triadev\EsMigration\Business\Mapper\MigrationTypes;
10
use Triadev\EsMigration\Business\Mapper\MigrationStatus;
11
use Triadev\EsMigration\Exception\MigrationAlreadyRunning;
12
use Triadev\EsMigration\Exception\MigrationStepNotFound;
13
use Triadev\EsMigration\Models\Entity\ElasticsearchMigration as ElasticsearchMigrationEntity;
14
use Triadev\EsMigration\Models\Entity\ElasticsearchMigrationStep;
15
use Triadev\EsMigration\Contract\Repository\ElasticsearchMigrationContract as EsMigrationRepositoryInterface;
16
use Triadev\EsMigration\Contract\Repository\ElasticsearchMigrationStepContract as EsMigrationStepRepositoryInterface;
17
use Triadev\EsMigration\Models\Migration;
18
use Triadev\EsMigration\Models\MigrationStep;
19
20
class ElasticsearchMigration implements ElasticsearchMigrationContract
21
{
22
    /** @var EsMigrationRepositoryInterface */
23
    private $migrationRepository;
24
    
25
    /** @var EsMigrationStepRepositoryInterface */
26
    private $migrationStepRepository;
27
    
28
    /** @var MigrationSteps */
29
    private $migrationStepService;
30
    
31
    /**
32
     * ElasticsearchMigration constructor.
33
     */
34 63
    public function __construct()
35
    {
36 63
        $this->migrationRepository = app(EsMigrationRepositoryInterface::class);
37 63
        $this->migrationStepRepository = app(EsMigrationStepRepositoryInterface::class);
38 63
        $this->migrationStepService = app(MigrationSteps::class);
39 63
    }
40
    
41
    /**
42
     * @inheritdoc
43
     */
44 17
    public function createMigration(string $migration): bool
45
    {
46
        try {
47 17
            $this->migrationRepository->createOrUpdate($migration);
48
        } catch (\Throwable $e) {
49
            return false;
50
        }
51
        
52 17
        return true;
53
    }
54
    
55
    /**
56
     * @inheritdoc
57
     */
58 1
    public function deleteMigration(string $migration) : bool
59
    {
60
        try {
61 1
            $this->migrationRepository->delete($migration);
62
        } catch (\Throwable $e) {
63
            return false;
64
        }
65
        
66 1
        return true;
67
    }
68
    
69
    /**
70
     * @inheritdoc
71
     */
72 16
    public function addMigrationStep(
73
        string $migration,
74
        string $type,
75
        array $params = [],
76
        int $priority = 1,
77
        bool $stopOnFailure = true
78
    ) : bool {
79 16
        if (!(new MigrationTypes())->isMigrationTypeValid($type)) {
80 1
            return false;
81
        }
82
        
83
        try {
84 15
            if ($migration = $this->migrationRepository->find($migration)) {
85 14
                $this->migrationStepRepository->create(
86 14
                    $migration->id,
87 14
                    $type,
88 14
                    $params,
89 14
                    $priority,
90 14
                    $stopOnFailure
91
                );
92
                
93 15
                return true;
94
            }
95
        } catch (\Throwable $e) {
96
            return false;
97
        }
98
        
99 1
        return false;
100
    }
101
    
102
    /**
103
     * @inheritdoc
104
     */
105 1
    public function deleteMigrationStep(int $migrationStepId) : bool
106
    {
107
        try {
108 1
            $this->migrationStepRepository->delete($migrationStepId);
109
        } catch (\Throwable $e) {
110
            return false;
111
        }
112
        
113 1
        return true;
114
    }
115
    
116
    /**
117
     * @inheritdoc
118
     */
119 1
    public function startSingleMigrationStep(int $migrationStepId, ElasticsearchClients $elasticsearchClients)
120
    {
121 1
        if ($migrationStep = $this->migrationStepService->getMigrationStep($migrationStepId)) {
122
            $this->startMigrationStep($migrationStep, $elasticsearchClients);
123
        } else {
124 1
            throw new MigrationStepNotFound();
125
        }
126
    }
127
    
128
    /**
129
     * @inheritdoc
130
     */
131 4
    public function getMigrationStatus(string $migration) : array
132
    {
133 4
        $migrationSteps = [];
134
        
135 4
        $status = null;
136 4
        $error = null;
137
    
138 4
        if ($migrationEntity = $this->migrationRepository->find($migration)) {
139 4
            $status = $migrationEntity->status;
140 4
            $error = $migrationEntity->error;
141
            
142 4
            foreach ($migrationEntity->migrationSteps()->cursor() as $migrationStep) {
143
                /** @var ElasticsearchMigrationStep $migrationStep */
144 4
                $migrationStepData = array_except($migrationStep->toArray(), ['migration_id']);
145
                
146 4
                $migrationStepData['id'] = (int) $migrationStepData['id'];
147 4
                $migrationStepData['status'] = (int) $migrationStepData['status'];
148 4
                $migrationStepData['params'] = json_decode($migrationStepData['params'], true);
149 4
                $migrationStepData['priority'] = (int) $migrationStepData['priority'];
150 4
                $migrationStepData['stop_on_failure'] = (bool) $migrationStepData['stop_on_failure'];
151 4
                $migrationStepData['created_at'] = new \DateTime($migrationStepData['created_at']);
152 4
                $migrationStepData['updated_at'] = new \DateTime($migrationStepData['updated_at']);
153
                
154 4
                $migrationSteps[] = $migrationStepData;
155
            }
156
        }
157
    
158
        return [
159 4
            'migration' => $migration,
160 4
            'status' => (int) $status,
161 4
            'error' => $error,
162 4
            'steps' => $migrationSteps
163
        ];
164
    }
165
    
166
    /**
167
     * @inheritdoc
168
     */
169 6
    public function startMigration(string $migration, ElasticsearchClients $elasticsearchClients)
170
    {
171 6
        $this->checkIfMigrationAlreadyDone($migration);
172 6
        $this->checkIfMigrationAlreadyRunning($migration);
173
    
174 5
        $this->migrate($migration, $elasticsearchClients);
175 5
    }
176
    
177
    /**
178
     * @inheritdoc
179
     */
180 2
    public function restartMigration(string $migration, ElasticsearchClients $elasticsearchClients)
181
    {
182 2
        $this->checkIfMigrationAlreadyRunning($migration);
183
        
184 1
        $this->migrate($migration, $elasticsearchClients);
185 1
    }
186
    
187
    /**
188
     * @param string $migration
189
     * @param ElasticsearchClients $elasticsearchClients
190
     * @throws \Throwable
191
     */
192 6
    private function migrate(string $migration, ElasticsearchClients $elasticsearchClients)
193
    {
194
        try {
195 6
            $migrationSteps = $this->migrationStepService->getMigrationSteps($migration, true);
196
        
197 6
            if (!empty($migrationSteps)) {
198 6
                $this->migrationRepository->createOrUpdate($migration, MigrationStatus::MIGRATION_STATUS_RUNNING);
199
            
200 6
                foreach ($migrationSteps as $migrationStep) {
201 6
                    $this->startMigrationStep($migrationStep, $elasticsearchClients);
202
                }
203
            
204 4
                $this->migrationRepository->createOrUpdate($migration, MigrationStatus::MIGRATION_STATUS_DONE);
205
            }
206 2
        } catch (\Exception $e) {
207 2
            $this->migrationRepository->createOrUpdate(
208 2
                $migration,
209 2
                MigrationStatus::MIGRATION_STATUS_ERROR,
210 2
                $e->getMessage()
211
            );
212
        }
213 6
    }
214
    
215
    /**
216
     * @param string $migration
217
     * @throws MigrationAlreadyDone
218
     */
219 6
    private function checkIfMigrationAlreadyDone(string $migration)
220
    {
221 6
        $migrationEntity = $this->migrationRepository->find($migration);
222
        
223 6
        if ($migrationEntity instanceof ElasticsearchMigrationEntity &&
224 6
            $migrationEntity->status == MigrationStatus::MIGRATION_STATUS_DONE) {
225 1
            throw new MigrationAlreadyDone();
226
        }
227 6
    }
228
    
229
    /**
230
     * @param string $migration
231
     * @throws MigrationAlreadyRunning
232
     */
233 8
    private function checkIfMigrationAlreadyRunning(string $migration)
234
    {
235 8
        $migrationEntity = $this->migrationRepository->find($migration);
236
        
237 8
        if ($migrationEntity instanceof ElasticsearchMigrationEntity &&
238 8
            $migrationEntity->status == MigrationStatus::MIGRATION_STATUS_RUNNING) {
239 2
            throw new MigrationAlreadyRunning();
240
        }
241 6
    }
242
    
243
    /**
244
     * @param MigrationStep $migrationStep
245
     * @param ElasticsearchClients $elasticsearchClients
246
     * @throws Exception\MigrationsNotExist
247
     * @throws \Throwable
248
     */
249 6
    private function startMigrationStep(
250
        MigrationStep $migrationStep,
251
        ElasticsearchClients $elasticsearchClients
252
    ) {
253
        try {
254 6
            $this->migrationStepRepository->update($migrationStep->getId(), MigrationStatus::MIGRATION_STATUS_RUNNING);
255
            
256 6
            foreach ($elasticsearchClients->all() as $elasticsearchClient) {
257 6
                if ($migrationClass = (new MigrationTypes())->mapTypeToClass($migrationStep->getType())) {
258 6
                    $migrationClass->migrate($elasticsearchClient, $migrationStep);
259
                }
260
            }
261
    
262 4
            $this->migrationStepRepository->update($migrationStep->getId(), MigrationStatus::MIGRATION_STATUS_DONE);
263 3
        } catch (\Throwable $e) {
264 3
            $this->migrationStepRepository->update(
265 3
                $migrationStep->getId(),
266 3
                MigrationStatus::MIGRATION_STATUS_ERROR,
267 3
                $e->getMessage()
268
            );
269
            
270 3
            if ($migrationStep->isStopOnFailure()) {
271 2
                throw $e;
272
            }
273
        }
274 4
    }
275
    
276
    /**
277
     * @inheritdoc
278
     */
279 1
    public function getAllMigrations(?array $onlyWithStatus = null): array
280
    {
281 1
        $migrations = [];
282
        
283
        $this->migrationRepository->all()->each(function ($migrationEntity) use (&$migrations, $onlyWithStatus) {
284
            /** @var \Triadev\EsMigration\Models\Entity\ElasticsearchMigration $migrationEntity */
285 1
            if (is_array($onlyWithStatus) && !in_array($migrationEntity->status, $onlyWithStatus)) {
286 1
                return;
287
            }
288
            
289 1
            $migrations[] = new Migration(
290 1
                $migrationEntity->id,
291 1
                $migrationEntity->migration,
292 1
                $migrationEntity->status,
293 1
                $migrationEntity->error,
294 1
                Carbon::parse($migrationEntity->created_at),
295 1
                Carbon::parse($migrationEntity->updated_at)
296
            );
297 1
        });
298
        
299 1
        return $migrations;
300
    }
301
}
302