Completed
Push — master ( eedd5b...a9e65f )
by Adam
02:22
created

Worker::jobFinish()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 5
rs 9.4285
cc 1
eloc 3
nc 1
nop 1
1
<?php
2
3
namespace Equip\Queue;
4
5
use Equip\Queue\Driver\DriverInterface;
6
use Equip\Queue\Exception\HandlerException;
7
use Equip\Queue\Serializer\JsonSerializer;
8
use Equip\Queue\Serializer\MessageSerializerInterface;
9
use Exception;
10
use Psr\Log\LoggerInterface;
11
12
/**
 * Queue worker.
 *
 * Pulls packets from a driver, turns each one into a message through the
 * serializer and hands the message to the callable registered under the
 * message's handler name. Lifecycle steps are reported to the event object
 * and to the logger.
 */
class Worker
{
    /**
     * Source of raw packets.
     *
     * @var DriverInterface
     */
    private $driver;

    /**
     * Receives acknowledge / finish / reject notifications.
     *
     * @var Event
     */
    private $event;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * Converts raw packets into message objects.
     *
     * @var MessageSerializerInterface
     */
    private $serializer;

    /**
     * Callables keyed by handler name.
     *
     * @var array
     */
    private $handlers;

    /**
     * @param DriverInterface $driver
     * @param Event $event
     * @param LoggerInterface $logger
     * @param MessageSerializerInterface $serializer Falls back to a JsonSerializer when omitted
     * @param array $handlers Callables keyed by handler name
     */
    public function __construct(
        DriverInterface $driver,
        Event $event,
        LoggerInterface $logger,
        MessageSerializerInterface $serializer = null,
        array $handlers = []
    ) {
        if (!$serializer) {
            $serializer = new JsonSerializer();
        }

        $this->driver = $driver;
        $this->event = $event;
        $this->logger = $logger;
        $this->serializer = $serializer;
        $this->handlers = $handlers;
    }

    /**
     * Keeps processing the given queue until a tick asks to stop.
     *
     * @param string $queue
     */
    public function consume($queue)
    {
        // All work happens inside tick(); the loop only repeats it for as
        // long as tick() reports that consumption should go on. (Written as
        // an empty-bodied `while`, static analysers flag this as unused code.)
        do {
            $keepGoing = $this->tick($queue);
        } while ($keepGoing);
    }

    /**
     * Processes at most one packet from the queue.
     *
     * @param string $queue
     *
     * @return bool False only when a handler returned exactly false; true otherwise
     */
    private function tick($queue)
    {
        $packet = $this->driver->dequeue($queue);
        if (!$packet) {
            // Nothing was dequeued; keep consuming.
            return true;
        }

        $message = $this->serializer->deserialize($packet);
        $handler = $this->getHandler($message->handler(), $this->handlers);

        if (null === $handler) {
            $warning = sprintf('Missing `%s` handler', $message->handler());
            $this->logger->warning($warning);

            return true;
        }

        $continue = true;

        try {
            $this->jobStart($message);
            $outcome = call_user_func($handler, $message);
            $this->jobFinish($message);

            if (false === $outcome) {
                $this->jobShutdown($message);
                // Set only after the shutdown notice went through, so an
                // exception raised by it still leaves the worker running.
                $continue = false;
            }
        } catch (Exception $exception) {
            $this->jobException($message, $exception);
        }

        return $continue;
    }

    /**
     * Looks up the callable registered for a handler name.
     *
     * @param string $handler
     * @param array $router Callables keyed by handler name
     *
     * @return null|callable Null when nothing is registered under the name
     * @throws HandlerException If the registered value is not callable
     */
    private function getHandler($handler, array $router = [])
    {
        $candidate = isset($router[$handler]) ? $router[$handler] : null;

        if (null === $candidate) {
            return null;
        }

        if (is_callable($candidate)) {
            return $candidate;
        }

        throw HandlerException::invalidHandler($handler);
    }

    /**
     * Acknowledges the message and logs that its job started.
     *
     * @param Message $message
     */
    private function jobStart(Message $message)
    {
        $this->event->acknowledge($message);

        $note = sprintf('`%s` job started', $message->handler());
        $this->logger->info($note);
    }

    /**
     * Marks the message as finished and logs that its job finished.
     *
     * @param Message $message
     */
    private function jobFinish(Message $message)
    {
        $this->event->finish($message);

        $note = sprintf('`%s` job finished', $message->handler());
        $this->logger->info($note);
    }

    /**
     * Logs that a job requested the consumer to shut down.
     *
     * @param Message $message
     */
    private function jobShutdown(Message $message)
    {
        $note = sprintf('shutting down by request of `%s`', $message->handler());
        $this->logger->notice($note);
    }

    /**
     * Logs the exception message and rejects the message.
     *
     * @param Message $message
     * @param Exception $exception
     */
    private function jobException(Message $message, Exception $exception)
    {
        $reason = $exception->getMessage();
        $this->logger->error($reason);

        $this->event->reject($message, $exception);
    }
}
175