oliverde8 /
phpEtlBundle
| 1 | <?php |
||||
| 2 | |||||
| 3 | namespace Oliverde8\PhpEtlBundle\Services; |
||||
| 4 | |||||
| 5 | use Oliverde8\Component\PhpEtl\ChainProcessor; |
||||
| 6 | use Oliverde8\PhpEtlBundle\Entity\EtlExecution; |
||||
| 7 | use Oliverde8\PhpEtlBundle\Exception\UnknownChainException; |
||||
| 8 | use Oliverde8\PhpEtlBundle\Factory\ChainFactory; |
||||
| 9 | use Oliverde8\PhpEtlBundle\Repository\EtlExecutionRepository; |
||||
| 10 | |||||
/**
 * Looks up ETL chain definitions by name, builds their processors and runs
 * them while recording the outcome on an EtlExecution entity.
 */
class ChainProcessorsManager
{
    protected EtlExecutionRepository $etlExecutionRepository;

    protected LoggerFactory $loggerFactory;

    protected ChainFactory $chainFactory;

    /**
     * Chain definitions indexed by chain name. Each entry is read for the keys
     * 'chain', 'maxAsynchronousItems' (optional) and 'defaultOptions' (optional).
     */
    protected array $definitions;

    /**
     * Raw chain definitions indexed by chain name.
     *
     * NOTE(review): "rew" looks like a misspelling of "raw". The name is kept
     * because the protected property and getRewDefinitions() are reachable by
     * subclasses and callers.
     */
    protected array $rewDefinitions;

    /**
     * @param array $definitions    Chain definitions indexed by chain name.
     * @param array $rawDefinitions Raw chain definitions indexed by chain name.
     */
    public function __construct(
        EtlExecutionRepository $etlExecutionRepository,
        LoggerFactory $loggerFactory,
        ChainFactory $chainFactory,
        array $definitions,
        array $rawDefinitions
    ) {
        $this->etlExecutionRepository = $etlExecutionRepository;
        $this->loggerFactory = $loggerFactory;
        $this->chainFactory = $chainFactory;
        $this->definitions = $definitions;
        $this->rewDefinitions = $rawDefinitions;
    }

    /**
     * Return the raw definition registered for a chain.
     *
     * @throws UnknownChainException When no raw definition exists for $chainName. The message
     *                               lists similarly named chains when there are any.
     */
    public function getRawDefinition(string $chainName): string
    {
        if (isset($this->rewDefinitions[$chainName])) {
            return $this->rewDefinitions[$chainName];
        }

        $message = "Unknown chain '$chainName'";
        $alternatives = $this->findSimilarChainNames($chainName);
        if ($alternatives !== []) {
            // Only offer suggestions when there are some, so the message never
            // ends with a dangling "did you mean: ".
            $message .= ', did you mean: ' . implode(', ', $alternatives);
        }

        throw new UnknownChainException($message);
    }

    /**
     * Return every raw chain definition, indexed by chain name.
     */
    public function getRewDefinitions(): array
    {
        return $this->rewDefinitions;
    }

    /**
     * Build the processor of a chain.
     *
     * The chain's 'defaultOptions' are merged with $options; values from
     * $options win on identical string keys. 'maxAsynchronousItems' defaults to 20.
     *
     * @throws UnknownChainException When the chain has no raw or no parsed definition.
     */
    public function getProcessor(string $chainName, array $options): ChainProcessor
    {
        // Called for its side effect only: throws (with suggestions) when the chain is unknown.
        $this->getRawDefinition($chainName);

        if (!isset($this->definitions[$chainName])) {
            // Both maps are injected separately; fail clearly if they are out of sync
            // instead of reading an undefined index.
            throw new UnknownChainException(
                "Chain '$chainName' has a raw definition but no parsed definition"
            );
        }

        $definition = $this->definitions[$chainName];
        $chain = $definition['chain'];
        $maxAsynchronousItems = $definition['maxAsynchronousItems'] ?? 20;
        $defaultOptions = $definition['defaultOptions'] ?? [];

        return $this->chainFactory->create(
            $chain,
            array_merge($defaultOptions, $options),
            $maxAsynchronousItems
        );
    }

    /**
     * Execute a particular chain.
     *
     * A new EtlExecution is created and saved with the running status before the
     * chain is processed. When $iterator is an array it is recorded as the
     * execution's input data; for any other iterable a placeholder is recorded.
     *
     * @param string   $chainName
     * @param iterable $iterator
     * @param array    $params
     *
     * @throws \Exception
     */
    public function execute(string $chainName, iterable $iterator, array $params)
    {
        $definition = $this->getRawDefinition($chainName);

        $inputData = ["Iterator! Can't show input data"];
        if (is_array($iterator)) {
            $inputData = $iterator;
            $iterator = new \ArrayIterator($iterator);
        }

        $execution = new EtlExecution($chainName, $definition, $inputData, $params);
        $execution->setStatus(EtlExecution::STATUS_RUNNING);
        $this->etlExecutionRepository->save($execution);

        $this->executeFromEtlEntity($execution, $iterator);
    }

