Completed
Pull Request — master (#30)
by Matthew
23:29 queued 08:10
created

WorkerManager::handleException()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 16
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 2.0023

Importance

Changes 0
Metric Value
dl 0
loc 16
ccs 11
cts 12
cp 0.9167
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 12
nc 2
nop 2
crap 2.0023
1
<?php
2
3
namespace Dtc\QueueBundle\Manager;
4
5
use Dtc\QueueBundle\EventDispatcher\Event;
6
use Dtc\QueueBundle\EventDispatcher\EventDispatcher;
7
use Dtc\QueueBundle\Exception\DuplicateWorkerException;
8
use Dtc\QueueBundle\Model\BaseJob;
9
use Dtc\QueueBundle\Model\Job;
10
use Dtc\QueueBundle\Model\JobTiming;
11
use Dtc\QueueBundle\Model\Worker;
12
use Psr\Log\LoggerInterface;
13
14
/**
 * Keeps a registry of workers (keyed by worker name) and runs jobs on them.
 *
 * Jobs are fetched from the job manager, executed on the worker named by the
 * job, and then saved to history. PRE_JOB / POST_JOB events are dispatched
 * around every execution.
 */
class WorkerManager
{
    /** @var Worker[] registered workers, keyed by worker name */
    protected $workers;

    /** @var JobManagerInterface */
    protected $jobManager;

    /** @var LoggerInterface|null */
    protected $logger;

    /** @var EventDispatcher */
    protected $eventDispatcher;

    /** @var callable|null custom logging callback; takes precedence over $logger in log() */
    protected $logFunc;

    /**
     * @param JobManagerInterface $jobManager      source of jobs and sink for job history
     * @param EventDispatcher     $eventDispatcher receives the PRE_JOB / POST_JOB events
     */
    public function __construct(JobManagerInterface $jobManager, EventDispatcher $eventDispatcher)
    {
        $this->workers = [];
        $this->jobManager = $jobManager;
        $this->eventDispatcher = $eventDispatcher;
    }

    /**
     * @param LoggerInterface $logger logger used by log() when no logging callback is set
     */
    public function setLogger(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    /**
     * Registers a worker under its name.
     *
     * @param Worker $worker
     *
     * @throws DuplicateWorkerException when a worker with the same name is already registered
     */
    public function addWorker(Worker $worker)
    {
        $name = $worker->getName();
        if (isset($this->workers[$name])) {
            throw new DuplicateWorkerException("{$name} already exists in worker manager");
        }

        $this->workers[$name] = $worker;

        // Log only once the worker is actually registered, so that a rejected
        // duplicate is never reported as "Added".
        if ($this->logger) {
            $this->logger->debug(__METHOD__." - Added worker: {$name}");
        }
    }

    /**
     * @param string $name
     *
     * @return Worker|null the worker registered under $name, or null when there is none
     */
    public function getWorker($name)
    {
        return isset($this->workers[$name]) ? $this->workers[$name] : null;
    }

    /**
     * @return Worker[] all registered workers, keyed by worker name
     */
    public function getWorkers()
    {
        return $this->workers;
    }

    /**
     * Sets a callback that log() invokes instead of the logger.
     *
     * @param callable $callable invoked as $callable($level, $msg, $context)
     */
    public function setLoggingFunc(callable $callable)
    {
        $this->logFunc = $callable;
    }

    /**
     * Logs a message through the logging callback if one is set, otherwise
     * through the logger if one is set; does nothing when neither is configured.
     *
     * @param string $level   name of the logger method to call (e.g. 'debug')
     * @param string $msg
     * @param array  $context
     */
    public function log($level, $msg, array $context = [])
    {
        if ($this->logFunc) {
            call_user_func_array($this->logFunc, [$level, $msg, $context]);

            return;
        }

        if ($this->logger) {
            $this->logger->$level($msg, $context);
        }
    }

    /**
     * Fetches the next job from the job manager and runs it.
     *
     * All arguments are forwarded unchanged to JobManagerInterface::getJob().
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     * @param mixed|null  $runId
     *
     * @return null|Job the job that was run, or null when there was no job to run
     */
    public function run($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $job = $this->jobManager->getJob($workerName, $methodName, $prioritize, $runId);
        if (!$job) {
            return null; // no job to run
        }

        return $this->runJob($job);
    }

    /**
     * Marks the job as failed with an exception and stores the exception
     * details (class, code, message, stack trace) in the job's message.
     *
     * @param array $payload the caught exception is expected at index 0
     * @param Job   $job
     */
    protected function handleException(array $payload, Job $job)
    {
        $exception = $payload[0];
        $exceptionMessage = get_class($exception)."\n".$exception->getCode().' - '.$exception->getMessage()."\n".$exception->getTraceAsString();
        $this->log('debug', "Failed: {$job->getClassName()}->{$job->getMethod()}");
        $job->setStatus(BaseJob::STATUS_EXCEPTION);

        // Keep whatever message the job already carries and append the
        // exception details after a blank line, so they are never lost.
        $message = $job->getMessage();
        if (null !== $message) {
            $message .= "\n\n".$exceptionMessage;
        } else {
            $message = $exceptionMessage;
        }

        $job->setMessage($message);
        $this->jobManager->getJobTimingManager()->recordTiming(JobTiming::STATUS_FINISHED_EXCEPTION);
    }

    /**
     * Translates a worker's return value into a job status and records the
     * matching timing entry. Any result other than Worker::RESULT_FAILURE is
     * treated as success.
     *
     * @param Job   $job
     * @param mixed $result value returned by the worker method
     */
    public function processStatus(Job $job, $result)
    {
        if (Worker::RESULT_FAILURE === $result) {
            $job->setStatus(BaseJob::STATUS_FAILURE);
            $this->jobManager->getJobTimingManager()->recordTiming(JobTiming::STATUS_FINISHED_FAILURE);

            return;
        }

        $job->setStatus(BaseJob::STATUS_SUCCESS);
        $this->jobManager->getJobTimingManager()->recordTiming(JobTiming::STATUS_FINISHED_SUCCESS);
    }

    /**
     * Runs a single job on the worker it names, records its outcome and
     * timing, saves it to history and dispatches the PRE_JOB / POST_JOB events.
     *
     * Anything thrown while running the job is caught and recorded on the job
     * via handleException(); it is not re-thrown.
     *
     * @param Job $job
     *
     * @return Job the same job instance, updated with status, message and timing
     */
    public function runJob(Job $job)
    {
        $event = new Event($job);
        $this->eventDispatcher->dispatch(Event::PRE_JOB, $event);

        $start = microtime(true);
        try {
            /** @var Worker|null $worker */
            $worker = $this->getWorker($job->getWorkerName());
            $this->log('debug', "Start: {$job->getClassName()}->{$job->getMethod()}", $job->getArgs());
            $job->setStartedAt(new \DateTime());
            $job->setMessage(null);

            // Fail with a descriptive exception instead of calling a method on
            // null (an uncatchable fatal error on PHP 5).
            if (null === $worker) {
                throw new \RuntimeException("No worker registered under the name '{$job->getWorkerName()}'");
            }

            $worker->setCurrentJob($job);
            $result = call_user_func_array([$worker, $job->getMethod()], $job->getArgs());
            $this->processStatus($job, $result);
        } catch (\Throwable $exception) {
            $this->handleException([$exception], $job);
        } catch (\Exception $exception) {
            // Only reached on runtimes without \Throwable (PHP 5).
            $this->handleException([$exception], $job);
        }

        // save Job history
        $elapsed = microtime(true) - $start;
        $job->setFinishedAt(new \DateTime());
        $job->setElapsed($elapsed);

        $this->log('debug', "Finished: {$job->getClassName()}->{$job->getMethod()} in {$elapsed} seconds");
        $this->log('debug', "Save job history: {$job->getId()}");

        $this->jobManager->saveHistory($job);
        $this->eventDispatcher->dispatch(Event::POST_JOB, $event);

        return $job;
    }
}
173