Passed
Push — master ( 366f40...c67819 )
by Christian
13:04 queued 10s
created

src/Core/Content/ImportExport/ImportExport.php (6 issues)

1
<?php declare(strict_types=1);
2
3
namespace Shopware\Core\Content\ImportExport;
4
5
use League\Flysystem\FilesystemInterface;
6
use Shopware\Core\Content\ImportExport\Aggregate\ImportExportLog\ImportExportLogEntity;
7
use Shopware\Core\Content\ImportExport\Event\EnrichExportCriteriaEvent;
8
use Shopware\Core\Content\ImportExport\Event\ImportExportAfterImportRecordEvent;
9
use Shopware\Core\Content\ImportExport\Event\ImportExportBeforeExportRecordEvent;
10
use Shopware\Core\Content\ImportExport\Event\ImportExportBeforeImportRecordEvent;
11
use Shopware\Core\Content\ImportExport\Event\ImportExportExceptionImportRecordEvent;
12
use Shopware\Core\Content\ImportExport\Exception\ProcessingException;
13
use Shopware\Core\Content\ImportExport\Processing\Mapping\CriteriaBuilder;
14
use Shopware\Core\Content\ImportExport\Processing\Pipe\AbstractPipe;
15
use Shopware\Core\Content\ImportExport\Processing\Reader\AbstractReader;
16
use Shopware\Core\Content\ImportExport\Processing\Writer\AbstractWriter;
17
use Shopware\Core\Content\ImportExport\Service\ImportExportService;
18
use Shopware\Core\Content\ImportExport\Struct\Config;
19
use Shopware\Core\Content\ImportExport\Struct\Progress;
20
use Shopware\Core\Framework\Context;
21
use Shopware\Core\Framework\DataAbstractionLayer\Entity;
22
use Shopware\Core\Framework\DataAbstractionLayer\EntityRepositoryInterface;
23
use Shopware\Core\Framework\DataAbstractionLayer\Field\IdField;
24
use Shopware\Core\Framework\DataAbstractionLayer\Search\Criteria;
25
use Shopware\Core\Framework\DataAbstractionLayer\Search\Sorting\FieldSorting;
26
use Shopware\Core\Framework\Uuid\Uuid;
27
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;
28
29
class ImportExport
30
{
31
    /**
     * Separator placed between a target file path and the offset of a part file.
     */
    private const PART_FILE_SUFFIX = '.offset_';

    /**
     * Service used to load and persist progress and to prepare/update export files.
     *
     * @var ImportExportService
     */
    private $importExportService;

    /**
     * Log entry describing the import/export this instance works on.
     *
     * @var ImportExportLogEntity
     */
    private $logEntity;

    /**
     * Filesystem holding the import file, the export file and its part files.
     *
     * @var FilesystemInterface
     */
    private $filesystem;

    /**
     * Dispatcher for the per-record and criteria events.
     *
     * @var EventDispatcherInterface
     */
    private $eventDispatcher;

    /**
     * Repository records are upserted into (import) or searched from (export).
     *
     * @var EntityRepositoryInterface
     */
    private $repository;

    /**
     * Maps between file rows and entity records.
     *
     * @var AbstractPipe
     */
    private $pipe;

    /**
     * Reads rows from the import file stream.
     *
     * @var AbstractReader
     */
    private $reader;

    /**
     * Writes exported records into part files.
     *
     * @var AbstractWriter
     */
    private $writer;

    /**
     * Maximum number of rows handled per import() call; values <= 0 disable the cap.
     *
     * @var int
     */
    private $importLimit;

    /**
     * Page size per export() call; values <= 0 export everything in pages of 250.
     *
     * @var int
     */
    private $exportLimit;

    /**
     * Total number of entities to export, cached from the first search result.
     *
     * @var int|null
     */
    private $total;
87
88
    /**
     * Wires the collaborators and chunk limits for a single import/export log entry.
     *
     * @param int $importLimit rows per import() call, <= 0 for no cap
     * @param int $exportLimit entities per export() call, <= 0 to export everything at once
     */
    public function __construct(
        ImportExportService $importExportService,
        ImportExportLogEntity $logEntity,
        FilesystemInterface $filesystem,
        EventDispatcherInterface $eventDispatcher,
        EntityRepositoryInterface $repository,
        AbstractPipe $pipe,
        AbstractReader $reader,
        AbstractWriter $writer,
        int $importLimit = 250,
        int $exportLimit = 250
    ) {
        // Assignments follow the parameter order.
        $this->importExportService = $importExportService;
        $this->logEntity = $logEntity;
        $this->filesystem = $filesystem;
        $this->eventDispatcher = $eventDispatcher;
        $this->repository = $repository;
        $this->pipe = $pipe;
        $this->reader = $reader;
        $this->writer = $writer;
        $this->importLimit = $importLimit;
        $this->exportLimit = $exportLimit;
    }
111
112
    /**
     * Imports the next chunk of the log entity's file, starting at the given reader offset.
     *
     * Every row is piped into a record, completed with generated primary keys, handed to the
     * before-import event and upserted. Records whose import throws are collected (unless the
     * exception event clears the exception) and written to a separate invalid-records export.
     * At most `importLimit` rows are handled per call; a limit <= 0 disables the cap.
     *
     * Once the reader offset equals the file size, the invalid-records part files are merged
     * and the progress is marked as succeeded, or as failed if any invalid records exist.
     *
     * @param Context $context context used for the upsert and passed to the record events
     * @param int     $offset  reader offset to resume from
     *
     * @throws ProcessingException if the size of the import file cannot be determined or the
     *                             file cannot be opened for reading
     */
    public function import(Context $context, int $offset = 0): Progress
    {
        $progress = $this->importExportService->getProgress($this->logEntity->getId(), $offset);
        $progress->setTotal($this->logEntity->getFile()->getSize());

        if ($progress->isFinished()) {
            return $progress;
        }

        $path = $this->logEntity->getFile()->getPath();

        // getSize() reports failure by returning false; do not pass that on as a total.
        $fileSize = $this->filesystem->getSize($path);
        if ($fileSize === false) {
            throw new ProcessingException(sprintf('Failed to determine the size of import file "%s"', $path));
        }
        $progress->setTotal($fileSize);

        $config = Config::fromLog($this->logEntity);

        // readStream() reports failure by returning false; the reader needs a real resource.
        $resource = $this->filesystem->readStream($path);
        if ($resource === false) {
            throw new ProcessingException(sprintf('Failed to open import file "%s" for reading', $path));
        }

        $processed = 0;
        $failedRecords = [];
        $invalidRecordsProgress = null;

        try {
            foreach ($this->reader->read($config, $resource, $offset) as $row) {
                $record = [];
                foreach ($this->pipe->out($config, $row) as $key => $value) {
                    $record[$key] = $value;
                }

                if ($record === []) {
                    continue;
                }

                try {
                    $record = $this->ensurePrimaryKeys($record);

                    $event = new ImportExportBeforeImportRecordEvent($record, $row, $config, $context);
                    $this->eventDispatcher->dispatch($event);

                    // listeners may have modified the record
                    $record = $event->getRecord();

                    $result = $this->repository->upsert([$record], $context);
                    $progress->addProcessedRecords(1);

                    $afterRecord = new ImportExportAfterImportRecordEvent($result, $record, $row, $config, $context);
                    $this->eventDispatcher->dispatch($afterRecord);
                } catch (\Throwable $exception) {
                    $event = new ImportExportExceptionImportRecordEvent($exception, $record, $row, $config, $context);
                    $this->eventDispatcher->dispatch($event);

                    // the record only counts as failed if the event still carries an exception
                    $exception = $event->getException();

                    if ($exception) {
                        $record['_error'] = mb_convert_encoding($exception->getMessage(), 'UTF-8', 'UTF-8');
                        $failedRecords[] = $record;
                    }
                }

                $this->importExportService->saveProgress($progress);

                if ($this->importLimit > 0 && ++$processed >= $this->importLimit) {
                    break;
                }
            }

            // read the offset while the stream is still open, then reuse it below
            $readOffset = $this->reader->getOffset();
        } finally {
            // the stream was opened here, so it is released here as well
            if (\is_resource($resource)) {
                fclose($resource);
            }
        }

        $progress->setOffset($readOffset);

        if ($failedRecords !== []) {
            $invalidRecordsProgress = $this->exportInvalid($context, $failedRecords);
            $progress->setInvalidRecordsLogId($invalidRecordsProgress->getLogId());
        }

        // importing the file is complete
        if ($readOffset === $fileSize) {
            $invalidLog = $this->logEntity->getInvalidRecordsLog();

            if ($invalidLog !== null) {
                $invalidRecordsProgress = $invalidRecordsProgress
                    ?? $this->importExportService->getProgress($invalidLog->getId(), $invalidLog->getRecords());

                // complete invalid records export
                $this->mergePartFiles($invalidLog, $invalidRecordsProgress);

                $invalidRecordsProgress->setState(Progress::STATE_SUCCEEDED);
                $this->importExportService->saveProgress($invalidRecordsProgress);
            }

            $progress->setState($invalidRecordsProgress === null ? Progress::STATE_SUCCEEDED : Progress::STATE_FAILED);
        }

        $this->importExportService->saveProgress($progress);

        return $progress;
    }
200
201
    /**
     * Exports entities of the repository into a part file, starting at the given offset.
     *
     * The given criteria (or an empty one) is cloned, enriched from the config and offered to
     * listeners via EnrichExportCriteriaEvent. If no sorting is set, entities are sorted by
     * `createdAt` ascending. With an export limit <= 0 all pages (250 entities each) are
     * exported in one call; otherwise a single page of `exportLimit` entities is exported.
     *
     * When the progress offset has reached the total, the writer is finished and all part
     * files are merged into the final export file.
     *
     * @param Context       $context  context used for the repository search
     * @param Criteria|null $criteria optional base criteria; it is cloned, never modified
     * @param int           $offset   entity offset to resume from
     */
    public function export(Context $context, ?Criteria $criteria = null, int $offset = 0): Progress
    {
        $progress = $this->importExportService->getProgress($this->logEntity->getId(), $offset);
        if ($progress->isFinished()) {
            return $progress;
        }

        $config = Config::fromLog($this->logEntity);

        $criteria = $criteria === null ? new Criteria() : clone $criteria;
        (new CriteriaBuilder($this->repository->getDefinition()))->enrichCriteria($config, $criteria);

        $this->eventDispatcher->dispatch(new EnrichExportCriteriaEvent($criteria, $this->logEntity));

        if ($criteria->getSorting() === []) {
            // default sorting
            $criteria->addSorting(new FieldSorting('createdAt', FieldSorting::ASCENDING));
        }

        // a non-positive limit means: export everything in this call, page by page
        $exportEverything = $this->exportLimit <= 0;

        $criteria->setOffset($offset);
        $criteria->setTotalCountMode(Criteria::TOTAL_COUNT_MODE_EXACT);
        $criteria->setLimit($exportEverything ? 250 : $this->exportLimit);

        $partFile = $this->getPartFilePath($this->logEntity->getFile()->getPath(), $offset);

        do {
            $searchResult = $this->repository->search($criteria, $context);

            if ($this->total === null) {
                // the exact total is only needed once; skip counting on later pages
                $this->total = $searchResult->getTotal();
                $criteria->setTotalCountMode(Criteria::TOTAL_COUNT_MODE_NONE);
            }

            $entities = $searchResult->getEntities();
            if (\count($entities) === 0) {
                // this can happen if entities are deleted while we export
                $progress->setTotal($progress->getOffset());

                break;
            }

            $progress = $this->exportChunk($config, $entities, $progress, $partFile);

            $criteria->setOffset($criteria->getOffset() + $criteria->getLimit());
        } while ($exportEverything && $progress->getOffset() < $progress->getTotal());

        if ($progress->getOffset() < $progress->getTotal()) {
            // more chunks to come; the caller continues from the progress offset
            return $progress;
        }

        $this->writer->finish($config, $partFile);

        return $this->mergePartFiles($this->logEntity, $progress);
    }
259
260
    /**
     * Returns the log entity this import/export instance was created for.
     */
    public function getLogEntity(): ImportExportLogEntity
    {
        return $this->logEntity;
    }
264
265
    /**
     * Builds the path of the part file that holds the chunk starting at the given offset:
     * the target path, followed by the part file suffix and the offset.
     */
    private function getPartFilePath(string $targetPath, int $offset): string
    {
        return sprintf('%s%s%d', $targetPath, self::PART_FILE_SUFFIX, $offset);
    }
269
270
    /**
     * flysystem does not support appending to existing files. Therefore we need to export multiple files and merge them
     * into the complete export file at the end.
     *
     * All part files of the log entity's target file are concatenated in offset order into a
     * local temporary file, which is then written to the target path. Part files are only
     * deleted after the merged file has been written successfully. Afterwards the progress is
     * marked as succeeded and the stored file size is updated.
     *
     * @param ImportExportLogEntity $logEntity log whose file is the merge target
     * @param Progress              $progress  progress that is saved as merging, then as succeeded
     *
     * @throws ProcessingException if the temporary file cannot be created, a part file cannot be
     *                             read or copied, or the merged file cannot be written
     */
    private function mergePartFiles(ImportExportLogEntity $logEntity, Progress $progress): Progress
    {
        $progress->setState(Progress::STATE_MERGING_FILES);
        $this->importExportService->saveProgress($progress);

        $target = $logEntity->getFile()->getPath();
        $partFilePrefix = $target . self::PART_FILE_SUFFIX;

        // collect all files in the target directory that are named like part files of the target
        $partFiles = [];
        foreach ($this->filesystem->listContents(\dirname($target)) as $meta) {
            if ($meta['type'] !== 'file'
                || $meta['path'] === $target
                || strpos($meta['path'], $partFilePrefix) !== 0) {
                continue;
            }

            $partFiles[] = $meta['path'];
        }

        // sort by offset
        natsort($partFiles);

        $tmpFile = tempnam(sys_get_temp_dir(), '');
        if ($tmpFile === false) {
            throw new ProcessingException('Failed to create temporary file for merging');
        }

        $tmp = fopen($tmpFile, 'w+b');
        if ($tmp === false) {
            unlink($tmpFile);

            throw new ProcessingException('Failed to open temporary file for merging');
        }

        try {
            // concatenate all part files into a temporary file
            foreach ($partFiles as $partFile) {
                $partStream = $this->filesystem->readStream($partFile);
                if ($partStream === false) {
                    throw new ProcessingException('Failed to merge files');
                }

                try {
                    $copied = stream_copy_to_stream($partStream, $tmp);
                } finally {
                    if (\is_resource($partStream)) {
                        fclose($partStream);
                    }
                }

                if ($copied === false) {
                    throw new ProcessingException('Failed to merge files');
                }
            }

            // copy final file into filesystem; a failed write must not lead to deleting the parts
            if ($this->filesystem->putStream($target, $tmp) === false) {
                throw new ProcessingException('Failed to write merged file');
            }
        } finally {
            // the filesystem may already have closed the stream, hence the check
            if (\is_resource($tmp)) {
                fclose($tmp);
            }
            unlink($tmpFile);
        }

        foreach ($partFiles as $partFile) {
            $this->filesystem->delete($partFile);
        }

        $progress->setState(Progress::STATE_SUCCEEDED);
        $this->importExportService->saveProgress($progress);

        $this->importExportService->updateFile(
            Context::createDefaultContext(),
            $logEntity->getFileId(),
            ['size' => $this->filesystem->getSize($target)]
        );

        return $progress;
    }
333
334
    /**
     * Writes one chunk of records into the given part file and updates the progress.
     *
     * Entities are serialized to arrays first. Each record is mapped through the pipe, offered
     * to listeners via ImportExportBeforeExportRecordEvent and appended to the writer. Records
     * that map to an empty array are skipped but still advance the offset.
     *
     * @param iterable $records    entities or plain arrays to export
     * @param string   $targetFile part file the writer flushes into
     */
    private function exportChunk(Config $config, iterable $records, Progress $progress, string $targetFile): Progress
    {
        $position = $progress->getOffset();
        $written = 0;

        /** @var Entity|array $source */
        foreach ($records as $source) {
            if ($source instanceof Entity) {
                $source = $source->jsonSerialize();
            }

            $mapped = [];
            foreach ($this->pipe->in($config, $source) as $field => $fieldValue) {
                $mapped[$field] = $fieldValue;
            }

            if ($mapped === []) {
                // nothing to write, but the record still counts towards the offset
                ++$position;

                continue;
            }

            $beforeExport = new ImportExportBeforeExportRecordEvent($config, $mapped, $source);
            $this->eventDispatcher->dispatch($beforeExport);

            // listeners may have replaced the record
            $this->writer->append($config, $beforeExport->getRecord(), $position);

            ++$written;
            ++$position;
        }

        $this->writer->flush($config, $targetFile);

        $progress->setState(Progress::STATE_PROGRESS);
        $progress->setOffset($position);
        $progress->setTotal($this->total);
        $progress->addProcessedRecords($written);

        $this->importExportService->saveProgress($progress);

        return $progress;
    }
373
374
    /**
     * In case we failed to import some invalid records, we export them as a new csv with the same format and
     * an additional _error column.
     *
     * On the first call an invalid-records export log is prepared and attached to the current
     * log entity; later calls reuse it. The failed records are written to a part file whose
     * offset is the record count stored on the invalid-records log.
     *
     * @param array $failedRecords records that could not be imported, each with an `_error` entry
     *
     * @throws ProcessingException if no invalid-records log is available on the log entity
     */
    private function exportInvalid(Context $context, array $failedRecords): Progress
    {
        // create an invalid records export if it doesn't exist
        if (!$this->logEntity->getInvalidRecordsLogId()) {
            $pathInfo = pathinfo($this->logEntity->getFile()->getOriginalName());

            // pathinfo() omits the 'extension' key for names without a dot
            $extension = isset($pathInfo['extension']) ? '.' . $pathInfo['extension'] : '';
            $newName = $pathInfo['filename'] . '_failed' . $extension;

            $newPath = $this->logEntity->getFile()->getPath() . '_invalid';

            // same mapping as the import, plus the additional _error column
            $rawConfig = $this->logEntity->getConfig();
            $rawConfig['mapping'][] = [
                'key' => '_error',
                'mappedKey' => '_error',
            ];
            $invalidConfig = new Config($rawConfig['mapping'], $rawConfig['parameters'] ?? []);

            $failedImportLogEntity = $this->importExportService->prepareExport(
                $context,
                $this->logEntity->getProfileId(),
                $this->logEntity->getFile()->getExpireDate(),
                $newName,
                $invalidConfig->jsonSerialize(),
                $newPath,
                ImportExportLogEntity::ACTIVITY_INVALID_RECORDS_EXPORT
            );

            $this->logEntity->setInvalidRecordsLog($failedImportLogEntity);
            $this->logEntity->setInvalidRecordsLogId($failedImportLogEntity->getId());
        }

        $failedImportLogEntity = $this->logEntity->getInvalidRecordsLog();
        if ($failedImportLogEntity === null) {
            // an id is set but the log itself is not available on the entity
            throw new ProcessingException('Invalid records log is not available');
        }

        $config = Config::fromLog($failedImportLogEntity);

        $offset = $failedImportLogEntity->getRecords();

        $targetFile = $this->getPartFilePath($failedImportLogEntity->getFile()->getPath(), $offset);

        $progress = $this->importExportService->getProgress($failedImportLogEntity->getId(), $offset);

        return $this->exportChunk($config, $failedRecords, $progress, $targetFile);
    }
426
427
    /**
     * Fills every IdField primary key that is missing (or null) in the record with a random
     * hex UUID. Primary keys that are not IdFields are left untouched.
     *
     * @param array $data record keyed by property name
     *
     * @return array the record including generated primary key values
     */
    private function ensurePrimaryKeys(array $data): array
    {
        foreach ($this->repository->getDefinition()->getPrimaryKeys() as $field) {
            if ($field instanceof IdField) {
                $property = $field->getPropertyName();

                // keep an existing value, generate one otherwise
                $data[$property] = $data[$property] ?? Uuid::randomHex();
            }
        }

        return $data;
    }
441
}
442