Completed
Push — master ( 58e792...f01f49 )
by Matthew
05:34
created

WorkerManager::getWorker()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 2.0625

Importance

Changes 0
Metric Value
dl 0
loc 8
ccs 3
cts 4
cp 0.75
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 4
nc 2
nop 1
crap 2.0625
1
<?php
2
3
namespace Dtc\QueueBundle\Model;
4
5
use Dtc\QueueBundle\EventDispatcher\Event;
6
use Dtc\QueueBundle\EventDispatcher\EventDispatcher;
7
use Monolog\Logger;
8
9
class WorkerManager
10
{
11
    protected $workers;
12
    protected $jobManager;
13
    protected $logger;
14
    protected $eventDispatcher;
15
    protected $logFunc;
16
17 4
    public function __construct(JobManagerInterface $jobManager, EventDispatcher $eventDispatcher, Logger $logger = null)
18
    {
19 4
        $this->workers = array();
20 4
        $this->jobManager = $jobManager;
21 4
        $this->logger = $logger;
22 4
        $this->eventDispatcher = $eventDispatcher;
23 4
    }
24
25 4
    public function addWorker(Worker $worker)
26
    {
27 4
        if ($this->logger) {
28
            $this->logger->debug("Added worker: {$worker->getName()}");
29
        }
30
31 4
        if (isset($this->workers[$worker->getName()])) {
32 1
            throw new \Exception("{$worker->getName()} already exists in worker manager");
33
        }
34
35 4
        $this->workers[$worker->getName()] = $worker;
36 4
    }
37
38 4
    public function getWorker($name)
39
    {
40 4
        if (isset($this->workers[$name])) {
41 4
            return $this->workers[$name];
42
        }
43
44
        return null;
45
    }
46
47
    public function getWorkers()
48
    {
49
        return $this->workers;
50
    }
51
52
    public function setLoggingFunc(callable $callable)
53
    {
54
        $this->logFunc = $callable;
55
    }
56
57 3
    public function log($level, $msg, array $context = [])
58
    {
59 3
        if ($this->logFunc) {
60
            call_user_func_array($this->logFunc, [$level, $msg, $context]);
61
62
            return;
63
        }
64
65 3
        if ($this->logger) {
66
            $this->logger->$level($msg, $context);
67
68
            return;
69
        }
70 3
    }
71
72
    /**
73
     * @param null $workerName
74
     * @param null $methodName
75
     * @param bool $prioritize
76
     *
77
     * @return null|Job
78
     */
79 2
    public function run($workerName = null, $methodName = null, $prioritize = true, $runId = null)
80
    {
81 2
        $job = $this->jobManager->getJob($workerName, $methodName, $prioritize, $runId);
82 2
        if (!$job) {
83
            return null; // no job to run
84
        }
85
86 2
        return $this->runJob($job);
87
    }
88
89 3
    public function runJob(Job $job)
90
    {
91 3
        $event = new Event($job);
92 3
        $this->eventDispatcher->dispatch(Event::PRE_JOB, $event);
93
94 3
        $start = microtime(true);
95 3
        $handleException = function($exception) use ($job) {
96
            /** @var \Exception $exception */
97 1
            $exceptionMessage = get_class($exception)."\n".$exception->getCode().' - '.$exception->getMessage()."\n".$exception->getTraceAsString();
98 1
            $this->log('debug', "Failed: {$job->getClassName()}->{$job->getMethod()}");
99 1
            $job->setStatus(BaseJob::STATUS_ERROR);
100 1
            if ($job instanceof RetryableJob) {
101
                $job->setErrorCount($job->getErrorCount() + 1);
102
                if (null !== ($maxError = $job->getMaxError()) && $job->getErrorCount() >= $maxError) {
103
                    $job->setStatus(RetryableJob::STATUS_MAX_ERROR);
104
                }
105
            }
106 1
            $job->setMessage($exceptionMessage);
107 3
        };
108
        try {
109
            $worker = $this->getWorker($job->getWorkerName());
110
            $this->log('debug', "Start: {$job->getClassName()}->{$job->getMethod()}", $job->getArgs());
111
            $job->setStartedAt(new \DateTime());
112
            call_user_func_array(array($worker, $job->getMethod()), $job->getArgs());
113
114
            // Job finshed successfuly... do we remove the job from database?
115
            $job->setStatus(BaseJob::STATUS_SUCCESS);
116
            $job->setMessage(null);
117
        } catch (\Throwable $exception) {
118
            $handleException($exception);
119
        } catch (\Exception $exception) {
120
            $handleException($exception);
121
        }
122
123
        // save Job history
124
        $elapsed = microtime(true) - $start;
125
        $job->setFinishedAt(new \DateTime());
126
        $job->setElapsed($elapsed);
127
128
        $this->log('debug', "Finished: {$job->getClassName()}->{$job->getMethod()} in {$elapsed} micro-seconds");
129
        $this->log('debug', "Save job history: {$job->getId()}");
130
131
        $this->jobManager->saveHistory($job);
132
        $this->eventDispatcher->dispatch(Event::POST_JOB, $event);
133
134
        return $job;
135
    }
136
}
137