Passed
Push — main ( 492649...1b3827 )
by De Cramer
07:40
created

ChainProcessorsManager::executeFromEtlEntity()   A

Complexity

Conditions 3
Paths 58

Size

Total Lines 36
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 26
c 1
b 0
f 0
dl 0
loc 36
rs 9.504
cc 3
nc 58
nop 2
1
<?php
2
3
4
namespace Oliverde8\PhpEtlBundle\Services;
5
6
use Oliverde8\Component\PhpEtl\ChainProcessor;
7
use Oliverde8\Component\PhpEtl\Exception\ChainOperationException;
8
use Oliverde8\Component\PhpEtl\Item\DataItem;
9
use Oliverde8\Component\PhpEtl\Item\DataItemInterface;
10
use Oliverde8\PhpEtlBundle\Entity\EtlExecution;
11
use Oliverde8\PhpEtlBundle\Repository\EtlExecutionRepository;
12
use Psr\Container\ContainerInterface;
13
use Psr\Log\LoggerInterface;
14
15
class ChainProcessorsManager
16
{
17
    /** @var ContainerInterface */
18
    protected $container;
19
20
    /** @var EtlExecutionRepository */
21
    protected $etlExecutionRepository;
22
23
    /** @var ChainExecutionLogger */
24
    protected $chainExecutionLogger;
25
26
    /** @var LoggerInterface */
27
    protected $logger;
28
29
    /** @var array */
30
    protected $definitions;
31
32
33
    /**
34
     * ChainProcessorsManager constructor.
35
     * @param ContainerInterface $container
36
     * @param EtlExecutionRepository $etlExecutionRepository
37
     * @param array $definitions
38
     */
39
    public function __construct(
40
        ContainerInterface $container,
41
        EtlExecutionRepository $etlExecutionRepository,
42
        ChainExecutionLogger $chainExecutionLogger,
43
        LoggerInterface $logger,
44
        array $definitions
45
    ) {
46
        $this->container = $container;
47
        $this->etlExecutionRepository = $etlExecutionRepository;
48
        $this->chainExecutionLogger = $chainExecutionLogger;
49
        $this->logger = $logger;
50
        $this->definitions = $definitions;
51
    }
52
53
    public function getDefinition(string $chainName): string
54
    {
55
        return $this->definitions[$chainName];
56
    }
57
58
    public function getProcessor(string $chainName): ChainProcessor
59
    {
60
        // TODO Think about either creating the processor & runtime or injecting them into the constructor like the definitions.
61
        return $this->container->get("oliverde8.etl.chain.$chainName");
62
    }
63
64
    /**
65
     * Execute a particular chanin
66
     *
67
     * @param string $chainName
68
     * @param $iterator
69
     * @param array $params
70
     *
71
     * @throws \Exception
72
     */
73
    public function execute(string $chainName, $iterator, array $params)
74
    {
75
        $definition = $this->getDefinition($chainName);
76
77
        $inputData = ["Iterator! Can't show input data"];
78
        if (is_array($iterator)) {
79
            $inputData = $iterator;
80
            $iterator = new \ArrayIterator($iterator);
81
        }
82
83
        $execution = new EtlExecution($chainName, $definition, $inputData, $params);
84
        $execution->setStatus(EtlExecution::STATUS_RUNNING);
85
        $this->etlExecutionRepository->save($execution);
86
87
        $this->executeFromEtlEntity($execution, $iterator);
88
    }
89
90
    /**
91
     * Execute a chain from it's entity.
92
     *
93
     * @param EtlExecution $execution
94
     * @param null $iterator
0 ignored issues
show
Documentation Bug introduced by
Are you sure the doc-type for parameter $iterator is correct as it would always require null to be passed?
Loading history...
95
     * @throws ChainOperationException
96
     */
97
    public function executeFromEtlEntity(EtlExecution $execution, $iterator = null)
98
    {
99
        $this->chainExecutionLogger->setCurrentExecution($execution);
100
101
        $chainName = $execution->getName();
102
        $processor = $this->getProcessor($chainName);
103
        $params = json_decode($execution->getInputOptions());
0 ignored issues
show
Bug introduced by
It seems like $execution->getInputOptions() can also be of type null; however, parameter $json of json_decode() does only seem to accept string, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

103
        $params = json_decode(/** @scrutinizer ignore-type */ $execution->getInputOptions());
Loading history...
104
105
        if (is_null($iterator)) {
0 ignored issues
show
introduced by
The condition is_null($iterator) is always true.
Loading history...
106
            $iterator = new \ArrayIterator(json_decode($execution->getInputData()));
107
        }
108
109
        $execution->setStatus(EtlExecution::STATUS_RUNNING);
110
        $this->etlExecutionRepository->save($execution);
111
        $params['etl'] = ['chain' => $chainName, 'startTime' => new \DateTime()];
112
113
        try {
114
            $this->logger->info("Starting etl process!", $params);
115
            $processor->process($iterator, $params);
116
            $execution->setStatus(EtlExecution::STATUS_SUCCESS);
117
118
            $this->logger->info("Finished etl process!", $params);
119
        } catch (\Exception $exception) {
120
            $params['exception'] = $exception;
121
            $this->logger->info("Faild during etl process!", $params);
122
123
            $execution->setFailTime(new \DateTime());
124
            $execution->setStatus(EtlExecution::STATUS_FAILURE);
125
            $execution->setErrorMessage($exception->getMessage() . "\n" . $exception->getTraceAsString());
126
            throw $exception;
127
        } finally {
128
            $execution->setEndTime(new \DateTime());
129
            $execution->setStepStats('[]'); // To be developped
130
            $this->etlExecutionRepository->save($execution);
131
132
            $this->chainExecutionLogger->setCurrentExecution(null);
133
        }
134
    }
135
}
136