 oliverde8    /
                    phpEtlBundle
                      oliverde8    /
                    phpEtlBundle
                
                            | 1 | <?php | ||||
| 2 | |||||
| 3 | namespace Oliverde8\PhpEtlBundle\Services; | ||||
| 4 | |||||
| 5 | use Oliverde8\Component\PhpEtl\ChainProcessor; | ||||
| 6 | use Oliverde8\PhpEtlBundle\Entity\EtlExecution; | ||||
| 7 | use Oliverde8\PhpEtlBundle\Exception\UnknownChainException; | ||||
| 8 | use Oliverde8\PhpEtlBundle\Factory\ChainFactory; | ||||
| 9 | use Oliverde8\PhpEtlBundle\Repository\EtlExecutionRepository; | ||||
| 10 | |||||
/**
 * Registry and runner for the configured ETL chains.
 *
 * Keeps the parsed and the raw definition of every chain (both indexed by chain name),
 * builds ChainProcessor instances through the ChainFactory, and records each run as an
 * EtlExecution through the repository.
 */
class ChainProcessorsManager
{
    protected EtlExecutionRepository $etlExecutionRepository;

    protected LoggerFactory $loggerFactory;

    protected ChainFactory $chainFactory;

    /**
     * Parsed chain definitions indexed by chain name. Each entry is read for the keys
     * 'chain', 'maxAsynchronousItems' (optional) and 'defaultOptions' (optional).
     */
    protected array $definitions;

    /**
     * Raw chain definitions (strings) indexed by chain name.
     * The "rew" spelling is a historical typo kept so subclasses keep working.
     */
    protected array $rewDefinitions;

    /**
     * @param array $definitions    Parsed chain definitions, indexed by chain name.
     * @param array $rawDefinitions Raw chain definitions, indexed by chain name.
     */
    public function __construct(
        EtlExecutionRepository $etlExecutionRepository,
        LoggerFactory $loggerFactory,
        ChainFactory $chainFactory,
        array $definitions,
        array $rawDefinitions
    ) {
        $this->etlExecutionRepository = $etlExecutionRepository;
        $this->loggerFactory = $loggerFactory;
        $this->chainFactory = $chainFactory;
        $this->definitions = $definitions;
        $this->rewDefinitions = $rawDefinitions;
    }

    /**
     * Return the raw definition of a chain.
     *
     * When the chain is unknown, the exception message lists the known chain names that
     * are close to the requested one (small Levenshtein distance, or containing it).
     *
     * @throws UnknownChainException When no chain is registered under $chainName.
     */
    public function getRawDefinition(string $chainName): string
    {
        if (isset($this->rewDefinitions[$chainName])) {
            return $this->rewDefinitions[$chainName];
        }

        $alternatives = [];
        foreach (array_keys($this->rewDefinitions) as $knownId) {
            // Numeric-looking array keys come back as integers; normalise before the string functions.
            $knownId = (string) $knownId;
            $lev = levenshtein($chainName, $knownId);
            if ($lev <= \strlen($chainName) / 3 || str_contains($knownId, $chainName)) {
                $alternatives[] = $knownId;
            }
        }

        $message = "Unknown chain '$chainName'";
        if ($alternatives !== []) {
            // Only offer a hint when there is something to suggest.
            $message .= ", did you mean: " . implode(", ", $alternatives);
        }

        throw new UnknownChainException($message);
    }

    /**
     * Return every raw chain definition, indexed by chain name.
     *
     * The method name carries a historical typo ("Rew") that is kept for backward compatibility.
     */
    public function getRewDefinitions(): array
    {
        return $this->rewDefinitions;
    }

    /**
     * Build the processor of a chain.
     *
     * @param string $chainName Name of the chain to build.
     * @param array  $options   Runtime options; they override the chain's 'defaultOptions'.
     *
     * @throws UnknownChainException When the chain has no raw or no parsed definition.
     */
    public function getProcessor(string $chainName, array $options): ChainProcessor
    {
        // Called for validation only: throws (with suggestions) when the name is unknown.
        $this->getRawDefinition($chainName);

        if (!isset($this->definitions[$chainName])) {
            throw new UnknownChainException("Chain '$chainName' has a raw definition but no parsed definition");
        }

        $definition = $this->definitions[$chainName];
        $chain = $definition['chain'];
        $maxAsynchronousItems = $definition['maxAsynchronousItems'] ?? 20;
        $defaultOptions = $definition['defaultOptions'] ?? [];

        $options = array_merge($defaultOptions, $options);

        return $this->chainFactory->create($chain, $options, $maxAsynchronousItems);
    }

