mmucklo /
DtcQueueBundle
| 1 | <?php |
||
| 2 | |||
| 3 | namespace Dtc\QueueBundle\Manager; |
||
| 4 | |||
| 5 | use Dtc\QueueBundle\EventDispatcher\Event; |
||
| 6 | use Dtc\QueueBundle\EventDispatcher\EventDispatcher; |
||
| 7 | use Dtc\QueueBundle\Exception\DuplicateWorkerException; |
||
| 8 | use Dtc\QueueBundle\Model\BaseJob; |
||
| 9 | use Dtc\QueueBundle\Model\Job; |
||
| 10 | use Dtc\QueueBundle\Model\JobTiming; |
||
| 11 | use Dtc\QueueBundle\Model\Worker; |
||
| 12 | use Dtc\QueueBundle\Util\Util; |
||
| 13 | use Psr\Log\LoggerInterface; |
||
|
0 ignored issues
–
show
|
|||
| 14 | |||
/**
 * Keeps a registry of workers (keyed by worker name) and runs queued jobs
 * against them, recording status, timing and history through the job manager.
 */
class WorkerManager
{
    /** @var Worker[] registered workers, keyed by the value of Worker::getName() */
    protected $workers;

    /** @var JobManagerInterface */
    protected $jobManager;

    /** @var LoggerInterface */
    protected $logger;

    /** @var EventDispatcher */
    protected $eventDispatcher;

    /** @var callable|null custom logging callback; when set it takes precedence over $logger */
    protected $logFunc;

    public function __construct(JobManagerInterface $jobManager, EventDispatcher $eventDispatcher)
    {
        $this->workers = [];
        $this->jobManager = $jobManager;
        $this->eventDispatcher = $eventDispatcher;
    }

