Passed
Push — main ( 987cb0...4f2490 )
by De Cramer
02:59
created

ChainProcessorsManager::getDefinitions()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 1
c 0
b 0
f 0
dl 0
loc 3
rs 10
cc 1
nc 1
nop 0
1
<?php
2
3
4
namespace Oliverde8\PhpEtlBundle\Services;
5
6
use Oliverde8\Component\PhpEtl\ChainProcessor;
7
use Oliverde8\Component\PhpEtl\Exception\ChainOperationException;
8
use Oliverde8\Component\PhpEtl\Item\DataItem;
9
use Oliverde8\Component\PhpEtl\Item\DataItemInterface;
10
use Oliverde8\PhpEtlBundle\Entity\EtlExecution;
11
use Oliverde8\PhpEtlBundle\Repository\EtlExecutionRepository;
12
use Psr\Container\ContainerInterface;
13
use Psr\Log\LoggerInterface;
14
15
class ChainProcessorsManager
16
{
17
    /** @var ContainerInterface */
18
    protected $container;
19
20
    /** @var EtlExecutionRepository */
21
    protected $etlExecutionRepository;
22
23
    /** @var ChainExecutionLogger */
24
    protected $chainExecutionLogger;
25
26
    /** @var LoggerInterface */
27
    protected $logger;
28
29
    /** @var array */
30
    protected $definitions;
31
32
33
    /**
34
     * ChainProcessorsManager constructor.
35
     * @param ContainerInterface $container
36
     * @param EtlExecutionRepository $etlExecutionRepository
37
     * @param array $definitions
38
     */
39
    public function __construct(
40
        ContainerInterface $container,
41
        EtlExecutionRepository $etlExecutionRepository,
42
        ChainExecutionLogger $chainExecutionLogger,
43
        LoggerInterface $logger,
44
        array $definitions
45
    ) {
46
        $this->container = $container;
47
        $this->etlExecutionRepository = $etlExecutionRepository;
48
        $this->chainExecutionLogger = $chainExecutionLogger;
49
        $this->logger = $logger;
50
        $this->definitions = $definitions;
51
    }
52
53
    public function getDefinition(string $chainName): string
54
    {
55
        return $this->definitions[$chainName];
56
    }
57
58
    public function getDefinitions(): array
59
    {
60
        return $this->definitions;
61
    }
62
63
    public function getProcessor(string $chainName): ChainProcessor
64
    {
65
        // TODO Think about either creating the processor & runtime or injecting them into the constructor like the definitions.
66
        return $this->container->get("oliverde8.etl.chain.$chainName");
67
    }
68
69
    /**
70
     * Execute a particular chanin
71
     *
72
     * @param string $chainName
73
     * @param $iterator
74
     * @param array $params
75
     *
76
     * @throws \Exception
77
     */
78
    public function execute(string $chainName, $iterator, array $params)
79
    {
80
        $definition = $this->getDefinition($chainName);
81
82
        $inputData = ["Iterator! Can't show input data"];
83
        if (is_array($iterator)) {
84
            $inputData = $iterator;
85
            $iterator = new \ArrayIterator($iterator);
86
        }
87
88
        $execution = new EtlExecution($chainName, $definition, $inputData, $params);
89
        $execution->setStatus(EtlExecution::STATUS_RUNNING);
90
        $this->etlExecutionRepository->save($execution);
91
92
        $this->executeFromEtlEntity($execution, $iterator);
93
    }
94
95
    /**
96
     * Execute a chain from it's entity.
97
     *
98
     * @param EtlExecution $execution
99
     * @param null $iterator
0 ignored issues
show
Documentation Bug introduced by
Are you sure the doc-type for parameter $iterator is correct as it would always require null to be passed?
Loading history...
100
     * @throws ChainOperationException
101
     */
102
    public function executeFromEtlEntity(EtlExecution $execution, $iterator = null)
103
    {
104
        $this->chainExecutionLogger->setCurrentExecution($execution);
105
106
        $chainName = $execution->getName();
107
        $processor = $this->getProcessor($chainName);
108
        $params = json_decode($execution->getInputOptions());
0 ignored issues
show
Bug introduced by
It seems like $execution->getInputOptions() can also be of type null; however, parameter $json of json_decode() does only seem to accept string, maybe add an additional type check? ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

108
        $params = json_decode(/** @scrutinizer ignore-type */ $execution->getInputOptions());
Loading history...
109
110
111
        if (is_null($iterator)) {
0 ignored issues
show
introduced by
The condition is_null($iterator) is always true.
Loading history...
112
            $iterator = new \ArrayIterator(json_decode($execution->getInputData()));
113
        }
114
115
        $execution->setStatus(EtlExecution::STATUS_RUNNING);
116
        $execution->setStartTime(new \DateTime());
117
        $this->etlExecutionRepository->save($execution);
118
        $params['etl'] = ['chain' => $chainName, 'startTime' => new \DateTime()];
119
120
        try {
121
            $this->logger->info("Starting etl process!", $params);
122
            $processor->process($iterator, $params);
123
            $execution->setStatus(EtlExecution::STATUS_SUCCESS);
124
125
            $this->logger->info("Finished etl process!", $params);
126
        } catch (\Exception $exception) {
127
            $params['exception'] = $exception;
128
            $this->logger->info("Failed during etl process!", $params);
129
130
            $execution->setFailTime(new \DateTime());
131
            $execution->setStatus(EtlExecution::STATUS_FAILURE);
132
            $execution->setErrorMessage($exception->getMessage() . "\n" . $exception->getTraceAsString());
133
            throw $exception;
134
        } finally {
135
            $execution->setEndTime(new \DateTime());
136
            $execution->setStepStats('[]'); // To be developped
137
            $this->etlExecutionRepository->save($execution);
138
139
            $this->chainExecutionLogger->setCurrentExecution(null);
140
        }
141
    }
142
}
143