Worker   A
last analyzed

Complexity

Total Complexity 7

Size/Duplication

Total Lines 93
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 1

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
wmc 7
lcom 1
cbo 1
dl 0
loc 93
ccs 37
cts 37
cp 1
rs 10
c 0
b 0
f 0

4 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 5 1
A executeJob() 0 9 2
A handleCronEvent() 0 12 2
B execute() 0 31 2
1
<?php
2
3
namespace BrainExe\Core\MessageQueue;
4
5
use BrainExe\Core\Annotations\Service;
6
use BrainExe\Core\Cron\Expression;
7
use BrainExe\Core\EventDispatcher\AbstractEvent;
8
use BrainExe\Core\EventDispatcher\CronEvent;
9
use BrainExe\Core\EventDispatcher\JobEvent;
10
use BrainExe\Core\Traits\EventDispatcherTrait;
11
use BrainExe\Core\Traits\LoggerTrait;
12
use BrainExe\Core\Traits\TimeTrait;
13
14
use Throwable;
15
16
/**
17
 * @Service
18
 */
19
class Worker
20
{
21
22
    use LoggerTrait;
23
    use TimeTrait;
24
    use EventDispatcherTrait;
25
26
    /**
27
     * @var Gateway
28
     */
29
    private $gateway;
30
31
    /**
32
     * @var Expression
33
     */
34
    private $cron;
35
36
    /**
37
     * @param Gateway $gateway
38
     * @param Expression $cron
39
     */
40 4
    public function __construct(Gateway $gateway, Expression $cron)
41
    {
42 4
        $this->gateway = $gateway;
43 4
        $this->cron    = $cron;
44 4
    }
45
46
    /**
47
     * @param Job $job
48
     */
49 3
    public function executeJob(Job $job)
50
    {
51
        try {
52 3
            $this->execute($job);
53 1
        } catch (Throwable $e) {
54 1
            $this->error($e->getMessage(), ['exception' => $e]);
55 1
            $this->gateway->restoreJob($job);
56
        }
57 3
    }
58
59
    /**
60
     * @param Job $job
61
     */
62 3
    private function execute(Job $job)
63
    {
64 3
        $event = $job->getEvent();
65 3
        if ($event instanceof CronEvent) {
66 1
            $event = $this->handleCronEvent($job, $event);
67
        }
68
69 3
        $logStart = microtime(true);
70
71 3
        $this->dispatchEvent($event);
72
73 2
        $neededTime = microtime(true) - $logStart;
74
75 2
        $this->info(
76 2
            sprintf(
77 2
                '[MQ]: %s. Time: %0.2fms',
78 2
                $event->getEventName(),
79 2
                $neededTime * 1000
80
            ),
81
            [
82 2
                'channel'   => 'message_queue',
83 2
                'time'      => round($neededTime * 1000, 2),
84 2
                'eventName' => $event->getEventName(),
85 2
                'jobId'     => $job->getJobId(),
86 2
                'event'     => json_encode($event)
87
            ]
88
        );
89
90 2
        $handledEvent = new JobEvent(JobEvent::HANDLED, $job);
91 2
        $this->dispatcher->dispatchEvent($handledEvent);
92 2
    }
93
94
    /**
95
     * @param Job $job
96
     * @param CronEvent $event
97
     * @return AbstractEvent
98
     */
99 1
    private function handleCronEvent(Job $job, CronEvent $event) : AbstractEvent
100
    {
101 1
        if (!$event->isPropagationStopped()) {
102 1
            $nextRun = $this->cron->getNextRun($event->getExpression());
103
104 1
            $job->setStartTime($this->now());
105 1
            $job->setTimestamp($nextRun);
106 1
            $this->gateway->addJob($job);
107
        }
108
109 1
        return $event->getEvent();
110
    }
111
}
112