Completed
Push — master ( c08683...8607fa )
by Adam
16:41
created

Worker   A

Complexity

Total Complexity 13

Size/Duplication

Total Lines 150
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 7

Test Coverage

Coverage 100%

Importance

Changes 5
Bugs 1 Features 4
Metric Value
wmc 13
c 5
b 1
f 4
lcom 1
cbo 7
dl 0
loc 150
ccs 43
cts 43
cp 1
rs 10

8 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 13 2
A consume() 0 4 2
A tick() 0 19 4
A invoke() 0 13 1
A jobStart() 0 5 1
A jobFinish() 0 5 1
A jobShutdown() 0 4 1
A jobException() 0 5 1
1
<?php
2
3
namespace Equip\Queue;
4
5
use Equip\Queue\Driver\DriverInterface;
6
use Equip\Queue\Handler\HandlerFactoryInterface;
7
use Equip\Queue\Serializer\JsonSerializer;
8
use Equip\Queue\Serializer\MessageSerializerInterface;
9
use Exception;
10
use Psr\Log\LoggerInterface;
11
12
/**
 * Queue consumer.
 *
 * Pulls raw packets from a driver, turns them into messages through the
 * serializer, and hands each message to the callable that the handler
 * factory returns for it. Lifecycle steps are reported to the event object
 * and written to the logger.
 */
class Worker
{
    /**
     * Source of raw packets.
     *
     * @var DriverInterface
     */
    private $driver;

    /**
     * Receives acknowledge/finish/reject notifications for each message.
     *
     * @var Event
     */
    private $event;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * Converts dequeued packets into messages.
     *
     * @var MessageSerializerInterface
     */
    private $serializer;

    /**
     * Resolves a message's handler name to a callable.
     *
     * @var HandlerFactoryInterface
     */
    private $handlers;

    /**
     * @param DriverInterface $driver
     * @param Event $event
     * @param LoggerInterface $logger
     * @param MessageSerializerInterface|null $serializer A JsonSerializer is
     *     created when null is given.
     * @param HandlerFactoryInterface $handlers
     */
    public function __construct(
        DriverInterface $driver,
        Event $event,
        LoggerInterface $logger,
        MessageSerializerInterface $serializer = null,
        HandlerFactoryInterface $handlers
    ) {
        // The type hint restricts the argument to an instance or null, so a
        // null check is all that is needed to apply the default.
        if ($serializer === null) {
            $serializer = new JsonSerializer();
        }

        $this->handlers = $handlers;
        $this->serializer = $serializer;
        $this->logger = $logger;
        $this->event = $event;
        $this->driver = $driver;
    }

    /**
     * Processes the given queue until a tick reports that it should stop.
     *
     * @param string $queue
     */
    public function consume($queue)
    {
        do {
            $continue = $this->tick($queue);
        } while ($continue);
    }

    /**
     * Performs one dequeue/dispatch cycle.
     *
     * An empty dequeue result is not treated as a stop condition. The cycle
     * only asks the consumer to stop when the handler returned exactly
     * `false` and the shutdown notice was written without an exception.
     *
     * @param string $queue
     *
     * @return bool false to stop consuming, true to keep going
     */
    protected function tick($queue)
    {
        $packet = $this->driver->dequeue($queue);
        if (!$packet) {
            return true;
        }

        // Deserialization happens outside the try block, so an exception
        // raised here propagates to the caller.
        $message = $this->serializer->deserialize($packet);

        $keepRunning = true;
        try {
            if (false === $this->invoke($message)) {
                $this->jobShutdown($message);
                // Only reached when the shutdown notice itself did not throw.
                $keepRunning = false;
            }
        } catch (Exception $failure) {
            $this->jobException($message, $failure);
        }

        return $keepRunning;
    }

    /**
     * Runs the handler registered for the message.
     *
     * The start hook runs first, then the handler is looked up and called
     * with the message, then the finish hook runs. The finish hook is
     * skipped if the lookup or the handler throws.
     *
     * @param Message $message
     *
     * @return null|bool whatever the handler returned
     */
    private function invoke(Message $message)
    {
        $this->jobStart($message);

        $handler = $this->handlers->get($message->handler());
        $outcome = call_user_func($handler, $message);

        $this->jobFinish($message);

        return $outcome;
    }

    /**
     * Acknowledges the message on the event object and logs the job start.
     *
     * @param Message $message
     */
    private function jobStart(Message $message)
    {
        $this->event->acknowledge($message);

        $name = $message->handler();
        $this->logger->info(sprintf('`%s` job started', $name));
    }

    /**
     * Marks the message finished on the event object and logs the job end.
     *
     * @param Message $message
     */
    private function jobFinish(Message $message)
    {
        $this->event->finish($message);

        $name = $message->handler();
        $this->logger->info(sprintf('`%s` job finished', $name));
    }

    /**
     * Logs a notice that the handler asked the consumer to shut down.
     *
     * @param Message $message
     */
    private function jobShutdown(Message $message)
    {
        $name = $message->handler();
        $this->logger->notice(sprintf('shutting down by request of `%s`', $name));
    }

    /**
     * Logs the exception message, then rejects the message on the event
     * object together with the exception.
     *
     * @param Message $message
     * @param Exception $exception
     */
    private function jobException(Message $message, Exception $exception)
    {
        $reason = $exception->getMessage();
        $this->logger->error($reason);

        $this->event->reject($message, $exception);
    }
}
162