1 | <?php |
||||
2 | |||||
3 | namespace Oliverde8\PhpEtlBundle\Services; |
||||
4 | |||||
5 | use Oliverde8\Component\PhpEtl\ChainProcessor; |
||||
6 | use Oliverde8\PhpEtlBundle\Entity\EtlExecution; |
||||
7 | use Oliverde8\PhpEtlBundle\Exception\UnknownChainException; |
||||
8 | use Oliverde8\PhpEtlBundle\Factory\ChainFactory; |
||||
9 | use Oliverde8\PhpEtlBundle\Repository\EtlExecutionRepository; |
||||
10 | |||||
/**
 * Entry point for looking up, building and running ETL chains by name.
 *
 * Holds the processed chain definitions (used to build processors) and the
 * raw definitions (stored on each execution entity), and records the status,
 * timings and error message of every run on an EtlExecution entity.
 */
class ChainProcessorsManager
{
    /** Repository used to save and reload execution entities. */
    protected EtlExecutionRepository $etlExecutionRepository;

    /** Factory asked for a logger at the start of every execution. */
    protected LoggerFactory $loggerFactory;

    /** Factory that builds a ChainProcessor from a chain description. */
    protected ChainFactory $chainFactory;

    /**
     * Definitions indexed by chain name. The keys read by this class are
     * 'chain', 'maxAsynchronousItems' and 'defaultOptions'.
     */
    protected array $definitions;

    /**
     * Raw definitions indexed by chain name, as returned by getRawDefinition().
     * The property name contains a typo ("rew"); it is kept because the
     * property is protected and may be used by subclasses.
     */
    protected array $rewDefinitions;

    public function __construct(
        EtlExecutionRepository $etlExecutionRepository,
        LoggerFactory $loggerFactory,
        ChainFactory $chainFactory,
        array $definitions,
        array $rawDefinitions
    ) {
        $this->etlExecutionRepository = $etlExecutionRepository;
        $this->loggerFactory = $loggerFactory;
        $this->chainFactory = $chainFactory;
        $this->definitions = $definitions;
        $this->rewDefinitions = $rawDefinitions;
    }

    /**
     * Get the raw definition of a chain.
     *
     * When the chain is unknown, the exception message lists the known chain
     * names that are close to the requested one (small edit distance, or
     * containing the requested name), if there are any.
     *
     * @throws UnknownChainException When no raw definition exists for $chainName.
     */
    public function getRawDefinition(string $chainName): string
    {
        if (isset($this->rewDefinitions[$chainName])) {
            return $this->rewDefinitions[$chainName];
        }

        $alternatives = [];
        foreach (array_keys($this->rewDefinitions) as $knownId) {
            // Numeric-looking array keys come back as integers; normalise them
            // before handing them to the string functions.
            $candidate = (string) $knownId;
            $distance = levenshtein($chainName, $candidate);
            if ($distance <= \strlen($chainName) / 3 || str_contains($candidate, $chainName)) {
                $alternatives[] = $candidate;
            }
        }

        // Only add the hint when there is something to suggest, so the message
        // never ends with a dangling "did you mean: ".
        if ($alternatives === []) {
            throw new UnknownChainException("Unknown chain '$chainName'.");
        }

        throw new UnknownChainException("Unknown chain '$chainName', did you mean: " . implode(", ", $alternatives));
    }

    /**
     * Get all raw definitions, indexed by chain name.
     */
    public function getRawDefinitions(): array
    {
        return $this->rewDefinitions;
    }

    /**
     * Get all raw definitions, indexed by chain name.
     *
     * The method name contains a typo and is kept for backward compatibility;
     * prefer getRawDefinitions().
     */
    public function getRewDefinitions(): array
    {
        return $this->rewDefinitions;
    }

    /**
     * Build the processor of a chain.
     *
     * @param string $chainName Name of the chain to build.
     * @param array  $options   Options merged over the chain's 'defaultOptions';
     *                          on string-key conflicts the given options win.
     *
     * @throws UnknownChainException When the chain has no raw definition or no
     *                               processed definition.
     */
    public function getProcessor(string $chainName, array $options): ChainProcessor
    {
        // Called only for its validation: throws when the chain is unknown.
        $this->getRawDefinition($chainName);

        if (!isset($this->definitions[$chainName])) {
            // The two definition arrays are injected separately, so they may
            // disagree; fail with a clear error instead of an undefined index.
            throw new UnknownChainException(
                "Chain '$chainName' has a raw definition but no processed definition."
            );
        }

        $definition = $this->definitions[$chainName];
        $maxAsynchronousItems = $definition['maxAsynchronousItems'] ?? 20;
        $mergedOptions = array_merge($definition['defaultOptions'] ?? [], $options);

        return $this->chainFactory->create($definition['chain'], $mergedOptions, $maxAsynchronousItems);
    }