    /**
     * Execute a particular chain.
     *
     * Creates and saves a new EtlExecution for the run, then delegates to executeFromEtlEntity().
     *
     * @param string   $chainName Name of the chain to run.
     * @param iterable $iterator  Input items. Arrays are stored on the execution as input data;
     *                            for any other iterable a placeholder is stored instead.
     * @param array    $params    Options given to the execution.
     *
     * @return void
     *
     * @throws UnknownChainException When the chain does not exist.
     * @throws \Throwable            Whatever the chain throws while running.
     */
    public function execute(string $chainName, iterable $iterator, array $params)
    {
        $definition = $this->getRawDefinition($chainName);

        $inputData = ["Iterator! Can't show input data"];
        if (is_array($iterator)) {
            $inputData = $iterator;
            $iterator = new \ArrayIterator($iterator);
        }

        $execution = new EtlExecution($chainName, $definition, $inputData, $params);
        $execution->setStatus(EtlExecution::STATUS_RUNNING);
        $this->etlExecutionRepository->save($execution);

        $this->executeFromEtlEntity($execution, $iterator);
    }

    /**
     * Execute a chain from its entity.
     *
     * Marks the execution as running, processes the input, then stores the final status
     * (success, or failure with the full exception trace) together with the timings.
     *
     * @param EtlExecution  $execution Execution to run; its name selects the chain.
     * @param iterable|null $iterator  Input items; when null, the input data stored on the
     *                                 execution is decoded and used instead.
     *
     * @throws \Throwable Any error raised while building or running the chain is rethrown
     *                    after the failure has been recorded on the execution.
     */
    public function executeFromEtlEntity(EtlExecution $execution, ?iterable $iterator = null): void
    {
        $chainName = $execution->getName();
        // NOTE(review): the returned logger is not used here; the call is kept because the
        // factory may set the logger up as a side effect - confirm before removing.
        $this->loggerFactory->get($execution);

        // Kept locally so the run time can be computed even if the entity gets re-fetched.
        $startTime = new \DateTime();

        try {
            // Update execution object with new status.
            $execution->setStatus(EtlExecution::STATUS_RUNNING);
            $execution->setStartTime($startTime);
            $execution->setWaitTime(time() - $execution->getCreateTime()->getTimestamp());
            $this->etlExecutionRepository->save($execution);

            // Build the processor.
            $params = $this->decodeJsonArray($execution->getInputOptions());
            $processor = $this->getProcessor($chainName, $params);

            if (is_null($iterator)) {
                $iterator = new \ArrayIterator($this->decodeJsonArray($execution->getInputData()));
            }
            $params['etl'] = [
                'chain' => $chainName,
                'startTime' => new \DateTime(),
                'execution' => $execution
            ];

            // Start the process.
            $processor->process($iterator, $params);
            $execution = $this->reloadExecution($execution);
            $execution->setStatus(EtlExecution::STATUS_SUCCESS);
        } catch (\Throwable $exception) {
            $execution = $this->reloadExecution($execution);
            $execution->setFailTime(new \DateTime());
            $execution->setStatus(EtlExecution::STATUS_FAILURE);
            $execution->setErrorMessage($this->getFullExceptionTrace($exception));
            throw $exception;
        } finally {
            $execution->setEndTime(new \DateTime());
            $execution->setRunTime(time() - $startTime->getTimestamp());
            $execution->setStepStats('[]'); // To be developed
            $this->etlExecutionRepository->save($execution);
        }
    }

    /**
     * Concatenate the message and stack trace of an exception and of all its previous exceptions.
     */
    protected function getFullExceptionTrace(\Throwable $exception): string
    {
        $message = '';
        do {
            $message .= $exception->getMessage() . "\n" . $exception->getTraceAsString() . "\n\n";
        } while ($exception = $exception->getPrevious());

        return $message;
    }

    /**
     * Decode a JSON document that must hold an array; null or an empty string yields [].
     *
     * @throws \JsonException            When the string is not valid JSON.
     * @throws \UnexpectedValueException When the JSON does not decode to an array.
     */
    private function decodeJsonArray(?string $json): array
    {
        if ($json === null || $json === '') {
            return [];
        }

        $decoded = json_decode($json, true, 512, JSON_THROW_ON_ERROR);
        if (!is_array($decoded)) {
            throw new \UnexpectedValueException('Expected the JSON to decode to an array, got ' . gettype($decoded));
        }

        return $decoded;
    }

    /**
     * Fetch the execution again from the repository, falling back to the given instance
     * when the repository returns nothing, so the caller always has an entity to update.
     */
    private function reloadExecution(EtlExecution $execution): EtlExecution
    {
        return $this->etlExecutionRepository->find($execution->getId()) ?? $execution;
    }
}
| 154 | 
