Completed
Push — master ( 16d212...b7b31c )
by Matthew
07:34 queued 05:11
created

WorkerManager::addWorker()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 12
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 3.1406

Importance

Changes 0
Metric Value
dl 0
loc 12
ccs 6
cts 8
cp 0.75
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 6
nc 4
nop 1
crap 3.1406
1
<?php
2
3
namespace Dtc\QueueBundle\Model;
4
5
use Dtc\QueueBundle\EventDispatcher\Event;
6
use Dtc\QueueBundle\EventDispatcher\EventDispatcher;
7
use Dtc\QueueBundle\Exception\DuplicateWorkerException;
8
use Psr\Log\LoggerInterface;
9
10
class WorkerManager
11
{
12
    protected $workers;
13
    protected $jobManager;
14
15
    /** @var LoggerInterface */
16
    protected $logger;
17
    protected $eventDispatcher;
18
    protected $logFunc;
19
20 9
    public function __construct(JobManagerInterface $jobManager, EventDispatcher $eventDispatcher)
21
    {
22 9
        $this->workers = array();
23 9
        $this->jobManager = $jobManager;
24 9
        $this->eventDispatcher = $eventDispatcher;
25 9
    }
26
27
    public function setLogger(LoggerInterface $logger)
28
    {
29
        $this->logger = $logger;
30
    }
31
32 9
    public function addWorker(Worker $worker)
33
    {
34 9
        if ($this->logger) {
35
            $this->logger->debug(__METHOD__." - Added worker: {$worker->getName()}");
36
        }
37
38 9
        if (isset($this->workers[$worker->getName()])) {
39 1
            throw new DuplicateWorkerException("{$worker->getName()} already exists in worker manager");
40
        }
41
42 9
        $this->workers[$worker->getName()] = $worker;
43 9
    }
44
45 8
    public function getWorker($name)
46
    {
47 8
        if (isset($this->workers[$name])) {
48 8
            return $this->workers[$name];
49
        }
50
51 1
        return null;
52
    }
53
54
    public function getWorkers()
55
    {
56
        return $this->workers;
57
    }
58
59 3
    public function setLoggingFunc(callable $callable)
60
    {
61 3
        $this->logFunc = $callable;
62 3
    }
63
64 6
    public function log($level, $msg, array $context = [])
65
    {
66 6
        if ($this->logFunc) {
67 3
            call_user_func_array($this->logFunc, [$level, $msg, $context]);
68
69 3
            return;
70
        }
71
72 4
        if ($this->logger) {
73
            $this->logger->$level($msg, $context);
74
75
            return;
76
        }
77 4
    }
78
79
    /**
80
     * @param null $workerName
81
     * @param null $methodName
82
     * @param bool $prioritize
83
     *
84
     * @return null|Job
85
     */
86 5
    public function run($workerName = null, $methodName = null, $prioritize = true, $runId = null)
87
    {
88 5
        $job = $this->jobManager->getJob($workerName, $methodName, $prioritize, $runId);
89 5
        if (!$job) {
90 3
            return null; // no job to run
91
        }
92
93 5
        return $this->runJob($job);
94
    }
95
96
    /**
97
     * @param array $payload
98
     * @param Job   $job
99
     */
100 1
    protected function handleException(array $payload, Job $job)
101
    {
102 1
        $exception = $payload[0];
103 1
        $exceptionMessage = get_class($exception)."\n".$exception->getCode().' - '.$exception->getMessage()."\n".$exception->getTraceAsString();
104 1
        $this->log('debug', "Failed: {$job->getClassName()}->{$job->getMethod()}");
105 1
        $job->setStatus(BaseJob::STATUS_ERROR);
106 1
        if ($job instanceof RetryableJob) {
107
            $job->setErrorCount($job->getErrorCount() + 1);
108
            if (null !== ($maxError = $job->getMaxError()) && $job->getErrorCount() >= $maxError) {
109
                $job->setStatus(RetryableJob::STATUS_MAX_ERROR);
110
            }
111
        }
112 1
        $job->setMessage($exceptionMessage);
113 1
    }
114
115 6
    public function runJob(Job $job)
116
    {
117 6
        $event = new Event($job);
118 6
        $this->eventDispatcher->dispatch(Event::PRE_JOB, $event);
119
120 6
        $start = microtime(true);
121
        try {
122 6
            $worker = $this->getWorker($job->getWorkerName());
123 6
            $this->log('debug', "Start: {$job->getClassName()}->{$job->getMethod()}", $job->getArgs());
124 6
            $job->setStartedAt(new \DateTime());
125 6
            call_user_func_array(array($worker, $job->getMethod()), $job->getArgs());
126
127
            // Job finshed successfuly... do we remove the job from database?
128 5
            $job->setStatus(BaseJob::STATUS_SUCCESS);
129 5
            $job->setMessage(null);
130 6
        } catch (\Throwable $exception) {
131
            $this->handleException([$exception], $job);
132 1
        } catch (\Exception $exception) {
133 1
            $this->handleException([$exception], $job);
134
        }
135
136
        // save Job history
137 6
        $elapsed = microtime(true) - $start;
138 6
        $job->setFinishedAt(new \DateTime());
139 6
        $job->setElapsed($elapsed);
140
141 6
        $this->log('debug', "Finished: {$job->getClassName()}->{$job->getMethod()} in {$elapsed} seconds");
142 6
        $this->log('debug', "Save job history: {$job->getId()}");
143
144 6
        $this->jobManager->saveHistory($job);
145 6
        $this->eventDispatcher->dispatch(Event::POST_JOB, $event);
146
147 6
        return $job;
148
    }
149
}
150