    /**
     * Execute a particular chain.
     *
     * Creates and saves a new execution entity, then runs it with
     * executeFromEtlEntity().
     *
     * @param string   $chainName Name of the chain to run.
     * @param iterable $iterator  Items to process. An array is also stored on
     *                            the execution as its input data; for any other
     *                            iterable a placeholder text is stored instead.
     * @param array    $params    Options stored on the execution.
     *
     * @throws UnknownChainException When the chain is unknown.
     * @throws \Exception
     */
    public function execute(string $chainName, iterable $iterator, array $params)
    {
        $definition = $this->getRawDefinition($chainName);

        if (is_array($iterator)) {
            $inputData = $iterator;
            $iterator = new \ArrayIterator($iterator);
        } else {
            $inputData = ["Iterator! Can't show input data"];
        }

        $execution = new EtlExecution($chainName, $definition, $inputData, $params);
        $execution->setStatus(EtlExecution::STATUS_RUNNING);
        $this->etlExecutionRepository->save($execution);

        $this->executeFromEtlEntity($execution, $iterator);
    }

    /**
     * Execute a chain from its entity.
     *
     * Marks the execution as running, builds the processor from the options
     * stored on the entity and processes the items. The entity is then marked
     * as success or failure; the end time, run time and step stats are always
     * saved, and any error is re-thrown after being recorded.
     *
     * @param EtlExecution  $execution Execution to run.
     * @param iterable|null $iterator  Items to process; when null the input data
     *                                 stored on the execution is used.
     *
     * @throws \Throwable Whatever the chain, or its preparation, throws.
     */
    public function executeFromEtlEntity(EtlExecution $execution, ?iterable $iterator = null): void
    {
        $chainName = $execution->getName();
        // The returned logger is not used here. NOTE(review): the call is kept
        // in case the factory has side effects; confirm and remove if it has none.
        $this->loggerFactory->get($execution);

        try {
            // Update execution object with new status.
            $execution->setStatus(EtlExecution::STATUS_RUNNING);
            $execution->setStartTime(new \DateTime());
            $execution->setWaitTime(time() - $execution->getCreateTime()->getTimestamp());
            $this->etlExecutionRepository->save($execution);

            // Build the processor.
            $params = $this->decodeJsonArray($execution->getInputOptions(), 'input options');
            $processor = $this->getProcessor($chainName, $params);

            if ($iterator === null) {
                $iterator = new \ArrayIterator(
                    $this->decodeJsonArray($execution->getInputData(), 'input data')
                );
            }
            // Added after the processor is built: these values are only passed
            // to process(), not to the chain factory.
            $params['etl'] = [
                'chain' => $chainName,
                'startTime' => new \DateTime(),
                'execution' => $execution
            ];

            // Start the process.
            $processor->process($iterator, $params);
            $execution = $this->reloadExecution($execution);
            $execution->setStatus(EtlExecution::STATUS_SUCCESS);
        } catch (\Throwable $exception) {
            $execution = $this->reloadExecution($execution);
            $execution->setFailTime(new \DateTime());
            $execution->setStatus(EtlExecution::STATUS_FAILURE);
            $execution->setErrorMessage($this->getFullExceptionTrace($exception));
            throw $exception;
        } finally {
            $execution->setEndTime(new \DateTime());
            // A reloaded entity may have no start time if the failure happened
            // before the first save; do not let that hide the original error.
            $startTime = $execution->getStartTime();
            if ($startTime !== null) {
                $execution->setRunTime(time() - $startTime->getTimestamp());
            }
            $execution->setStepStats('[]'); // To be developed.
            $this->etlExecutionRepository->save($execution);
        }
    }

    /**
     * Build one text with the message and stack trace of an exception and of
     * every previous exception chained to it, outermost first.
     */
    protected function getFullExceptionTrace(\Throwable $exception): string
    {
        $message = '';
        for ($current = $exception; $current !== null; $current = $current->getPrevious()) {
            $message .= $current->getMessage() . "\n" . $current->getTraceAsString() . "\n\n";
        }

        return $message;
    }

    /**
     * Fetch the execution again from the repository, falling back to the given
     * instance when the repository returns nothing.
     *
     * NOTE(review): the re-fetch presumably exists because processing can
     * leave the in-memory entity stale or detached; confirm.
     */
    private function reloadExecution(EtlExecution $execution): EtlExecution
    {
        return $this->etlExecutionRepository->find($execution->getId()) ?? $execution;
    }

    /**
     * Decode a JSON document stored on an execution into an array.
     *
     * @param string|null $json  JSON text; null or an empty string gives [].
     * @param string      $label Name of the decoded field, used in the error message.
     *
     * @throws \JsonException            When the text is not valid JSON.
     * @throws \UnexpectedValueException When the JSON is valid but not an array/object.
     */
    private function decodeJsonArray(?string $json, string $label): array
    {
        if ($json === null || $json === '') {
            return [];
        }

        $decoded = json_decode($json, true, 512, \JSON_THROW_ON_ERROR);
        if (!is_array($decoded)) {
            throw new \UnexpectedValueException(
                "Execution $label must decode to an array, got " . gettype($decoded) . '.'
            );
        }

        return $decoded;
    }
}
||||
154 |