Completed
Branch master (294ea0)
by Matthew
18:22
created

WorkerManager::handleException()   A

Complexity

Conditions 4
Paths 3

Size

Total Lines 14
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 4.432

Importance

Changes 0
Metric Value
dl 0
loc 14
ccs 7
cts 10
cp 0.7
rs 9.2
c 0
b 0
f 0
cc 4
eloc 10
nc 3
nop 2
crap 4.432
1
<?php
2
3
namespace Dtc\QueueBundle\Model;
4
5
use Dtc\QueueBundle\EventDispatcher\Event;
6
use Dtc\QueueBundle\EventDispatcher\EventDispatcher;
7
use Dtc\QueueBundle\Exception\DuplicateWorkerException;
8
use Psr\Log\LoggerInterface;
9
10
/**
 * Keeps a registry of workers and runs queued jobs against them.
 *
 * A job is fetched from the job manager, executed on the worker registered
 * under the job's worker name, and then stored in the job history. PRE_JOB
 * and POST_JOB events are dispatched around each execution.
 */
class WorkerManager
{
    /** @var Worker[] Registered workers, keyed by worker name */
    protected $workers = [];

    /** @var JobManagerInterface */
    protected $jobManager;

    /** @var LoggerInterface|null */
    protected $logger;

    /** @var EventDispatcher */
    protected $eventDispatcher;

    /** @var callable|null When set, log() forwards to this callback instead of the logger */
    protected $logFunc;

    /**
     * @param JobManagerInterface $jobManager      Source of jobs and sink for job history
     * @param EventDispatcher     $eventDispatcher Receives the PRE_JOB / POST_JOB events
     */
    public function __construct(JobManagerInterface $jobManager, EventDispatcher $eventDispatcher)
    {
        $this->workers = [];
        $this->jobManager = $jobManager;
        $this->eventDispatcher = $eventDispatcher;
    }

    /**
     * @param LoggerInterface $logger Logger used by addWorker() and, when no logging callback is set, by log()
     */
    public function setLogger(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    /**
     * Registers a worker under its name.
     *
     * @param Worker $worker
     *
     * @throws DuplicateWorkerException when a worker with the same name is already registered
     */
    public function addWorker(Worker $worker)
    {
        $name = $worker->getName();

        if (isset($this->workers[$name])) {
            throw new DuplicateWorkerException("{$name} already exists in worker manager");
        }

        $this->workers[$name] = $worker;

        // Log only once the worker is really registered, so a rejected
        // duplicate is never reported as "added".
        if ($this->logger) {
            $this->logger->debug(__METHOD__." - Added worker: {$name}");
        }
    }

    /**
     * @param string $name Worker name, as returned by Worker::getName()
     *
     * @return Worker|null The registered worker, or null when the name is unknown
     */
    public function getWorker($name)
    {
        if (isset($this->workers[$name])) {
            return $this->workers[$name];
        }

        return null;
    }

    /**
     * @return Worker[] All registered workers, keyed by worker name
     */
    public function getWorkers()
    {
        return $this->workers;
    }

    /**
     * Installs a callback that takes over all messages sent through log().
     *
     * @param callable $callable Invoked as $callable($level, $msg, $context)
     */
    public function setLoggingFunc(callable $callable)
    {
        $this->logFunc = $callable;
    }

    /**
     * Writes a log message through the logging callback if one is set,
     * otherwise through the logger; does nothing when neither is configured.
     *
     * @param string $level   Name of the logger method to call (e.g. 'debug')
     * @param string $msg
     * @param array  $context
     */
    public function log($level, $msg, array $context = [])
    {
        if ($this->logFunc) {
            call_user_func_array($this->logFunc, [$level, $msg, $context]);

            return;
        }

        if ($this->logger) {
            // The level doubles as the method name on the logger.
            $this->logger->$level($msg, $context);

            return;
        }
    }

    /**
     * Fetches the next job from the job manager and runs it.
     *
     * All arguments are passed through unchanged to JobManagerInterface::getJob().
     *
     * @param string|null $workerName
     * @param string|null $methodName
     * @param bool        $prioritize
     * @param mixed|null  $runId
     *
     * @return null|Job The job that was run, or null when there was no job to run
     */
    public function run($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $job = $this->jobManager->getJob($workerName, $methodName, $prioritize, $runId);
        if (!$job) {
            return null; // no job to run
        }

        return $this->runJob($job);
    }

    /**
     * Marks a job as failed and stores the exception details as its message.
     *
     * For a RetryableJob the error count is incremented, and the status becomes
     * STATUS_MAX_ERROR once the count reaches the job's maximum (if one is set).
     *
     * @param array $payload The caught exception / throwable is expected at index 0
     * @param Job   $job
     */
    protected function handleException(array $payload, Job $job)
    {
        $exception = $payload[0];
        $exceptionMessage = get_class($exception)."\n".$exception->getCode().' - '.$exception->getMessage()."\n".$exception->getTraceAsString();
        $this->log('debug', "Failed: {$job->getClassName()}->{$job->getMethod()}");
        $job->setStatus(BaseJob::STATUS_ERROR);
        if ($job instanceof RetryableJob) {
            $job->setErrorCount($job->getErrorCount() + 1);
            if (null !== ($maxError = $job->getMaxError()) && $job->getErrorCount() >= $maxError) {
                $job->setStatus(RetryableJob::STATUS_MAX_ERROR);
            }
        }
        $job->setMessage($exceptionMessage);
    }

    /**
     * Executes a job on its worker, records the outcome and saves the job history.
     *
     * Any exception or error raised while running the job is caught and recorded
     * on the job through handleException(); it is not rethrown.
     *
     * @param Job $job
     *
     * @return Job The same job, updated with status, message and timing
     */
    public function runJob(Job $job)
    {
        $event = new Event($job);
        $this->eventDispatcher->dispatch(Event::PRE_JOB, $event);

        $start = microtime(true);
        try {
            $worker = $this->getWorker($job->getWorkerName());
            $this->log('debug', "Start: {$job->getClassName()}->{$job->getMethod()}", $job->getArgs());
            $job->setStartedAt(new \DateTime());

            // Before PHP 8, call_user_func_array() only warns on an invalid
            // callback and returns null, which would let a job that never ran
            // be recorded as successful. Fail explicitly instead.
            if (null === $worker) {
                throw new \RuntimeException("No worker named '{$job->getWorkerName()}' is registered in worker manager");
            }
            $callback = [$worker, $job->getMethod()];
            if (!is_callable($callback)) {
                throw new \BadMethodCallException('Method '.get_class($worker)."::{$job->getMethod()}() is not callable");
            }

            call_user_func_array($callback, $job->getArgs());

            // Job finished successfully... do we remove the job from database?
            $job->setStatus(BaseJob::STATUS_SUCCESS);
            $job->setMessage(null);
        } catch (\Throwable $exception) {
            $this->handleException([$exception], $job);
        } catch (\Exception $exception) {
            // Reached only on PHP 5, where \Throwable does not exist.
            $this->handleException([$exception], $job);
        }

        // save Job history
        $elapsed = microtime(true) - $start;
        $job->setFinishedAt(new \DateTime());
        $job->setElapsed($elapsed);

        $this->log('debug', "Finished: {$job->getClassName()}->{$job->getMethod()} in {$elapsed} seconds");
        $this->log('debug', "Save job history: {$job->getId()}");

        $this->jobManager->saveHistory($job);
        $this->eventDispatcher->dispatch(Event::POST_JOB, $event);

        return $job;
    }
}
150