Passed
Push — master ( e3298a...32f3de )
by Matthew
08:05
created

Manager/WorkerManager.php (1 issue)

Labels
Severity
1
<?php
2
3
namespace Dtc\QueueBundle\Manager;
4
5
use Dtc\QueueBundle\EventDispatcher\Event;
6
use Dtc\QueueBundle\EventDispatcher\EventDispatcher;
7
use Dtc\QueueBundle\Exception\DuplicateWorkerException;
8
use Dtc\QueueBundle\Model\BaseJob;
9
use Dtc\QueueBundle\Model\Job;
10
use Dtc\QueueBundle\Model\JobTiming;
11
use Dtc\QueueBundle\Model\Worker;
12
use Dtc\QueueBundle\Util\Util;
13
use Psr\Log\LoggerInterface;
14
15
class WorkerManager
16
{
17
    protected $workers;
18
    protected $jobManager;
19
20
    /** @var LoggerInterface */
21
    protected $logger;
22
    protected $eventDispatcher;
23
    protected $logFunc;
24
25 18
    public function __construct(JobManagerInterface $jobManager, EventDispatcher $eventDispatcher)
26
    {
27 18
        $this->workers = array();
28 18
        $this->jobManager = $jobManager;
29 18
        $this->eventDispatcher = $eventDispatcher;
30 18
    }
31
32
    public function setLogger(LoggerInterface $logger)
33
    {
34
        $this->logger = $logger;
35
    }
36
37
    /**
38
     * @param Worker $worker
39
     *
40
     * @throws DuplicateWorkerException
41
     */
42 9
    public function addWorker(Worker $worker)
43
    {
44 9
        if ($this->logger) {
45
            $this->logger->debug(__METHOD__." - Added worker: {$worker->getName()}");
46
        }
47
48 9
        if (isset($this->workers[$worker->getName()])) {
49 1
            throw new DuplicateWorkerException("{$worker->getName()} already exists in worker manager");
50
        }
51
52 9
        $this->workers[$worker->getName()] = $worker;
53 9
    }
54
55 8
    public function getWorker($name)
56
    {
57 8
        if (isset($this->workers[$name])) {
58 8
            return $this->workers[$name];
59
        }
60
61
        return null;
62
    }
63
64 1
    public function getWorkers()
65
    {
66 1
        return $this->workers;
67
    }
68
69 3
    public function setLoggingFunc(callable $callable)
70
    {
71 3
        $this->logFunc = $callable;
72 3
    }
73
74 6
    public function log($level, $msg, array $context = [])
75
    {
76 6
        if ($this->logFunc) {
77 3
            call_user_func_array($this->logFunc, [$level, $msg, $context]);
78
79 3
            return;
80
        }
81
82 4
        if ($this->logger) {
83
            $this->logger->$level($msg, $context);
84
85
            return;
86
        }
87 4
    }
88
89
    /**
90
     * @param null $workerName
91
     * @param null $methodName
92
     * @param bool $prioritize
93
     *
94
     * @return Job|null
95
     */
96 5
    public function run($workerName = null, $methodName = null, $prioritize = true, $runId = null)
97
    {
98 5
        $job = $this->jobManager->getJob($workerName, $methodName, $prioritize, $runId);
99 5
        if (!$job) {
100 3
            return null; // no job to run
101
        }
102
103 5
        return $this->runJob($job);
104
    }
105
106
    /**
107
     * @param array $payload
108
     * @param Job   $job
109
     */
110 1
    protected function handleException(array $payload, Job $job)
111
    {
112 1
        $exception = $payload[0];
113 1
        $exceptionMessage = get_class($exception)."\n".$exception->getCode().' - '.$exception->getMessage()."\n".$exception->getTraceAsString();
114 1
        $this->log('debug', "Failed: {$job->getClassName()}->{$job->getMethod()}");
115 1
        $job->setStatus(BaseJob::STATUS_EXCEPTION);
116 1
        $message = $job->getMessage();
117 1
        if (null !== $message) {
118
            $message .= "\n\n";
119
        } else {
120 1
            $message = $exceptionMessage;
121
        }
122
123 1
        $job->setMessage($message);
124 1
        $this->jobManager->getJobTimingManager()->recordTiming(JobTiming::STATUS_FINISHED_EXCEPTION);
125 1
    }
126
127 5
    public function processStatus(Job $job, $result)
128
    {
129 5
        if (Worker::RESULT_FAILURE === $result) {
130 3
            $job->setStatus(BaseJob::STATUS_FAILURE);
131 3
            $this->jobManager->getJobTimingManager()->recordTiming(JobTiming::STATUS_FINISHED_FAILURE);
132
133 3
            return;
134
        }
135 2
        $job->setStatus(BaseJob::STATUS_SUCCESS);
136 2
        $this->jobManager->getJobTimingManager()->recordTiming(JobTiming::STATUS_FINISHED_SUCCESS);
137 2
    }
138
139 6
    public function runJob(Job $job)
140
    {
141 6
        $event = new Event($job);
142 6
        $this->eventDispatcher->dispatch(Event::PRE_JOB, $event);
143
144 6
        $start = microtime(true);
145
        try {
146
            /** @var Worker $worker */
147 6
            $worker = $this->getWorker($job->getWorkerName());
148 6
            $this->log('debug', "Start: {$job->getClassName()}->{$job->getMethod()}", $job->getArgs());
149 6
            $job->setStartedAt(Util::getMicrotimeFloatDateTime($start));
150 6
            $job->setMessage(null);
151 6
            $worker->setCurrentJob($job);
152 6
            $result = call_user_func_array(array($worker, $job->getMethod()), $job->getArgs());
153 5
            $this->processStatus($job, $result);
154 1
        } catch (\Throwable $exception) {
155 1
            $this->handleException([$exception], $job);
156
        } catch (\Exception $exception) {
157
            $this->handleException([$exception], $job);
158
        }
159
160
        // save Job history
161 6
        $elapsed = microtime(true) - $start;
162 6
        $job->setFinishedAt(Util::getMicrotimeDateTime());
163 6
        $job->setElapsed($elapsed);
0 ignored issues
show
$elapsed of type double is incompatible with the type integer expected by parameter $elapsed of Dtc\QueueBundle\Model\Job::setElapsed(). ( Ignorable by Annotation )

If this is a false-positive, you can also ignore this issue in your code via the ignore-type  annotation

163
        $job->setElapsed(/** @scrutinizer ignore-type */ $elapsed);
Loading history...
164
165 6
        $this->log('debug', "Finished: {$job->getClassName()}->{$job->getMethod()} in {$elapsed} seconds");
166 6
        $this->log('debug', "Save job history: {$job->getId()}");
167
168 6
        $this->jobManager->saveHistory($job);
169 6
        $this->eventDispatcher->dispatch(Event::POST_JOB, $event);
170
171 6
        return $job;
172
    }
173
}
174