Completed
Push — master ( 2db41f...aa6ae6 )
by Matthew
15:06 queued 36s
created

WorkerManager::setLoggingFunc()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 3
cts 3
cp 1
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 1
crap 1
1
<?php
2
3
namespace Dtc\QueueBundle\Model;
4
5
use Dtc\QueueBundle\EventDispatcher\Event;
6
use Dtc\QueueBundle\EventDispatcher\EventDispatcher;
7
use Dtc\QueueBundle\Exception\DuplicateWorkerException;
8
use Psr\Log\LoggerInterface;
9
10
class WorkerManager
11
{
12
    protected $workers;
13
    protected $jobManager;
14
15
    /** @var LoggerInterface */
16
    protected $logger;
17
    protected $eventDispatcher;
18
    protected $logFunc;
19
20 9
    /**
     * Creates a manager with an empty worker registry.
     *
     * @param JobManagerInterface $jobManager      job manager that jobs are fetched from and history is saved to
     * @param EventDispatcher     $eventDispatcher dispatcher that receives the pre-job and post-job events
     */
    public function __construct(JobManagerInterface $jobManager, EventDispatcher $eventDispatcher)
    {
        $this->eventDispatcher = $eventDispatcher;
        $this->jobManager = $jobManager;
        $this->workers = [];
    }
26
27
    /**
     * Sets the logger used when a worker is added and, if no logging callable is set, by log().
     *
     * @param LoggerInterface $logger
     */
    public function setLogger(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }
31
32
    /**
     * Registers a worker under the name returned by its getName().
     *
     * @param Worker $worker
     *
     * @throws DuplicateWorkerException when a worker with the same name is already registered
     */
    public function addWorker(Worker $worker)
    {
        $name = $worker->getName();

        if (isset($this->workers[$name])) {
            throw new DuplicateWorkerException("{$name} already exists in worker manager");
        }

        $this->workers[$name] = $worker;

        // Log only once the worker is actually registered, so a rejected
        // duplicate is never reported as "added".
        if ($this->logger) {
            $this->logger->debug(__METHOD__." - Added worker: {$name}");
        }
    }
49
50 8
    /**
     * Looks up a registered worker by the name it was added under.
     *
     * @param string $name registry key (presumably a string worker name; confirm against callers)
     *
     * @return Worker|null the registered worker, or null when nothing is registered under $name
     */
    public function getWorker($name)
    {
        return isset($this->workers[$name]) ? $this->workers[$name] : null;
    }
58
59
    /**
     * Returns the whole worker registry.
     *
     * @return array registered workers, keyed by worker name
     */
    public function getWorkers()
    {
        return $this->workers;
    }
63
64 3
    /**
     * Sets a callable that log() invokes instead of the logger.
     *
     * @param callable $callable called with the level, the message and the context array
     */
    public function setLoggingFunc(callable $callable)
    {
        $this->logFunc = $callable;
    }
68
69 6
    /**
     * Writes a log entry through the logging callable when one is set, otherwise
     * through the logger. Does nothing when neither is configured.
     *
     * @param string $level   log level name, e.g. 'debug'
     * @param string $msg     message to log
     * @param array  $context extra data handed to the callable or logger unchanged
     */
    public function log($level, $msg, array $context = [])
    {
        if ($this->logFunc) {
            call_user_func_array($this->logFunc, [$level, $msg, $context]);

            return;
        }

        if ($this->logger) {
            // Use the generic PSR-3 entry point instead of calling a method named
            // after $level: a bad level string can then no longer invoke an
            // unrelated method on the logger.
            $this->logger->log($level, $msg, $context);
        }
    }
83
84
    /**
     * Fetches the next job from the job manager and runs it.
     *
     * All four arguments are passed unchanged to the job manager's getJob().
     *
     * @param string|null $workerName worker name to fetch a job for (null presumably means any; confirm in getJob())
     * @param string|null $methodName method name to fetch a job for (null presumably means any; confirm in getJob())
     * @param bool        $prioritize prioritization flag forwarded to getJob()
     * @param mixed       $runId      run identifier forwarded to getJob()
     *
     * @return Job|null the job that was run, or null when the job manager returned no job
     */
    public function run($workerName = null, $methodName = null, $prioritize = true, $runId = null)
    {
        $job = $this->jobManager->getJob($workerName, $methodName, $prioritize, $runId);

        // A falsy result means there is no job to run.
        return $job ? $this->runJob($job) : null;
    }
100
101
    /**
     * Marks a job as failed and stores the exception details as the job message.
     *
     * For a RetryableJob the error count is incremented, and the status becomes
     * STATUS_MAX_ERROR once that count reaches the job's maximum (when one is set).
     *
     * @param array $payload list whose first element is the caught exception
     * @param Job   $job     job that was being run when the exception was thrown
     */
    protected function handleException(array $payload, Job $job)
    {
        $exception = $payload[0];

        // Class name, then "code - message", then the stack trace, one per line.
        $details = implode("\n", [
            get_class($exception),
            $exception->getCode().' - '.$exception->getMessage(),
            $exception->getTraceAsString(),
        ]);

        $this->log('debug', "Failed: {$job->getClassName()}->{$job->getMethod()}");
        $job->setStatus(BaseJob::STATUS_ERROR);

        if ($job instanceof RetryableJob) {
            $job->setErrorCount($job->getErrorCount() + 1);

            $maxError = $job->getMaxError();
            if (null !== $maxError && $job->getErrorCount() >= $maxError) {
                $job->setStatus(RetryableJob::STATUS_MAX_ERROR);
            }
        }

        $job->setMessage($details);
        $this->jobManager->getJobTimingManager()->recordTiming(JobTiming::STATUS_FINISHED_ERROR);
    }
120
121 6
    /**
     * Runs a job on the worker it names and saves the outcome to the job history.
     *
     * Event::PRE_JOB is dispatched before the run and Event::POST_JOB after the
     * history is saved. Anything thrown while running the job, including a missing
     * worker or an uncallable method, is passed to handleException() instead of
     * propagating, so the job is still timed and saved.
     *
     * @param Job $job
     *
     * @return Job the same job instance, updated with status, message and timing
     */
    public function runJob(Job $job)
    {
        $event = new Event($job);
        $this->eventDispatcher->dispatch(Event::PRE_JOB, $event);

        $start = microtime(true);
        try {
            $worker = $this->getWorker($job->getWorkerName());
            $this->log('debug', "Start: {$job->getClassName()}->{$job->getMethod()}", $job->getArgs());
            $job->setStartedAt(new \DateTime());

            // Validate the callback explicitly. On PHP 7 an invalid callback only
            // raises a warning in call_user_func_array(), which would let the job
            // fall through and be recorded as successful without ever running.
            if (null === $worker) {
                throw new \RuntimeException("No worker registered with name: {$job->getWorkerName()}");
            }
            $callback = [$worker, $job->getMethod()];
            if (!is_callable($callback)) {
                throw new \RuntimeException("Method {$job->getMethod()} is not callable on worker: {$job->getWorkerName()}");
            }

            call_user_func_array($callback, $job->getArgs());

            // Job finished successfully... do we remove the job from database?
            $job->setStatus(BaseJob::STATUS_SUCCESS);

            $job->setMessage(null);
            $this->jobManager->getJobTimingManager()->recordTiming(JobTiming::STATUS_FINISHED_SUCCESS);
        } catch (\Throwable $exception) {
            $this->handleException([$exception], $job);
        } catch (\Exception $exception) {
            // Only reachable on PHP 5, where \Throwable does not exist.
            $this->handleException([$exception], $job);
        }

        // Save job history
        $elapsed = microtime(true) - $start;
        $job->setFinishedAt(new \DateTime());
        $job->setElapsed($elapsed);

        $this->log('debug', "Finished: {$job->getClassName()}->{$job->getMethod()} in {$elapsed} seconds");
        $this->log('debug', "Save job history: {$job->getId()}");

        $this->jobManager->saveHistory($job);
        $this->eventDispatcher->dispatch(Event::POST_JOB, $event);

        return $job;
    }
157
}
158