    /**
     * Execute a chain from its entity.
     *
     * The execution is marked as running, processed, then marked as succeeded or
     * failed; end time, run time and step stats are always written and saved.
     *
     * @param EtlExecution  $execution Execution to run; its input options (and its input
     *                                 data when no iterator is given) are decoded from JSON.
     * @param iterable|null $iterator  Items to process; when null the execution's stored
     *                                 input data is used.
     *
     * @throws \Throwable Any failure is recorded on the execution and rethrown unchanged.
     */
    public function executeFromEtlEntity(EtlExecution $execution, ?iterable $iterator = null): void
    {
        $chainName = $execution->getName();
        // The returned logger is not used here. The call is kept in case the factory
        // sets up per-execution logging as a side effect — TODO confirm and drop if not.
        $this->loggerFactory->get($execution);

        try {
            // Update execution object with new status.
            $execution->setStatus(EtlExecution::STATUS_RUNNING);
            $execution->setStartTime(new \DateTime());
            $execution->setWaitTime(time() - $execution->getCreateTime()->getTimestamp());
            $this->etlExecutionRepository->save($execution);

            // Build the processor.
            $params = $this->decodeJsonArray($execution->getInputOptions(), 'input options');
            $processor = $this->getProcessor($chainName, $params);

            if ($iterator === null) {
                $iterator = new \ArrayIterator(
                    $this->decodeJsonArray($execution->getInputData(), 'input data')
                );
            }
            $params['etl'] = [
                'chain' => $chainName,
                'startTime' => new \DateTime(),
                'execution' => $execution,
            ];

            // Start the process.
            $processor->process($iterator, $params);
            $execution = $this->reloadExecution($execution);
            $execution->setStatus(EtlExecution::STATUS_SUCCESS);
        } catch (\Throwable $exception) {
            $execution = $this->reloadExecution($execution);
            $execution->setFailTime(new \DateTime());
            $execution->setStatus(EtlExecution::STATUS_FAILURE);
            $execution->setErrorMessage($this->getFullExceptionTrace($exception));

            throw $exception;
        } finally {
            $execution->setEndTime(new \DateTime());

            // The start time may be missing when the failure happened before it was set.
            $startTime = $execution->getStartTime();
            if ($startTime !== null) {
                $execution->setRunTime(time() - $startTime->getTimestamp());
            }

            $execution->setStepStats('[]'); // To be developed
            $this->etlExecutionRepository->save($execution);
        }
    }

    /**
     * Concatenate the message and stack trace of an exception and of each of its
     * previous exceptions, outermost first.
     */
    protected function getFullExceptionTrace(\Throwable $exception): string
    {
        $message = '';
        do {
            $message .= $exception->getMessage() . "\n" . $exception->getTraceAsString() . "\n\n";
        } while ($exception = $exception->getPrevious());

        return $message;
    }

    /**
     * List the known chain names that are close to $chainName: either within a
     * Levenshtein distance of a third of its length, or containing it.
     *
     * @return string[]
     */
    private function findSimilarChainNames(string $chainName): array
    {
        $alternatives = [];
        foreach (array_keys($this->rewDefinitions) as $knownId) {
            $knownId = (string) $knownId; // numeric keys come back as int
            $distance = levenshtein($chainName, $knownId);
            if ($distance <= \strlen($chainName) / 3 || str_contains($knownId, $chainName)) {
                $alternatives[] = $knownId;
            }
        }

        return $alternatives;
    }

    /**
     * Decode a JSON document stored on an execution into an array.
     *
     * A null/empty document, or a JSON `null`, yields an empty array.
     *
     * @param string|null $json  JSON text to decode.
     * @param string      $label Name of the field, used in the error message.
     *
     * @throws \UnexpectedValueException When the text is invalid JSON or is not an array/object.
     */
    private function decodeJsonArray(?string $json, string $label): array
    {
        if ($json === null || $json === '') {
            return [];
        }

        $decoded = json_decode($json, true);
        if (is_array($decoded)) {
            return $decoded;
        }

        $decodeFailed = json_last_error() !== JSON_ERROR_NONE;
        if ($decoded === null && !$decodeFailed) {
            return [];
        }

        $reason = $decodeFailed ? json_last_error_msg() : 'decoded value is not an array';

        throw new \UnexpectedValueException("Unable to read execution $label as a JSON array: $reason");
    }

    /**
     * Fetch the execution again from the repository by id.
     *
     * Falls back to the given instance when the repository returns nothing, so
     * the outcome can still be recorded and the original exception is not masked
     * by a call on null.
     * NOTE(review): presumably the reload exists because processing can leave the
     * in-memory entity stale or detached — confirm.
     */
    private function reloadExecution(EtlExecution $execution): EtlExecution
    {
        $reloaded = $this->etlExecutionRepository->find($execution->getId());

        return $reloaded instanceof EtlExecution ? $reloaded : $execution;
    }
}
||||
| 154 |