Completed
Push — master ( c08683...8607fa )
by Adam
16:41
created

Worker::invoke()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 13
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 13
ccs 7
cts 7
cp 1
rs 9.4285
cc 1
eloc 7
nc 1
nop 1
crap 1
1
<?php
2
3
namespace Equip\Queue;
4
5
use Equip\Queue\Driver\DriverInterface;
6
use Equip\Queue\Handler\HandlerFactoryInterface;
7
use Equip\Queue\Serializer\JsonSerializer;
8
use Equip\Queue\Serializer\MessageSerializerInterface;
9
use Exception;
10
use Psr\Log\LoggerInterface;
11
12
class Worker
13
{
14
    /**
15
     * @var DriverInterface
16
     */
17
    private $driver;
18
19
    /**
20
     * @var Event
21
     */
22
    private $event;
23
24
    /**
25
     * @var LoggerInterface
26
     */
27
    private $logger;
28
29
    /**
30
     * @var MessageSerializerInterface
31
     */
32
    private $serializer;
33
34
    /**
35
     * @var HandlerFactoryInterface
36
     */
37
    private $handlers;
38
39
    /**
40
     * @param DriverInterface $driver
41
     * @param Event $event
42
     * @param LoggerInterface $logger
43
     * @param MessageSerializerInterface $serializer
44
     * @param HandlerFactoryInterface $handlers
45
     */
46 6
    public function __construct(
47
        DriverInterface $driver,
48
        Event $event,
49
        LoggerInterface $logger,
50
        MessageSerializerInterface $serializer = null,
51
        HandlerFactoryInterface $handlers
52
    ) {
53 6
        $this->driver = $driver;
54 6
        $this->event = $event;
55 6
        $this->logger = $logger;
56 6
        $this->serializer = $serializer ?: new JsonSerializer;
57 6
        $this->handlers = $handlers;
58 6
    }
59
60
    /**
61
     * Consumes messages off of the queue
62
     *
63
     * @param string $queue
64
     */
65 1
    public function consume($queue)
66
    {
67 1
        while ($this->tick($queue)) { /* NOOP */ }
68 1
    }
69
70
    /**
71
     * Handles fetching messages from the queue
72
     *
73
     * @param string $queue
74
     *
75
     * @return bool
76
     */
77 6
    protected function tick($queue)
78
    {
79 6
        $packet = $this->driver->dequeue($queue);
80 6
        if (empty($packet)) {
81 1
            return true;
82
        }
83
84 5
        $message = $this->serializer->deserialize($packet);
85
        try {
86 5
            if ($this->invoke($message) === false) {
87 2
                $this->jobShutdown($message);
88 2
                return false;
89
            }
90 3
        } catch (Exception $exception) {
91 2
            $this->jobException($message, $exception);
92
        }
93
94 3
        return true;
95
    }
96
97
    /**
98
     * Invoke the messages handler
99
     *
100
     * @param Message $message
101
     *
102
     * @return null|bool
103
     */
104 5
    private function invoke(Message $message)
105
    {
106 5
        $this->jobStart($message);
107
108 5
        $result = call_user_func(
109 5
            $this->handlers->get($message->handler()),
110
            $message
111 4
        );
112
113 3
        $this->jobFinish($message);
114
115 3
        return $result;
116
    }
117
118
    /**
119
     * Handles actions related to a job starting
120
     *
121
     * @param Message $message
122
     */
123 5
    private function jobStart(Message $message)
124
    {
125 5
        $this->event->acknowledge($message);
126 5
        $this->logger->info(sprintf('`%s` job started', $message->handler()));
127 5
    }
128
129
    /**
130
     * Handles actions related to a job finishing
131
     *
132
     * @param Message $message
133
     */
134 3
    private function jobFinish(Message $message)
135
    {
136 3
        $this->event->finish($message);
137 3
        $this->logger->info(sprintf('`%s` job finished', $message->handler()));
138 3
    }
139
140
    /**
141
     * Handles actions related to a job shutting down the consumer
142
     *
143
     * @param Message $message
144
     */
145 2
    private function jobShutdown(Message $message)
146
    {
147 2
        $this->logger->notice(sprintf('shutting down by request of `%s`', $message->handler()));
148 2
    }
149
150
    /**
151
     * Handles actions related to job exceptions
152
     *
153
     * @param Message $message
154
     * @param Exception $exception
155
     */
156 2
    private function jobException(Message $message, Exception $exception)
157
    {
158 2
        $this->logger->error($exception->getMessage());
159 2
        $this->event->reject($message, $exception);
160 2
    }
161
}
162