Completed
Push — master ( cdd718...8a683b )
by Matthew
06:43 queued 30s
created

WorkerManager::addWorker()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 12
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 3.2098

Importance

Changes 0
Metric Value
dl 0
loc 12
ccs 5
cts 7
cp 0.7143
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 6
nc 4
nop 1
crap 3.2098
1
<?php
2
3
namespace Dtc\QueueBundle\Model;
4
5
use Dtc\QueueBundle\EventDispatcher\Event;
6
use Dtc\QueueBundle\EventDispatcher\EventDispatcher;
7
use Dtc\QueueBundle\Exception\DuplicateWorkerException;
8
use Psr\Log\LoggerInterface;
9
10
class WorkerManager
11
{
12
    protected $workers;
13
    protected $jobManager;
14
15
    /** @var LoggerInterface */
16
    protected $logger;
17
    protected $eventDispatcher;
18
    protected $logFunc;
19
20 2
    public function __construct(JobManagerInterface $jobManager, EventDispatcher $eventDispatcher)
21
    {
22 2
        $this->workers = array();
23 2
        $this->jobManager = $jobManager;
24 2
        $this->eventDispatcher = $eventDispatcher;
25 2
    }
26
27
    public function setLogger(LoggerInterface $logger)
28
    {
29
        $this->logger = $logger;
30
    }
31
32 2
    public function addWorker(Worker $worker)
33
    {
34 2
        if ($this->logger) {
35
            $this->logger->debug(__METHOD__." - Added worker: {$worker->getName()}");
36
        }
37
38 2
        if (isset($this->workers[$worker->getName()])) {
39
            throw new DuplicateWorkerException("{$worker->getName()} already exists in worker manager");
40
        }
41
42 2
        $this->workers[$worker->getName()] = $worker;
43 2
    }
44
45 1
    public function getWorker($name)
46
    {
47 1
        if (isset($this->workers[$name])) {
48 1
            return $this->workers[$name];
49
        }
50
51 1
        return null;
52
    }
53
54
    public function getWorkers()
55
    {
56
        return $this->workers;
57
    }
58
59
    public function setLoggingFunc(callable $callable)
60
    {
61
        $this->logFunc = $callable;
62
    }
63
64
    public function log($level, $msg, array $context = [])
65
    {
66
        if ($this->logFunc) {
67
            call_user_func_array($this->logFunc, [$level, $msg, $context]);
68
69
            return;
70
        }
71
72
        if ($this->logger) {
73
            $this->logger->$level($msg, $context);
74
75
            return;
76
        }
77
    }
78
79
    /**
80
     * @param null $workerName
81
     * @param null $methodName
82
     * @param bool $prioritize
83
     *
84
     * @return null|Job
85
     */
86
    public function run($workerName = null, $methodName = null, $prioritize = true, $runId = null)
87
    {
88
        $job = $this->jobManager->getJob($workerName, $methodName, $prioritize, $runId);
89
        if (!$job) {
90
            return null; // no job to run
91
        }
92
93
        return $this->runJob($job);
94
    }
95
96
    public function runJob(Job $job)
97
    {
98
        $event = new Event($job);
99
        $this->eventDispatcher->dispatch(Event::PRE_JOB, $event);
100
101
        $start = microtime(true);
102
        $handleException = function (\Exception $exception) use ($job) {
103
            /** @var \Exception $exception */
104
            $exceptionMessage = get_class($exception)."\n".$exception->getCode().' - '.$exception->getMessage()."\n".$exception->getTraceAsString();
105
            $this->log('debug', "Failed: {$job->getClassName()}->{$job->getMethod()}");
106
            $job->setStatus(BaseJob::STATUS_ERROR);
107
            if ($job instanceof RetryableJob) {
108
                $job->setErrorCount($job->getErrorCount() + 1);
109
                if (null !== ($maxError = $job->getMaxError()) && $job->getErrorCount() >= $maxError) {
110
                    $job->setStatus(RetryableJob::STATUS_MAX_ERROR);
111
                }
112
            }
113
            $job->setMessage($exceptionMessage);
114
        };
115
        try {
116
            $worker = $this->getWorker($job->getWorkerName());
117
            $this->log('debug', "Start: {$job->getClassName()}->{$job->getMethod()}", $job->getArgs());
118
            $job->setStartedAt(new \DateTime());
119
            call_user_func_array(array($worker, $job->getMethod()), $job->getArgs());
120
121
            // Job finshed successfuly... do we remove the job from database?
122
            $job->setStatus(BaseJob::STATUS_SUCCESS);
123
            $job->setMessage(null);
124
        } catch (\Throwable $exception) {
125
            $handleException($exception);
126
        } catch (\Exception $exception) {
127
            $handleException($exception);
128
        }
129
130
        // save Job history
131
        $elapsed = microtime(true) - $start;
132
        $job->setFinishedAt(new \DateTime());
133
        $job->setElapsed($elapsed);
134
135
        $this->log('debug', "Finished: {$job->getClassName()}->{$job->getMethod()} in {$elapsed} seconds");
136
        $this->log('debug', "Save job history: {$job->getId()}");
137
138
        $this->jobManager->saveHistory($job);
139
        $this->eventDispatcher->dispatch(Event::POST_JOB, $event);
140
141
        return $job;
142
    }
143
}
144