Completed
Push — master ( 4317fb...b1ebb8 )
by Matthew
04:36 queued 02:27
created

WorkerManager::run()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 9
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 6

Importance

Changes 0
Metric Value
dl 0
loc 9
ccs 0
cts 5
cp 0
rs 9.6666
c 0
b 0
f 0
cc 2
eloc 5
nc 2
nop 4
crap 6
1
<?php
2
3
namespace Dtc\QueueBundle\Model;
4
5
use Dtc\QueueBundle\EventDispatcher\Event;
6
use Dtc\QueueBundle\EventDispatcher\EventDispatcher;
7
use Dtc\QueueBundle\Exception\DuplicateWorkerException;
8
use Psr\Log\LoggerInterface;
9
10
/**
 * Registry of named workers that also executes queued jobs against them.
 *
 * Workers are stored under the value returned by their getName(). Jobs are
 * obtained from the injected job manager, invoked on the matching worker, and
 * handed back to the job manager through saveHistory().
 */
class WorkerManager
{
    /** @var Worker[] registered workers, keyed by worker name */
    protected $workers;

    /** @var JobManagerInterface */
    protected $jobManager;

    /** @var LoggerInterface|null */
    protected $logger;

    /** @var EventDispatcher */
    protected $eventDispatcher;

    /** @var callable|null when set, log() calls this instead of the logger */
    protected $logFunc;

    /**
     * @param JobManagerInterface $jobManager      source of jobs and sink for job history
     * @param EventDispatcher     $eventDispatcher receives Event::PRE_JOB / Event::POST_JOB around each job
     */
    public function __construct(JobManagerInterface $jobManager, EventDispatcher $eventDispatcher)
    {
        $this->workers = array();
        $this->jobManager = $jobManager;
        $this->eventDispatcher = $eventDispatcher;
    }