    public function setLogger(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    /**
     * Registers a worker under its name.
     *
     * @throws DuplicateWorkerException when a worker with the same name is already registered
     */
    public function addWorker(Worker $worker)
    {
        $name = $worker->getName();
        if (isset($this->workers[$name])) {
            throw new DuplicateWorkerException("{$name} already exists in worker manager");
        }

        $this->workers[$name] = $worker;

        // Log only once the worker is actually registered, so a rejected
        // duplicate never shows up in the log as "Added".
        if ($this->logger) {
            $this->logger->debug(__METHOD__." - Added worker: {$name}");
        }
    }

    /**
     * @param string $name
     *
     * @return Worker|null the registered worker, or null when no worker has that name
     */
    public function getWorker($name)
    {
        if (isset($this->workers[$name])) {
            return $this->workers[$name];
        }

        return null;
    }

    /**
     * @return Worker[] all registered workers, keyed by name
     */
    public function getWorkers()
    {
        return $this->workers;
    }

    /**
     * Sets a callback that receives ($level, $msg, $context) for every log() call.
     */
    public function setLoggingFunc(callable $callable)
    {
        $this->logFunc = $callable;
    }

    /**
     * Logs through the custom logging callback when one is set, otherwise through
     * the logger when one is set; does nothing when neither is available.
     *
     * @param string $level   name of the logger method to call (e.g. 'debug')
     * @param string $msg
     * @param array  $context
     */
    public function log($level, $msg, array $context = [])
    {
        if ($this->logFunc) {
            call_user_func_array($this->logFunc, [$level, $msg, $context]);

            return;
        }

        if ($this->logger) {
            // $level is used as the method name on the logger.
            $this->logger->$level($msg, $context);
        }
    }

    /**
     * Fetches the next job from the job manager and runs it.
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     * @param mixed       $runId      passed through unchanged to JobManagerInterface::getJob()
     *
     * @return Job|null the job that was run, or null when there was no job to run
     */
    public function run($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $job = $this->jobManager->getJob($workerName, $methodName, $prioritize, $runId);
        if (!$job) {
            return null; // no job to run
        }

        return $this->runJob($job);
    }

    /**
     * Marks the job as having ended with an exception and stores the exception
     * details (class, code, message, stack trace) in the job's message.
     *
     * @param array $payload the caught exception is expected at index 0
     * @param Job   $job
     */
    protected function handleException(array $payload, Job $job)
    {
        $exception = $payload[0];
        $exceptionMessage = get_class($exception)."\n".$exception->getCode().' - '.$exception->getMessage()."\n".$exception->getTraceAsString();
        $this->log('debug', "Failed: {$job->getClassName()}->{$job->getMethod()}");
        $job->setStatus(BaseJob::STATUS_EXCEPTION);
        $job->setMessage($this->appendExceptionMessage($job->getMessage(), $exceptionMessage));
        $this->jobManager->getJobTimingManager()->recordTiming(JobTiming::STATUS_FINISHED_EXCEPTION);
    }

    /**
     * Combines a job's existing message with the exception details.
     *
     * @param string|null $existingMessage  message already stored on the job, if any
     * @param string      $exceptionMessage
     *
     * @return string the exception details, preceded by the existing message and a
     *                blank line when the job already had a message
     */
    private function appendExceptionMessage($existingMessage, $exceptionMessage)
    {
        if (null === $existingMessage) {
            return $exceptionMessage;
        }

        // The existing message is kept and the exception details are added
        // after a blank line, so neither piece of information is lost.
        return $existingMessage."\n\n".$exceptionMessage;
    }

    /**
     * Sets the job's final status from the worker method's return value and
     * records the matching timing entry.
     *
     * @param Job   $job
     * @param mixed $result only a value identical to Worker::RESULT_FAILURE counts as failure
     */
    public function processStatus(Job $job, $result)
    {
        if (Worker::RESULT_FAILURE === $result) {
            $job->setStatus(BaseJob::STATUS_FAILURE);
            $this->jobManager->getJobTimingManager()->recordTiming(JobTiming::STATUS_FINISHED_FAILURE);

            return;
        }

        $job->setStatus(BaseJob::STATUS_SUCCESS);
        $this->jobManager->getJobTimingManager()->recordTiming(JobTiming::STATUS_FINISHED_SUCCESS);
    }

    /**
     * Runs a single job on its worker, dispatching PRE_JOB / POST_JOB events
     * around it and saving the job's history afterwards.
     *
     * Anything thrown while running the job is caught and recorded on the job
     * instead of propagating to the caller.
     *
     * @param Job $job
     *
     * @return Job the same job instance, updated with status, timing and message
     */
    public function runJob(Job $job)
    {
        $event = new Event($job);
        $this->eventDispatcher->dispatch(Event::PRE_JOB, $event);

        $start = microtime(true);
        try {
            /** @var Worker|null $worker */
            $worker = $this->getWorker($job->getWorkerName());
            $this->log('debug', "Start: {$job->getClassName()}->{$job->getMethod()}", $job->getArgs());
            $job->setStartedAt(Util::getMicrotimeFloatDateTime($start));
            $job->setMessage(null);
            if (null === $worker) {
                // Fail with a descriptive, catchable exception rather than a
                // "member function on null" error.
                throw new \RuntimeException("No worker named '{$job->getWorkerName()}' is registered with the worker manager");
            }
            $worker->setCurrentJob($job);
            $result = call_user_func_array([$worker, $job->getMethod()], $job->getArgs());
            $this->processStatus($job, $result);
        } catch (\Throwable $exception) {
            $this->handleException([$exception], $job);
        } catch (\Exception $exception) {
            // Only reachable on PHP 5, where \Throwable does not exist.
            $this->handleException([$exception], $job);
        }

        // save Job history
        $elapsed = microtime(true) - $start;
        $job->setFinishedAt(Util::getMicrotimeDateTime());
        $job->setElapsed($elapsed);

        $this->log('debug', "Finished: {$job->getClassName()}->{$job->getMethod()} in {$elapsed} seconds");
        $this->log('debug', "Save job history: {$job->getId()}");

        $this->jobManager->saveHistory($job);
        $this->eventDispatcher->dispatch(Event::POST_JOB, $event);

        return $job;
    }
}
||
| 168 |
The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g.
excluded_paths: ["lib/*"], you can move it to the dependency path list as follows: dependency_paths: ["lib/*"]. For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths