|
1
|
|
|
<?php |
|
2
|
|
|
declare(strict_types=1); |
|
3
|
|
|
|
|
4
|
|
|
namespace Oliverde8\Component\PhpEtl\ChainOperation\Transformer; |
|
5
|
|
|
|
|
6
|
|
|
use Oliverde8\Component\PhpEtl\ChainOperation\AbstractChainOperation; |
|
7
|
|
|
use Oliverde8\Component\PhpEtl\Item\DataItem; |
|
8
|
|
|
use Oliverde8\Component\PhpEtl\Item\ExternalFileItem; |
|
9
|
|
|
use Oliverde8\Component\PhpEtl\Item\ItemInterface; |
|
10
|
|
|
use Oliverde8\Component\PhpEtl\Item\MixItem; |
|
11
|
|
|
use Oliverde8\Component\PhpEtl\Model\ExecutionContext; |
|
12
|
|
|
|
|
13
|
|
|
/**
 * Chain operation that stages an external file locally and tracks it through
 * "processing" and "processed" sub-directories on the item's own file system.
 */
class ExternalFileProcessorOperation extends AbstractChainOperation
{
    /**
     * Handles an external file item according to its current state.
     *
     * When the item is in STATE_NEW:
     *  - a "processing" directory is created next to the file,
     *  - the file is moved into it,
     *  - its content is streamed to the execution context's file system under
     *    the file's base name,
     *  - the item is switched to STATE_PROCESSING.
     *
     * For any other state:
     *  - a "processed" directory is created next to the file,
     *  - the file is moved from "processing" into it,
     *  - the item is switched to STATE_PROCESSED.
     *
     * @param ExternalFileItem $item    Item describing the external file and the file system holding it.
     * @param ExecutionContext $context Execution context providing the local file system.
     *
     * @return ItemInterface A MixItem holding a DataItem with the local file name plus the original item
     *                       when the file was new; otherwise the item itself.
     */
    public function processFile(ExternalFileItem $item, ExecutionContext $context): ItemInterface
    {
        $sourcePath = $item->getFilePath();
        $parentDir = dirname($sourcePath);
        $baseName = basename($sourcePath);
        $remoteFs = $item->getFileSystem();
        $localFs = $context->getFileSystem();

        // Location of the file while it is being worked on.
        $processingDir = $parentDir . "/processing";
        $processingPath = $processingDir . "/" . $baseName;

        // Anything that is not new has already been staged: archive it.
        if ($item->getState() != ExternalFileItem::STATE_NEW) {
            $processedDir = $parentDir . "/processed";

            $remoteFs->createDirectory($processedDir);
            $remoteFs->move($processingPath, $processedDir . "/" . $baseName);

            $item->setState(ExternalFileItem::STATE_PROCESSED);

            return $item;
        }

        // Relocate the file first so that no other process picks it up.
        $remoteFs->createDirectory($processingDir);
        $remoteFs->move($sourcePath, $processingPath);

        // Copy the staged file to the local file system under its base name.
        $stream = $remoteFs->readStream($processingPath);
        $localFs->writeStream($baseName, $stream);

        $item->setState(ExternalFileItem::STATE_PROCESSING);

        return new MixItem([new DataItem($baseName), $item]);
    }
}