    /**
     * Sets the logger used by addWorker() and, when no logging callback is set, by log().
     *
     * @param LoggerInterface $logger
     */
    public function setLogger(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    /**
     * Registers a worker under its own name.
     *
     * @param Worker $worker
     *
     * @throws DuplicateWorkerException when a worker with the same name is already registered
     */
    public function addWorker(Worker $worker)
    {
        $name = $worker->getName();
        if (isset($this->workers[$name])) {
            throw new DuplicateWorkerException("{$name} already exists in worker manager");
        }

        $this->workers[$name] = $worker;

        // Log only once the worker is stored, so a rejected duplicate is never reported as added.
        if ($this->logger) {
            $this->logger->debug(__METHOD__." - Added worker: {$name}");
        }
    }

    /**
     * Looks up a registered worker.
     *
     * @param string $name
     *
     * @return Worker|null the worker registered under $name, or null when there is none
     */
    public function getWorker($name)
    {
        if (isset($this->workers[$name])) {
            return $this->workers[$name];
        }

        return null;
    }

    /**
     * @return Worker[] all registered workers, keyed by worker name
     */
    public function getWorkers()
    {
        return $this->workers;
    }

    /**
     * Installs a logging callback that takes precedence over the logger in log().
     *
     * @param callable $callable invoked as $callable($level, $msg, $context)
     */
    public function setLoggingFunc(callable $callable)
    {
        $this->logFunc = $callable;
    }

    /**
     * Emits a log message through the logging callback if one is set, otherwise
     * through the logger if one is set, otherwise does nothing.
     *
     * @param string $level   name of the logger method to call (e.g. 'debug')
     * @param string $msg
     * @param array  $context
     */
    public function log($level, $msg, array $context = [])
    {
        if ($this->logFunc) {
            call_user_func_array($this->logFunc, [$level, $msg, $context]);

            return;
        }

        if ($this->logger) {
            // $level is used as the method name on the logger.
            $this->logger->$level($msg, $context);
        }
    }

    /**
     * Fetches the next job from the job manager and runs it.
     *
     * All four arguments are passed through unchanged to the job manager's getJob().
     *
     * @param string|null $workerName presumably limits the lookup to one worker; confirm against getJob()
     * @param string|null $methodName presumably limits the lookup to one method; confirm against getJob()
     * @param bool        $prioritize
     * @param mixed|null  $runId
     *
     * @return null|Job the job after it has been run, or null when getJob() returned nothing
     */
    public function run($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $job = $this->jobManager->getJob($workerName, $methodName, $prioritize, $runId);
        if (!$job) {
            return null; // no job to run
        }

        return $this->runJob($job);
    }

    /**
     * Marks a job as failed and records the failure.
     *
     * Sets the job status to BaseJob::STATUS_ERROR; for a RetryableJob the error
     * count is incremented and the status becomes RetryableJob::STATUS_MAX_ERROR
     * once the count reaches a non-null getMaxError(). The job message is set to
     * the exception class, code, message and stack trace.
     *
     * @param array $payload element 0 must be the caught exception
     * @param Job   $job
     */
    protected function handleException(array $payload, Job $job)
    {
        $exception = $payload[0];
        $exceptionMessage = get_class($exception)."\n".$exception->getCode().' - '.$exception->getMessage()."\n".$exception->getTraceAsString();
        $this->log('debug', "Failed: {$job->getClassName()}->{$job->getMethod()}");
        $job->setStatus(BaseJob::STATUS_ERROR);
        if ($job instanceof RetryableJob) {
            $job->setErrorCount($job->getErrorCount() + 1);
            if (null !== ($maxError = $job->getMaxError()) && $job->getErrorCount() >= $maxError) {
                $job->setStatus(RetryableJob::STATUS_MAX_ERROR);
            }
        }
        $job->setMessage($exceptionMessage);
        $this->jobManager->getJobTimingManager()->recordTiming(JobTiming::STATUS_FINISHED_ERROR);
    }

    /**
     * Runs a single job on the worker named by the job and saves its history.
     *
     * Dispatches Event::PRE_JOB before and Event::POST_JOB after the run. Anything
     * thrown while running the job is routed to handleException() rather than
     * propagated; this includes an unregistered worker or a method that cannot be
     * called, so such a job ends up with an error status instead of being
     * reported as successful.
     *
     * @param Job $job
     *
     * @return Job the same job instance, updated with status, message and timing
     */
    public function runJob(Job $job)
    {
        $event = new Event($job);
        $this->eventDispatcher->dispatch(Event::PRE_JOB, $event);

        $start = microtime(true);
        try {
            $worker = $this->getWorker($job->getWorkerName());
            $this->log('debug', "Start: {$job->getClassName()}->{$job->getMethod()}", $job->getArgs());
            $job->setStartedAt(new \DateTime());

            if (null === $worker) {
                throw new \RuntimeException("No worker named '{$job->getWorkerName()}' is registered in worker manager");
            }

            // Validate first: on PHP < 8 call_user_func_array() only warns on an invalid
            // callback and returns null, which would let the job be marked successful.
            $callable = array($worker, $job->getMethod());
            if (!is_callable($callable)) {
                throw new \RuntimeException("Method '{$job->getMethod()}' is not callable on worker '{$job->getWorkerName()}'");
            }
            call_user_func_array($callable, $job->getArgs());

            // Job finished successfully... do we remove the job from database?
            $job->setStatus(BaseJob::STATUS_SUCCESS);

            $job->setMessage(null);
            $this->jobManager->getJobTimingManager()->recordTiming(JobTiming::STATUS_FINISHED_SUCCESS);
        } catch (\Throwable $exception) {
            $this->handleException([$exception], $job);
        } catch (\Exception $exception) {
            // Presumably kept for PHP 5, where \Throwable does not exist and the clause above never matches.
            $this->handleException([$exception], $job);
        }

        // save Job history
        $elapsed = microtime(true) - $start;
        $job->setFinishedAt(new \DateTime());
        $job->setElapsed($elapsed);

        $this->log('debug', "Finished: {$job->getClassName()}->{$job->getMethod()} in {$elapsed} seconds");
        $this->log('debug', "Save job history: {$job->getId()}");

        $this->jobManager->saveHistory($job);
        $this->eventDispatcher->dispatch(Event::POST_JOB, $event);

        return $job;
    }
}
158