1 | <?php |
||
2 | |||
3 | namespace Dtc\QueueBundle\Manager; |
||
4 | |||
5 | use Dtc\QueueBundle\EventDispatcher\Event; |
||
6 | use Dtc\QueueBundle\EventDispatcher\EventDispatcher; |
||
7 | use Dtc\QueueBundle\Exception\DuplicateWorkerException; |
||
8 | use Dtc\QueueBundle\Model\BaseJob; |
||
9 | use Dtc\QueueBundle\Model\Job; |
||
10 | use Dtc\QueueBundle\Model\JobTiming; |
||
11 | use Dtc\QueueBundle\Model\Worker; |
||
12 | use Dtc\QueueBundle\Util\Util; |
||
13 | use Psr\Log\LoggerInterface; |
||
14 | |||
15 | class WorkerManager |
||
16 | { |
||
17 | protected $workers; |
||
18 | protected $jobManager; |
||
19 | |||
20 | /** @var LoggerInterface */ |
||
21 | protected $logger; |
||
22 | protected $eventDispatcher; |
||
23 | protected $logFunc; |
||
24 | |||
25 | 18 | public function __construct(JobManagerInterface $jobManager, EventDispatcher $eventDispatcher) |
|
26 | { |
||
27 | 18 | $this->workers = array(); |
|
28 | 18 | $this->jobManager = $jobManager; |
|
29 | 18 | $this->eventDispatcher = $eventDispatcher; |
|
30 | 18 | } |
|
31 | |||
32 | public function setLogger(LoggerInterface $logger) |
||
33 | { |
||
34 | $this->logger = $logger; |
||
35 | } |
||
36 | |||
37 | /** |
||
38 | * @param Worker $worker |
||
39 | * |
||
40 | * @throws DuplicateWorkerException |
||
41 | */ |
||
42 | 9 | public function addWorker(Worker $worker) |
|
43 | { |
||
44 | 9 | if ($this->logger) { |
|
45 | $this->logger->debug(__METHOD__." - Added worker: {$worker->getName()}"); |
||
46 | } |
||
47 | |||
48 | 9 | if (isset($this->workers[$worker->getName()])) { |
|
49 | 1 | throw new DuplicateWorkerException("{$worker->getName()} already exists in worker manager"); |
|
50 | } |
||
51 | |||
52 | 9 | $this->workers[$worker->getName()] = $worker; |
|
53 | 9 | } |
|
54 | |||
55 | 8 | public function getWorker($name) |
|
56 | { |
||
57 | 8 | if (isset($this->workers[$name])) { |
|
58 | 8 | return $this->workers[$name]; |
|
59 | } |
||
60 | |||
61 | return null; |
||
62 | } |
||
63 | |||
64 | 1 | public function getWorkers() |
|
65 | { |
||
66 | 1 | return $this->workers; |
|
67 | } |
||
68 | |||
69 | 3 | public function setLoggingFunc(callable $callable) |
|
70 | { |
||
71 | 3 | $this->logFunc = $callable; |
|
72 | 3 | } |
|
73 | |||
74 | 6 | public function log($level, $msg, array $context = []) |
|
75 | { |
||
76 | 6 | if ($this->logFunc) { |
|
77 | 3 | call_user_func_array($this->logFunc, [$level, $msg, $context]); |
|
78 | |||
79 | 3 | return; |
|
80 | } |
||
81 | |||
82 | 4 | if ($this->logger) { |
|
83 | $this->logger->$level($msg, $context); |
||
84 | |||
85 | return; |
||
86 | } |
||
87 | 4 | } |
|
88 | |||
89 | /** |
||
90 | * @param null $workerName |
||
91 | * @param null $methodName |
||
92 | * @param bool $prioritize |
||
93 | * |
||
94 | * @return Job|null |
||
95 | */ |
||
96 | 5 | public function run($workerName = null, $methodName = null, $prioritize = true, $runId = null) |
|
97 | { |
||
98 | 5 | $job = $this->jobManager->getJob($workerName, $methodName, $prioritize, $runId); |
|
99 | 5 | if (!$job) { |
|
100 | 3 | return null; // no job to run |
|
101 | } |
||
102 | |||
103 | 5 | return $this->runJob($job); |
|
104 | } |
||
105 | |||
106 | /** |
||
107 | * @param array $payload |
||
108 | * @param Job $job |
||
109 | */ |
||
110 | 1 | protected function handleException(array $payload, Job $job) |
|
111 | { |
||
112 | 1 | $exception = $payload[0]; |
|
113 | 1 | $exceptionMessage = get_class($exception)."\n".$exception->getCode().' - '.$exception->getMessage()."\n".$exception->getTraceAsString(); |
|
114 | 1 | $this->log('debug', "Failed: {$job->getClassName()}->{$job->getMethod()}"); |
|
115 | 1 | $job->setStatus(BaseJob::STATUS_EXCEPTION); |
|
116 | 1 | $message = $job->getMessage(); |
|
117 | 1 | if (null !== $message) { |
|
118 | $message .= "\n\n"; |
||
119 | } else { |
||
120 | 1 | $message = $exceptionMessage; |
|
121 | } |
||
122 | |||
123 | 1 | $job->setMessage($message); |
|
124 | 1 | $this->jobManager->getJobTimingManager()->recordTiming(JobTiming::STATUS_FINISHED_EXCEPTION); |
|
125 | 1 | } |
|
126 | |||
127 | 5 | public function processStatus(Job $job, $result) |
|
128 | { |
||
129 | 5 | if (Worker::RESULT_FAILURE === $result) { |
|
130 | 3 | $job->setStatus(BaseJob::STATUS_FAILURE); |
|
131 | 3 | $this->jobManager->getJobTimingManager()->recordTiming(JobTiming::STATUS_FINISHED_FAILURE); |
|
132 | |||
133 | 3 | return; |
|
134 | } |
||
135 | 2 | $job->setStatus(BaseJob::STATUS_SUCCESS); |
|
136 | 2 | $this->jobManager->getJobTimingManager()->recordTiming(JobTiming::STATUS_FINISHED_SUCCESS); |
|
137 | 2 | } |
|
138 | |||
139 | 6 | public function runJob(Job $job) |
|
140 | { |
||
141 | 6 | $event = new Event($job); |
|
142 | 6 | $this->eventDispatcher->dispatch(Event::PRE_JOB, $event); |
|
143 | |||
144 | 6 | $start = microtime(true); |
|
145 | try { |
||
146 | /** @var Worker $worker */ |
||
147 | 6 | $worker = $this->getWorker($job->getWorkerName()); |
|
148 | 6 | $this->log('debug', "Start: {$job->getClassName()}->{$job->getMethod()}", $job->getArgs()); |
|
149 | 6 | $job->setStartedAt(Util::getMicrotimeFloatDateTime($start)); |
|
150 | 6 | $job->setMessage(null); |
|
151 | 6 | $worker->setCurrentJob($job); |
|
152 | 6 | $result = call_user_func_array(array($worker, $job->getMethod()), $job->getArgs()); |
|
153 | 5 | $this->processStatus($job, $result); |
|
154 | 1 | } catch (\Throwable $exception) { |
|
155 | 1 | $this->handleException([$exception], $job); |
|
156 | } catch (\Exception $exception) { |
||
157 | $this->handleException([$exception], $job); |
||
158 | } |
||
159 | |||
160 | // save Job history |
||
161 | 6 | $elapsed = microtime(true) - $start; |
|
162 | 6 | $job->setFinishedAt(Util::getMicrotimeDateTime()); |
|
163 | 6 | $job->setElapsed($elapsed); |
|
0 ignored issues
–
show
Bug
introduced
by
Loading history...
|
|||
164 | |||
165 | 6 | $this->log('debug', "Finished: {$job->getClassName()}->{$job->getMethod()} in {$elapsed} seconds"); |
|
166 | 6 | $this->log('debug', "Save job history: {$job->getId()}"); |
|
167 | |||
168 | 6 | $this->jobManager->saveHistory($job); |
|
169 | 6 | $this->eventDispatcher->dispatch(Event::POST_JOB, $event); |
|
170 | |||
171 | 6 | return $job; |
|
172 | } |
||
173 | } |
||
174 |