ChainProcessorsManager::getRawDefinition()   A
last analyzed

Complexity

Conditions 5
Paths 4

Size

Total Lines 15
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 8
c 0
b 0
f 0
dl 0
loc 15
rs 9.6111
cc 5
nc 4
nop 1
1
<?php
2
3
namespace Oliverde8\PhpEtlBundle\Services;
4
5
use Oliverde8\Component\PhpEtl\ChainProcessor;
6
use Oliverde8\PhpEtlBundle\Entity\EtlExecution;
7
use Oliverde8\PhpEtlBundle\Exception\UnknownChainException;
8
use Oliverde8\PhpEtlBundle\Factory\ChainFactory;
9
use Oliverde8\PhpEtlBundle\Repository\EtlExecutionRepository;
10
11
/**
 * Looks up ETL chain definitions by name, builds chain processors from them
 * and runs executions while recording their status on an EtlExecution entity.
 */
class ChainProcessorsManager
{
    protected EtlExecutionRepository $etlExecutionRepository;

    protected LoggerFactory $loggerFactory;

    protected ChainFactory $chainFactory;

    /** @var array Definitions indexed by chain name; read by getProcessor(). */
    protected array $definitions;

    /**
     * Raw definitions indexed by chain name.
     *
     * NOTE: "rew" is a historical misspelling of "raw"; the name is kept because
     * the property and its getter are part of the class' visible surface.
     */
    protected array $rewDefinitions;

    public function __construct(
        EtlExecutionRepository $etlExecutionRepository,
        LoggerFactory $loggerFactory,
        ChainFactory $chainFactory,
        array $definitions,
        array $rawDefinitions
    ) {
        $this->etlExecutionRepository = $etlExecutionRepository;
        $this->loggerFactory = $loggerFactory;
        $this->chainFactory = $chainFactory;
        $this->definitions = $definitions;
        $this->rewDefinitions = $rawDefinitions;
    }

    /**
     * Returns the raw definition registered for the given chain name.
     *
     * @throws UnknownChainException When no raw definition exists for $chainName. The message
     *                               lists similarly named chains when there are any.
     */
    public function getRawDefinition(string $chainName): string
    {
        if (isset($this->rewDefinitions[$chainName])) {
            return $this->rewDefinitions[$chainName];
        }

        $message = "Unknown chain '$chainName'";
        $alternatives = $this->findSimilarChainNames($chainName);
        if ($alternatives !== []) {
            // Only offer a hint when there is something to suggest.
            $message .= ', did you mean: ' . implode(', ', $alternatives);
        }

        throw new UnknownChainException($message);
    }

    /**
     * Returns every raw definition, indexed by chain name.
     */
    public function getRewDefinitions(): array
    {
        return $this->rewDefinitions;
    }

    /**
     * Builds the processor for a chain, merging the caller's options over the chain's defaults.
     *
     * @param array $options Options overriding the definition's 'defaultOptions'.
     *
     * @throws UnknownChainException When the chain is unknown or has no entry in the definitions.
     */
    public function getProcessor(string $chainName, array $options): ChainProcessor
    {
        // Called only for its validation: throws (with suggestions) on unknown chains.
        $this->getRawDefinition($chainName);

        if (!isset($this->definitions[$chainName])) {
            // Raw definitions and definitions are separate arrays; do not assume they share keys.
            throw new UnknownChainException("Chain '$chainName' is missing from the chain definitions");
        }

        $definition = $this->definitions[$chainName];
        $mergedOptions = array_merge($definition['defaultOptions'] ?? [], $options);

        return $this->chainFactory->create(
            $definition['chain'],
            $mergedOptions,
            $definition['maxAsynchronousItems'] ?? 20
        );
    }

    /**
     * Execute a particular chain.
     *
     * Creates and saves an EtlExecution for the run, then delegates to executeFromEtlEntity().
     *
     * @param string $chainName
     * @param iterable $iterator Items to process. Arrays are stored as the execution's input data;
     *                           other iterables are recorded with a placeholder instead.
     * @param array $params
     *
     * @return void
     *
     * @throws \Exception
     */
    public function execute(string $chainName, iterable $iterator, array $params)
    {
        $definition = $this->getRawDefinition($chainName);

        $inputData = ["Iterator! Can't show input data"];
        if (is_array($iterator)) {
            $inputData = $iterator;
            $iterator = new \ArrayIterator($iterator);
        }

        $execution = new EtlExecution($chainName, $definition, $inputData, $params);
        $execution->setStatus(EtlExecution::STATUS_RUNNING);
        $this->etlExecutionRepository->save($execution);

        $this->executeFromEtlEntity($execution, $iterator);
    }

    /**
     * Execute a chain from its entity.
     *
     * Marks the execution as running, processes the items, then records success or failure
     * together with the end and run times. Any error is rethrown after being recorded.
     *
     * @param iterable|null $iterator Items to process; when null they are decoded from the
     *                                execution's stored input data.
     *
     * @throws \Throwable Whatever the processing raised.
     */
    public function executeFromEtlEntity(EtlExecution $execution, ?iterable $iterator = null): void
    {
        $chainName = $execution->getName();
        // The returned logger is not used here. The call itself is kept because the factory
        // is not visible from this class and may do more than return a logger - TODO confirm.
        $this->loggerFactory->get($execution);

        try {
            // Update execution object with new status.
            $execution->setStatus(EtlExecution::STATUS_RUNNING);
            $execution->setStartTime(new \DateTime());
            $execution->setWaitTime(time() - $execution->getCreateTime()->getTimestamp());
            $this->etlExecutionRepository->save($execution);

            // Build the processor.
            $params = $this->decodeJsonArray($execution->getInputOptions(), 'input options');
            $processor = $this->getProcessor($chainName, $params);

            if ($iterator === null) {
                $iterator = new \ArrayIterator(
                    $this->decodeJsonArray($execution->getInputData(), 'input data')
                );
            } elseif (is_array($iterator)) {
                // Same normalisation as execute() applies before delegating here.
                $iterator = new \ArrayIterator($iterator);
            }

            $params['etl'] = [
                'chain' => $chainName,
                'startTime' => new \DateTime(),
                'execution' => $execution,
            ];

            // Start the process.
            $processor->process($iterator, $params);
            $execution = $this->reloadExecution($execution);
            $execution->setStatus(EtlExecution::STATUS_SUCCESS);
        } catch (\Throwable $exception) {
            $execution = $this->reloadExecution($execution);
            $execution->setFailTime(new \DateTime());
            $execution->setStatus(EtlExecution::STATUS_FAILURE);
            $execution->setErrorMessage($this->getFullExceptionTrace($exception));

            throw $exception;
        } finally {
            $execution->setEndTime(new \DateTime());
            $execution->setRunTime(time() - $execution->getStartTime()->getTimestamp());
            $execution->setStepStats('[]'); // To be developed
            $this->etlExecutionRepository->save($execution);
        }
    }

    /**
     * Concatenates the message and stack trace of an exception and of each of its
     * previous exceptions, outermost first, separated by blank lines.
     */
    protected function getFullExceptionTrace(\Throwable $exception): string
    {
        $message = '';
        for ($current = $exception; $current !== null; $current = $current->getPrevious()) {
            $message .= $current->getMessage() . "\n" . $current->getTraceAsString() . "\n\n";
        }

        return $message;
    }

    /**
     * Lists the known chain names that resemble $chainName: names within an edit distance
     * of a third of its length, or names that contain it.
     *
     * @return string[]
     */
    private function findSimilarChainNames(string $chainName): array
    {
        $maxDistance = \strlen($chainName) / 3;
        $similar = [];

        foreach (array_keys($this->rewDefinitions) as $knownId) {
            // Numeric-looking names are stored as int keys by PHP.
            $knownId = (string) $knownId;
            if (levenshtein($chainName, $knownId) <= $maxDistance || str_contains($knownId, $chainName)) {
                $similar[] = $knownId;
            }
        }

        return $similar;
    }

    /**
     * Decodes a JSON document that must hold an array or object.
     *
     * @param string|null $json  Null and '' are treated as "nothing stored" and give [].
     * @param string      $label Names the decoded value in the error message.
     *
     * @throws \UnexpectedValueException When the JSON is malformed or does not decode to an array.
     */
    private function decodeJsonArray(?string $json, string $label): array
    {
        if ($json === null || $json === '') {
            return [];
        }

        $decoded = json_decode($json, true);
        if (!is_array($decoded)) {
            throw new \UnexpectedValueException(
                "Execution $label could not be decoded to an array: " . json_last_error_msg()
            );
        }

        return $decoded;
    }

    /**
     * Fetches the execution again from the repository by its id.
     *
     * Falls back to the instance already held when the repository returns nothing, so that
     * recording the outcome cannot fail on null and hide the original error.
     */
    private function reloadExecution(EtlExecution $execution): EtlExecution
    {
        return $this->etlExecutionRepository->find($execution->getId()) ?? $execution;
    }
}
154