1
|
|
|
<?php |
2
|
|
|
declare(strict_types=1); |
3
|
|
|
|
4
|
|
|
namespace Oliverde8\Component\PhpEtl\ChainOperation\Transformer; |
5
|
|
|
|
6
|
|
|
use Oliverde8\Component\PhpEtl\ChainOperation\AbstractChainOperation; |
7
|
|
|
use Oliverde8\Component\PhpEtl\Item\DataItem; |
8
|
|
|
use Oliverde8\Component\PhpEtl\Item\ExternalFileItem; |
9
|
|
|
use Oliverde8\Component\PhpEtl\Item\ItemInterface; |
10
|
|
|
use Oliverde8\Component\PhpEtl\Item\MixItem; |
11
|
|
|
use Oliverde8\Component\PhpEtl\Model\ExecutionContext; |
12
|
|
|
|
13
|
|
|
/**
 * Chain operation that stages a file from an external file system for local
 * processing, then archives it on a later pass.
 *
 * The operation is driven by the state carried on the ExternalFileItem:
 *  - STATE_NEW: the file is moved into a "processing" directory next to its
 *    original location, copied to the execution's local file system, and the
 *    item is flagged STATE_PROCESSING.
 *  - any other state: the file is moved from "processing" to a "processed"
 *    directory and the item is flagged STATE_PROCESSED.
 */
class ExternalFileProcessorOperation extends AbstractChainOperation
{
    /**
     * Handle one external file item according to its current state.
     *
     * @param ExternalFileItem $item    Item providing the file path, the external
     *                                  file system and the processing state.
     * @param ExecutionContext $context Context providing the local file system
     *                                  the file is copied to.
     *
     * @return ItemInterface For a new item, a MixItem holding a DataItem with the
     *                       file's base name followed by the item itself; otherwise
     *                       the same item, now flagged as processed.
     */
    public function processFile(ExternalFileItem $item, ExecutionContext $context): ItemInterface
    {
        $externalFilePath = $item->getFilePath();
        $externalDir = dirname($externalFilePath);
        $fileName = basename($externalFilePath);
        $externalFileSystem = $item->getFileSystem();
        $localFileSystem = $context->getFileSystem();

        // Location of the file while it is being worked on; used by both branches.
        $processingPath = $externalDir . "/processing/" . $fileName;

        // NOTE(review): loose comparison kept on purpose; the types of getState()
        // and the STATE_* constants are not visible here. Switch to === once confirmed.
        if ($item->getState() == ExternalFileItem::STATE_NEW) {
            // Move file to prevent it to be processed by another process.
            $externalFileSystem->createDirectory($externalDir . "/processing");
            $externalFileSystem->move($externalFilePath, $processingPath);

            // Copy the file to the local file system under its base name.
            $localFileSystem->writeStream($fileName, $externalFileSystem->readStream($processingPath));

            $item->setState(ExternalFileItem::STATE_PROCESSING);

            return new MixItem([new DataItem($fileName), $item]);
        }

        // Any state other than STATE_NEW is handled as "processing finished":
        // archive the file next to its original location.
        $externalFileSystem->createDirectory($externalDir . "/processed");
        $externalFileSystem->move($processingPath, $externalDir . "/processed/" . $fileName);

        $item->setState(ExternalFileItem::STATE_PROCESSED);

        return $item;
    }
}