Completed
Push — master ( c702e0...af42cc )
by Matthew
07:24
created

WorkerManager::runJob()   B

Complexity

Conditions 6
Paths 13

Size

Total Lines 47
Code Lines 32

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 28
CRAP Score 6.1979

Importance

Changes 0
Metric Value
dl 0
loc 47
ccs 28
cts 34
cp 0.8235
rs 8.5125
c 0
b 0
f 0
cc 6
eloc 32
nc 13
nop 1
crap 6.1979
1
<?php
2
3
namespace Dtc\QueueBundle\Model;
4
5
use Dtc\QueueBundle\EventDispatcher\Event;
6
use Dtc\QueueBundle\EventDispatcher\EventDispatcher;
7
use Psr\Log\LoggerInterface;
8
9
class WorkerManager
10
{
11
    protected $workers;
12
    protected $jobManager;
13
14
    /** @var LoggerInterface */
15
    protected $logger;
16
    protected $eventDispatcher;
17
    protected $logFunc;
18
19 9
    public function __construct(JobManagerInterface $jobManager, EventDispatcher $eventDispatcher)
20
    {
21 9
        $this->workers = array();
22 9
        $this->jobManager = $jobManager;
23 9
        $this->eventDispatcher = $eventDispatcher;
24 9
    }
25
26
    public function setLogger(LoggerInterface $logger)
27
    {
28
        $this->logger = $logger;
29
    }
30
31 9
    public function addWorker(Worker $worker)
32
    {
33 9
        if ($this->logger) {
34
            $this->logger->debug(__METHOD__." - Added worker: {$worker->getName()}");
35
        }
36
37 9
        if (isset($this->workers[$worker->getName()])) {
38 1
            throw new \Exception("{$worker->getName()} already exists in worker manager");
39
        }
40
41 9
        $this->workers[$worker->getName()] = $worker;
42 9
    }
43
44 8
    public function getWorker($name)
45
    {
46 8
        if (isset($this->workers[$name])) {
47 8
            return $this->workers[$name];
48
        }
49
50 1
        return null;
51
    }
52
53
    public function getWorkers()
54
    {
55
        return $this->workers;
56
    }
57
58 3
    public function setLoggingFunc(callable $callable)
59
    {
60 3
        $this->logFunc = $callable;
61 3
    }
62
63 6
    public function log($level, $msg, array $context = [])
64
    {
65 6
        if ($this->logFunc) {
66 3
            call_user_func_array($this->logFunc, [$level, $msg, $context]);
67
68 3
            return;
69
        }
70
71 4
        if ($this->logger) {
72
            $this->logger->$level($msg, $context);
73
74
            return;
75
        }
76 4
    }
77
78
    /**
79
     * @param null $workerName
80
     * @param null $methodName
81
     * @param bool $prioritize
82
     *
83
     * @return null|Job
84
     */
85 5
    public function run($workerName = null, $methodName = null, $prioritize = true, $runId = null)
86
    {
87 5
        $job = $this->jobManager->getJob($workerName, $methodName, $prioritize, $runId);
88 5
        if (!$job) {
89 3
            return null; // no job to run
90
        }
91
92 5
        return $this->runJob($job);
93
    }
94
95 6
    public function runJob(Job $job)
96
    {
97 6
        $event = new Event($job);
98 6
        $this->eventDispatcher->dispatch(Event::PRE_JOB, $event);
99
100 6
        $start = microtime(true);
101 6
        $handleException = function ($exception) use ($job) {
102
            /** @var \Exception $exception */
103 1
            $exceptionMessage = get_class($exception)."\n".$exception->getCode().' - '.$exception->getMessage()."\n".$exception->getTraceAsString();
104 1
            $this->log('debug', "Failed: {$job->getClassName()}->{$job->getMethod()}");
105 1
            $job->setStatus(BaseJob::STATUS_ERROR);
106 1
            if ($job instanceof RetryableJob) {
107
                $job->setErrorCount($job->getErrorCount() + 1);
108
                if (null !== ($maxError = $job->getMaxError()) && $job->getErrorCount() >= $maxError) {
109
                    $job->setStatus(RetryableJob::STATUS_MAX_ERROR);
110
                }
111
            }
112 1
            $job->setMessage($exceptionMessage);
113 6
        };
114
        try {
115 6
            $worker = $this->getWorker($job->getWorkerName());
116 6
            $this->log('debug', "Start: {$job->getClassName()}->{$job->getMethod()}", $job->getArgs());
117 6
            $job->setStartedAt(new \DateTime());
118 6
            call_user_func_array(array($worker, $job->getMethod()), $job->getArgs());
119
120
            // Job finshed successfuly... do we remove the job from database?
121 5
            $job->setStatus(BaseJob::STATUS_SUCCESS);
122 5
            $job->setMessage(null);
123 6
        } catch (\Throwable $exception) {
124
            $handleException($exception);
125 1
        } catch (\Exception $exception) {
126 1
            $handleException($exception);
127
        }
128
129
        // save Job history
130 6
        $elapsed = microtime(true) - $start;
131 6
        $job->setFinishedAt(new \DateTime());
132 6
        $job->setElapsed($elapsed);
133
134 6
        $this->log('debug', "Finished: {$job->getClassName()}->{$job->getMethod()} in {$elapsed} seconds");
135 6
        $this->log('debug', "Save job history: {$job->getId()}");
136
137 6
        $this->jobManager->saveHistory($job);
138 6
        $this->eventDispatcher->dispatch(Event::POST_JOB, $event);
139
140 6
        return $job;
141
    }
142
}
143