Completed
Push — master ( 2f7a99...5fa5de )
by Matze
10:42
created

Worker   A

Complexity

Total Complexity 7

Size/Duplication

Total Lines 83
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 1

Test Coverage

Coverage 100%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
wmc 7
c 1
b 0
f 0
lcom 1
cbo 1
dl 0
loc 83
rs 10
ccs 33
cts 33
cp 1

4 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 4 1
A executeJob() 0 9 2
B execute() 0 28 2
A handleCronEvent() 0 12 2
1
<?php
2
3
namespace BrainExe\Core\MessageQueue;
4
5
use BrainExe\Annotations\Annotations\Inject;
6
use BrainExe\Annotations\Annotations\Service;
7
use BrainExe\Core\EventDispatcher\AbstractEvent;
8
use BrainExe\Core\EventDispatcher\CronEvent;
9
use BrainExe\Core\Stats\Event;
10
use BrainExe\Core\Traits\EventDispatcherTrait;
11
use BrainExe\Core\Traits\LoggerTrait;
12
use Cron\CronExpression;
13
use Exception;
14
15
/**
16
 * @Service("MessageQueue.Worker", public=false)
17
 */
18
class Worker
19
{
20
21
    use LoggerTrait;
22
    use EventDispatcherTrait;
23
24
    /**
25
     * @var Gateway
26
     */
27
    private $gateway;
28
29
    /**
30
     * @Inject({"@MessageQueue.Gateway"})
31
     * @param Gateway $gateway
32
     */
33 5
    public function __construct(Gateway $gateway)
34
    {
35 5
        $this->gateway = $gateway;
36 5
    }
37
38
    /**
39
     * @param Job $job
40
     */
41 3
    public function executeJob(Job $job)
42
    {
43
        try {
44 3
            $this->execute($job);
45 1
        } catch (Exception $e) {
46 1
            $this->error($e->getMessage(), ['exception' => $e]);
47 1
            $this->gateway->restoreJob($job);
48
        }
49 3
    }
50
51
    /**
52
     * @param Job $job
53
     */
54 3
    private function execute(Job $job)
55
    {
56 3
        $event = $job->event;
57 3
        if ($event instanceof CronEvent) {
58 1
            $event = $this->handleCronEvent($job, $event);
59
        }
60
61 3
        $logStart = microtime(true);
62
63 3
        $this->dispatchEvent($event);
64
65 2
        $neededTime = microtime(true) - $logStart;
66
67 2
        $this->info(
68
            sprintf(
69 2
                '[MQ]: %s. Time: %0.2fms',
70 2
                $event->eventName,
71 2
                $neededTime * 1000
72
            ),
73 2
            ['channel' => 'message_queue']
74
        );
75
76 2
        $event = new Event(
77 2
            Event::INCREASE,
78 2
            sprintf('message_queue:handled:%s', $event->eventName)
79
        );
80 2
        $this->dispatchEvent($event);
81 2
    }
82
83
    /**
84
     * @param Job $job
85
     * @param CronEvent $event
86
     * @return AbstractEvent
87
     */
88 1
    private function handleCronEvent(Job $job, CronEvent $event)
89
    {
90 1
        if (!$event->isPropagationStopped()) {
91 1
            $cron = CronExpression::factory($event->expression);
92 1
            $nextRun = $cron->getNextRunDate()->getTimestamp();
93
94 1
            $job->timestamp = $nextRun;
95 1
            $this->gateway->addJob($job);
96
        }
97
98 1
        return $event->event;
99
    }
100
}
101