1 | <?php declare(strict_types=1); |
||||
2 | |||||
3 | namespace Shopware\Core\Content\ImportExport; |
||||
4 | |||||
5 | use League\Flysystem\FilesystemInterface; |
||||
6 | use Shopware\Core\Content\ImportExport\Aggregate\ImportExportLog\ImportExportLogEntity; |
||||
7 | use Shopware\Core\Content\ImportExport\Event\EnrichExportCriteriaEvent; |
||||
8 | use Shopware\Core\Content\ImportExport\Event\ImportExportAfterImportRecordEvent; |
||||
9 | use Shopware\Core\Content\ImportExport\Event\ImportExportBeforeExportRecordEvent; |
||||
10 | use Shopware\Core\Content\ImportExport\Event\ImportExportBeforeImportRecordEvent; |
||||
11 | use Shopware\Core\Content\ImportExport\Event\ImportExportExceptionImportRecordEvent; |
||||
12 | use Shopware\Core\Content\ImportExport\Exception\ProcessingException; |
||||
13 | use Shopware\Core\Content\ImportExport\Processing\Mapping\CriteriaBuilder; |
||||
14 | use Shopware\Core\Content\ImportExport\Processing\Pipe\AbstractPipe; |
||||
15 | use Shopware\Core\Content\ImportExport\Processing\Reader\AbstractReader; |
||||
16 | use Shopware\Core\Content\ImportExport\Processing\Writer\AbstractWriter; |
||||
17 | use Shopware\Core\Content\ImportExport\Service\ImportExportService; |
||||
18 | use Shopware\Core\Content\ImportExport\Struct\Config; |
||||
19 | use Shopware\Core\Content\ImportExport\Struct\Progress; |
||||
20 | use Shopware\Core\Framework\Context; |
||||
21 | use Shopware\Core\Framework\DataAbstractionLayer\Entity; |
||||
22 | use Shopware\Core\Framework\DataAbstractionLayer\EntityRepositoryInterface; |
||||
23 | use Shopware\Core\Framework\DataAbstractionLayer\Field\IdField; |
||||
24 | use Shopware\Core\Framework\DataAbstractionLayer\Search\Criteria; |
||||
25 | use Shopware\Core\Framework\DataAbstractionLayer\Search\Sorting\FieldSorting; |
||||
26 | use Shopware\Core\Framework\Uuid\Uuid; |
||||
27 | use Symfony\Contracts\EventDispatcher\EventDispatcherInterface; |
||||
28 | |||||
29 | class ImportExport |
||||
30 | { |
||||
31 | private const PART_FILE_SUFFIX = '.offset_'; |
||||
32 | |||||
33 | /** |
||||
34 | * @var EntityRepositoryInterface |
||||
35 | */ |
||||
36 | private $repository; |
||||
37 | |||||
38 | /** |
||||
39 | * @var AbstractPipe |
||||
40 | */ |
||||
41 | private $pipe; |
||||
42 | |||||
43 | /** |
||||
44 | * @var AbstractReader |
||||
45 | */ |
||||
46 | private $reader; |
||||
47 | |||||
48 | /** |
||||
49 | * @var AbstractWriter |
||||
50 | */ |
||||
51 | private $writer; |
||||
52 | |||||
53 | /** |
||||
54 | * @var ImportExportLogEntity |
||||
55 | */ |
||||
56 | private $logEntity; |
||||
57 | |||||
58 | /** |
||||
59 | * @var FilesystemInterface |
||||
60 | */ |
||||
61 | private $filesystem; |
||||
62 | |||||
63 | /** |
||||
64 | * @var int |
||||
65 | */ |
||||
66 | private $importLimit; |
||||
67 | |||||
68 | /** |
||||
69 | * @var int |
||||
70 | */ |
||||
71 | private $exportLimit; |
||||
72 | |||||
73 | /** |
||||
74 | * @var int|null |
||||
75 | */ |
||||
76 | private $total; |
||||
77 | |||||
78 | /** |
||||
79 | * @var ImportExportService |
||||
80 | */ |
||||
81 | private $importExportService; |
||||
82 | |||||
83 | /** |
||||
84 | * @var EventDispatcherInterface |
||||
85 | */ |
||||
86 | private $eventDispatcher; |
||||
87 | |||||
88 | public function __construct( |
||||
89 | ImportExportService $importExportService, |
||||
90 | ImportExportLogEntity $logEntity, |
||||
91 | FilesystemInterface $filesystem, |
||||
92 | EventDispatcherInterface $eventDispatcher, |
||||
93 | EntityRepositoryInterface $repository, |
||||
94 | AbstractPipe $pipe, |
||||
95 | AbstractReader $reader, |
||||
96 | AbstractWriter $writer, |
||||
97 | int $importLimit = 250, |
||||
98 | int $exportLimit = 250 |
||||
99 | ) { |
||||
100 | $this->logEntity = $logEntity; |
||||
101 | $this->filesystem = $filesystem; |
||||
102 | $this->repository = $repository; |
||||
103 | $this->writer = $writer; |
||||
104 | $this->pipe = $pipe; |
||||
105 | $this->reader = $reader; |
||||
106 | $this->importExportService = $importExportService; |
||||
107 | $this->eventDispatcher = $eventDispatcher; |
||||
108 | $this->importLimit = $importLimit; |
||||
109 | $this->exportLimit = $exportLimit; |
||||
110 | } |
||||
111 | |||||
112 | public function import(Context $context, int $offset = 0): Progress |
||||
113 | { |
||||
114 | $progress = $this->importExportService->getProgress($this->logEntity->getId(), $offset); |
||||
115 | $progress->setTotal($this->logEntity->getFile()->getSize()); |
||||
116 | |||||
117 | if ($progress->isFinished()) { |
||||
118 | return $progress; |
||||
119 | } |
||||
120 | |||||
121 | $processed = 0; |
||||
122 | |||||
123 | $path = $this->logEntity->getFile()->getPath(); |
||||
124 | $progress->setTotal($this->filesystem->getSize($path)); |
||||
0 ignored issues
–
show
Bug
introduced
by
Loading history...
|
|||||
125 | $invalidRecordsProgress = null; |
||||
126 | |||||
127 | $failedRecords = []; |
||||
128 | |||||
129 | $resource = $this->filesystem->readStream($path); |
||||
130 | $config = Config::fromLog($this->logEntity); |
||||
131 | |||||
132 | foreach ($this->reader->read($config, $resource, $offset) as $row) { |
||||
0 ignored issues
–
show
It seems like
$resource can also be of type false ; however, parameter $resource of Shopware\Core\Content\Im...\AbstractReader::read() does only seem to accept resource , maybe add an additional type check?
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
Loading history...
|
|||||
133 | $record = []; |
||||
134 | |||||
135 | foreach ($this->pipe->out($config, $row) as $key => $value) { |
||||
136 | $record[$key] = $value; |
||||
137 | } |
||||
138 | |||||
139 | if (empty($record)) { |
||||
140 | continue; |
||||
141 | } |
||||
142 | |||||
143 | try { |
||||
144 | $record = $this->ensurePrimaryKeys($record); |
||||
145 | |||||
146 | $event = new ImportExportBeforeImportRecordEvent($record, $row, $config, $context); |
||||
147 | $this->eventDispatcher->dispatch($event); |
||||
148 | |||||
149 | $record = $event->getRecord(); |
||||
150 | |||||
151 | $result = $this->repository->upsert([$record], $context); |
||||
152 | $progress->addProcessedRecords(1); |
||||
153 | |||||
154 | $afterRecord = new ImportExportAfterImportRecordEvent($result, $record, $row, $config, $context); |
||||
155 | $this->eventDispatcher->dispatch($afterRecord); |
||||
156 | } catch (\Throwable $exception) { |
||||
157 | $event = new ImportExportExceptionImportRecordEvent($exception, $record, $row, $config, $context); |
||||
158 | $this->eventDispatcher->dispatch($event); |
||||
159 | |||||
160 | $exception = $event->getException(); |
||||
161 | |||||
162 | if ($exception) { |
||||
163 | $record['_error'] = mb_convert_encoding($exception->getMessage(), 'UTF-8', 'UTF-8'); |
||||
164 | $failedRecords[] = $record; |
||||
165 | } |
||||
166 | } |
||||
167 | $this->importExportService->saveProgress($progress); |
||||
168 | |||||
169 | if ($this->importLimit > 0 && ++$processed >= $this->importLimit) { |
||||
170 | break; |
||||
171 | } |
||||
172 | } |
||||
173 | $progress->setOffset($this->reader->getOffset()); |
||||
174 | |||||
175 | if (!empty($failedRecords)) { |
||||
176 | $invalidRecordsProgress = $this->exportInvalid($context, $failedRecords); |
||||
177 | $progress->setInvalidRecordsLogId($invalidRecordsProgress->getLogId()); |
||||
178 | } |
||||
179 | |||||
180 | // importing the file is complete |
||||
181 | if ($this->reader->getOffset() === $this->filesystem->getSize($path)) { |
||||
182 | if ($this->logEntity->getInvalidRecordsLog() !== null) { |
||||
183 | $invalidLog = $this->logEntity->getInvalidRecordsLog(); |
||||
184 | $invalidRecordsProgress = $invalidRecordsProgress |
||||
185 | ?? $this->importExportService->getProgress($invalidLog->getId(), $invalidLog->getRecords()); |
||||
186 | |||||
187 | // complete invalid records export |
||||
188 | $this->mergePartFiles($this->logEntity->getInvalidRecordsLog(), $invalidRecordsProgress); |
||||
189 | |||||
190 | $invalidRecordsProgress->setState(Progress::STATE_SUCCEEDED); |
||||
191 | $this->importExportService->saveProgress($invalidRecordsProgress); |
||||
192 | } |
||||
193 | |||||
194 | $progress->setState($invalidRecordsProgress === null ? Progress::STATE_SUCCEEDED : Progress::STATE_FAILED); |
||||
195 | } |
||||
196 | $this->importExportService->saveProgress($progress); |
||||
197 | |||||
198 | return $progress; |
||||
199 | } |
||||
200 | |||||
201 | public function export(Context $context, ?Criteria $criteria = null, int $offset = 0): Progress |
||||
202 | { |
||||
203 | $progress = $this->importExportService->getProgress($this->logEntity->getId(), $offset); |
||||
204 | |||||
205 | if ($progress->isFinished()) { |
||||
206 | return $progress; |
||||
207 | } |
||||
208 | |||||
209 | $config = Config::fromLog($this->logEntity); |
||||
210 | $criteriaBuilder = new CriteriaBuilder($this->repository->getDefinition()); |
||||
211 | |||||
212 | $criteria = $criteria === null ? new Criteria() : clone $criteria; |
||||
213 | $criteriaBuilder->enrichCriteria($config, $criteria); |
||||
214 | |||||
215 | $enrichEvent = new EnrichExportCriteriaEvent($criteria, $this->logEntity); |
||||
216 | $this->eventDispatcher->dispatch($enrichEvent); |
||||
217 | |||||
218 | if ($criteria->getSorting() === []) { |
||||
219 | // default sorting |
||||
220 | $criteria->addSorting(new FieldSorting('createdAt', FieldSorting::ASCENDING)); |
||||
221 | } |
||||
222 | |||||
223 | $criteria->setOffset($offset); |
||||
224 | $criteria->setTotalCountMode(Criteria::TOTAL_COUNT_MODE_EXACT); |
||||
225 | |||||
226 | $criteria->setLimit($this->exportLimit <= 0 ? 250 : $this->exportLimit); |
||||
227 | $fullExport = $this->exportLimit === null || $this->exportLimit <= 0; |
||||
228 | |||||
229 | $targetFile = $this->getPartFilePath($this->logEntity->getFile()->getPath(), $offset); |
||||
230 | |||||
231 | do { |
||||
232 | $result = $this->repository->search($criteria, $context); |
||||
233 | if ($this->total === null) { |
||||
234 | $this->total = $result->getTotal(); |
||||
235 | $criteria->setTotalCountMode(Criteria::TOTAL_COUNT_MODE_NONE); |
||||
236 | } |
||||
237 | |||||
238 | $entities = $result->getEntities(); |
||||
239 | if (\count($entities) === 0) { |
||||
240 | // this can happen if entities are delete while we export |
||||
241 | $progress->setTotal($progress->getOffset()); |
||||
242 | |||||
243 | break; |
||||
244 | } |
||||
245 | |||||
246 | $progress = $this->exportChunk($config, $entities, $progress, $targetFile); |
||||
247 | |||||
248 | $criteria->setOffset($criteria->getOffset() + $criteria->getLimit()); |
||||
249 | } while ($fullExport && $progress->getOffset() < $progress->getTotal()); |
||||
250 | |||||
251 | if ($progress->getTotal() > $progress->getOffset()) { |
||||
252 | return $progress; |
||||
253 | } |
||||
254 | |||||
255 | $this->writer->finish($config, $targetFile); |
||||
256 | |||||
257 | return $this->mergePartFiles($this->logEntity, $progress); |
||||
258 | } |
||||
259 | |||||
260 | public function getLogEntity(): ImportExportLogEntity |
||||
261 | { |
||||
262 | return $this->logEntity; |
||||
263 | } |
||||
264 | |||||
265 | private function getPartFilePath(string $targetPath, int $offset): string |
||||
266 | { |
||||
267 | return $targetPath . self::PART_FILE_SUFFIX . $offset; |
||||
268 | } |
||||
269 | |||||
270 | /** |
||||
271 | * flysystem does not support appending to existing files. Therefore we need to export multiple files and merge them |
||||
272 | * into the complete export file at the end. |
||||
273 | */ |
||||
274 | private function mergePartFiles(ImportExportLogEntity $logEntity, Progress $progress): Progress |
||||
275 | { |
||||
276 | $progress->setState(Progress::STATE_MERGING_FILES); |
||||
277 | $this->importExportService->saveProgress($progress); |
||||
278 | |||||
279 | $tmpFile = tempnam(sys_get_temp_dir(), ''); |
||||
280 | $tmp = fopen($tmpFile, 'w+b'); |
||||
281 | |||||
282 | $target = $logEntity->getFile()->getPath(); |
||||
283 | |||||
284 | $dir = \dirname($target); |
||||
285 | |||||
286 | $partFilePrefix = $target . self::PART_FILE_SUFFIX; |
||||
287 | |||||
288 | $partFiles = []; |
||||
289 | |||||
290 | foreach ($this->filesystem->listContents($dir) as $meta) { |
||||
291 | if ($meta['type'] !== 'file' |
||||
292 | || $meta['path'] === $target |
||||
293 | || strpos($meta['path'], $partFilePrefix) !== 0) { |
||||
294 | continue; |
||||
295 | } |
||||
296 | |||||
297 | $partFiles[] = $meta['path']; |
||||
298 | } |
||||
299 | |||||
300 | // sort by offset |
||||
301 | natsort($partFiles); |
||||
302 | |||||
303 | // concatenate all part files into a temporary file |
||||
304 | foreach ($partFiles as $partFile) { |
||||
305 | if (stream_copy_to_stream($this->filesystem->readStream($partFile), $tmp) === false) { |
||||
0 ignored issues
–
show
It seems like
$this->filesystem->readStream($partFile) can also be of type false ; however, parameter $from of stream_copy_to_stream() does only seem to accept resource , maybe add an additional type check?
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
Loading history...
|
|||||
306 | throw new ProcessingException('Failed to merge files'); |
||||
307 | } |
||||
308 | } |
||||
309 | |||||
310 | // copy final file into filesystem |
||||
311 | $this->filesystem->putStream($target, $tmp); |
||||
312 | |||||
313 | if (\is_resource($tmp)) { |
||||
314 | fclose($tmp); |
||||
315 | } |
||||
316 | unlink($tmpFile); |
||||
317 | |||||
318 | foreach ($partFiles as $p) { |
||||
319 | $this->filesystem->delete($p); |
||||
320 | } |
||||
321 | |||||
322 | $progress->setState(Progress::STATE_SUCCEEDED); |
||||
323 | $this->importExportService->saveProgress($progress); |
||||
324 | |||||
325 | $this->importExportService->updateFile( |
||||
326 | Context::createDefaultContext(), |
||||
327 | $logEntity->getFileId(), |
||||
0 ignored issues
–
show
It seems like
$logEntity->getFileId() can also be of type null ; however, parameter $fileId of Shopware\Core\Content\Im...rtService::updateFile() does only seem to accept string , maybe add an additional type check?
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
Loading history...
|
|||||
328 | ['size' => $this->filesystem->getSize($target)] |
||||
329 | ); |
||||
330 | |||||
331 | return $progress; |
||||
332 | } |
||||
333 | |||||
334 | private function exportChunk(Config $config, iterable $records, Progress $progress, string $targetFile): Progress |
||||
335 | { |
||||
336 | $exportedRecords = 0; |
||||
337 | $offset = $progress->getOffset(); |
||||
338 | /** @var Entity|array $originalRecord */ |
||||
339 | foreach ($records as $originalRecord) { |
||||
340 | $originalRecord = $originalRecord instanceof Entity |
||||
341 | ? $originalRecord->jsonSerialize() |
||||
342 | : $originalRecord; |
||||
343 | |||||
344 | $record = []; |
||||
345 | foreach ($this->pipe->in($config, $originalRecord) as $key => $value) { |
||||
346 | $record[$key] = $value; |
||||
347 | } |
||||
348 | |||||
349 | if ($record !== []) { |
||||
350 | $event = new ImportExportBeforeExportRecordEvent($config, $record, $originalRecord); |
||||
351 | $this->eventDispatcher->dispatch($event); |
||||
352 | |||||
353 | $record = $event->getRecord(); |
||||
354 | |||||
355 | $this->writer->append($config, $record, $offset); |
||||
356 | ++$exportedRecords; |
||||
357 | } |
||||
358 | |||||
359 | ++$offset; |
||||
360 | } |
||||
361 | |||||
362 | $this->writer->flush($config, $targetFile); |
||||
363 | |||||
364 | $progress->setState(Progress::STATE_PROGRESS); |
||||
365 | $progress->setOffset($offset); |
||||
366 | $progress->setTotal($this->total); |
||||
367 | $progress->addProcessedRecords($exportedRecords); |
||||
368 | |||||
369 | $this->importExportService->saveProgress($progress); |
||||
370 | |||||
371 | return $progress; |
||||
372 | } |
||||
373 | |||||
374 | /** |
||||
375 | * In case we failed to import some invalid records, we export them as a new csv with the same format and |
||||
376 | * an additional _error column. |
||||
377 | */ |
||||
378 | private function exportInvalid(Context $context, array $failedRecords): Progress |
||||
379 | { |
||||
380 | // created a invalid records export if it doesn't exist |
||||
381 | if (!$this->logEntity->getInvalidRecordsLogId()) { |
||||
382 | $pathInfo = pathinfo($this->logEntity->getFile()->getOriginalName()); |
||||
383 | $newName = $pathInfo['filename'] . '_failed.' . $pathInfo['extension']; |
||||
384 | |||||
385 | $newPath = $this->logEntity->getFile()->getPath() . '_invalid'; |
||||
386 | |||||
387 | $config = $this->logEntity->getConfig(); |
||||
388 | $config['mapping'][] = [ |
||||
389 | 'key' => '_error', |
||||
390 | 'mappedKey' => '_error', |
||||
391 | ]; |
||||
392 | $config = new Config($config['mapping'], $config['parameters'] ?? []); |
||||
393 | |||||
394 | $failedImportLogEntity = $this->importExportService->prepareExport( |
||||
395 | $context, |
||||
396 | $this->logEntity->getProfileId(), |
||||
0 ignored issues
–
show
It seems like
$this->logEntity->getProfileId() can also be of type null ; however, parameter $profileId of Shopware\Core\Content\Im...ervice::prepareExport() does only seem to accept string , maybe add an additional type check?
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
Loading history...
|
|||||
397 | $this->logEntity->getFile()->getExpireDate(), |
||||
398 | $newName, |
||||
399 | $config->jsonSerialize(), |
||||
400 | $newPath, |
||||
401 | ImportExportLogEntity::ACTIVITY_INVALID_RECORDS_EXPORT |
||||
402 | ); |
||||
403 | |||||
404 | $this->logEntity->setInvalidRecordsLog($failedImportLogEntity); |
||||
405 | $this->logEntity->setInvalidRecordsLogId($failedImportLogEntity->getId()); |
||||
406 | } |
||||
407 | |||||
408 | $failedImportLogEntity = $this->logEntity->getInvalidRecordsLog(); |
||||
409 | $config = Config::fromLog($failedImportLogEntity); |
||||
0 ignored issues
–
show
It seems like
$failedImportLogEntity can also be of type null ; however, parameter $log of Shopware\Core\Content\Im...truct\Config::fromLog() does only seem to accept Shopware\Core\Content\Im...g\ImportExportLogEntity , maybe add an additional type check?
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
Loading history...
|
|||||
410 | |||||
411 | $offset = $failedImportLogEntity->getRecords(); |
||||
412 | |||||
413 | $targetFile = $this->getPartFilePath($failedImportLogEntity->getFile()->getPath(), $offset); |
||||
414 | |||||
415 | $progress = $this->importExportService->getProgress($failedImportLogEntity->getId(), $offset); |
||||
416 | |||||
417 | $progress = $this->exportChunk( |
||||
418 | $config, |
||||
419 | $failedRecords, |
||||
420 | $progress, |
||||
421 | $targetFile |
||||
422 | ); |
||||
423 | |||||
424 | return $progress; |
||||
425 | } |
||||
426 | |||||
427 | private function ensurePrimaryKeys(array $data): array |
||||
428 | { |
||||
429 | foreach ($this->repository->getDefinition()->getPrimaryKeys() as $primaryKey) { |
||||
430 | if (!($primaryKey instanceof IdField)) { |
||||
431 | continue; |
||||
432 | } |
||||
433 | |||||
434 | if (!isset($data[$primaryKey->getPropertyName()])) { |
||||
435 | $data[$primaryKey->getPropertyName()] = Uuid::randomHex(); |
||||
436 | } |
||||
437 | } |
||||
438 | |||||
439 | return $data; |
||||
440 | } |
||||
441 | } |
||||
442 |