Completed
Pull Request — master (#8)
by Adam
02:32
created

Worker::invoke()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 13
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 1

Importance

Changes 1
Bugs 0 Features 1
Metric Value
c 1
b 0
f 1
dl 0
loc 13
ccs 7
cts 7
cp 1
rs 9.4285
cc 1
eloc 7
nc 1
nop 1
crap 1
1
<?php
2
3
namespace Equip\Queue;
4
5
use Equip\Queue\Driver\DriverInterface;
6
use Equip\Queue\Serializer\JsonSerializer;
7
use Equip\Queue\Serializer\MessageSerializerInterface;
8
use Exception;
9
use Psr\Log\LoggerInterface;
10
11
/**
 * Queue worker.
 *
 * Pulls raw packets from a driver, deserializes them into messages, hands
 * each message to the handler resolved by the router, and reports the job
 * lifecycle (start, finish, shutdown, failure) to the event object and logger.
 */
class Worker
{
    /**
     * Source of raw packets
     *
     * @var DriverInterface
     */
    private $driver;

    /**
     * Receives acknowledge/finish/reject notifications for each job
     *
     * @var Event
     */
    private $event;

    /**
     * @var LoggerInterface
     */
    private $logger;

    /**
     * Turns dequeued packets into messages
     *
     * @var MessageSerializerInterface
     */
    private $serializer;

    /**
     * Resolves the callable that handles a message
     *
     * @var RouterInterface
     */
    private $router;

    /**
     * Note: `$serializer` has a null default but is followed by the required
     * `$router`, so callers still have to pass it positionally; passing null
     * selects a `JsonSerializer`.
     *
     * @param DriverInterface $driver
     * @param Event $event
     * @param LoggerInterface $logger
     * @param MessageSerializerInterface|null $serializer
     * @param RouterInterface $router
     */
    public function __construct(
        DriverInterface $driver,
        Event $event,
        LoggerInterface $logger,
        MessageSerializerInterface $serializer = null,
        RouterInterface $router
    ) {
        // Fall back to JSON when no serializer was supplied
        if ($serializer === null) {
            $serializer = new JsonSerializer();
        }

        $this->driver = $driver;
        $this->event = $event;
        $this->logger = $logger;
        $this->serializer = $serializer;
        $this->router = $router;
    }

    /**
     * Keeps processing the queue until a tick asks to stop
     *
     * @param string $queue
     */
    public function consume($queue)
    {
        do {
            $continue = $this->tick($queue);
        } while ($continue);
    }

    /**
     * Performs a single dequeue-and-dispatch cycle
     *
     * Returns true when the queue yielded nothing, when the job completed,
     * or when the job raised an exception (which is reported via
     * jobException()). Returns false only when the handler itself returned
     * false, which is treated as a request to stop consuming.
     *
     * @param string $queue
     *
     * @return bool
     */
    protected function tick($queue)
    {
        $packet = $this->driver->dequeue($queue);
        if (empty($packet)) {
            // Nothing was dequeued; tell the caller to keep going
            return true;
        }

        $message = $this->serializer->deserialize($packet);

        try {
            // Only a strict `false` from the handler stops the consumer
            $keepRunning = $this->invoke($message) !== false;
            if (!$keepRunning) {
                $this->jobShutdown($message);
            }
        } catch (Exception $exception) {
            $this->jobException($message, $exception);
            // A failed job never stops the consumer
            $keepRunning = true;
        }

        return $keepRunning;
    }

    /**
     * Runs the handler that the router resolves for the message
     *
     * The start notification is sent before the handler is resolved; the
     * finish notification is only sent when the handler returns normally.
     *
     * @param Message $message
     *
     * @return null|bool
     */
    private function invoke(Message $message)
    {
        $this->jobStart($message);

        // NOTE(review): static analysis reports that the router's get() is
        // declared as expecting a string while a Message object is passed
        // here - confirm the parameter type documented on RouterInterface.
        $handler = $this->router->get($message);
        $outcome = call_user_func($handler, $message);

        $this->jobFinish($message);

        return $outcome;
    }

    /**
     * Acknowledges the message and logs that its job has started
     *
     * @param Message $message
     */
    private function jobStart(Message $message)
    {
        $this->event->acknowledge($message);

        $line = sprintf('`%s` job started', $message->handler());
        $this->logger->info($line);
    }

    /**
     * Marks the message as finished and logs that its job has completed
     *
     * @param Message $message
     */
    private function jobFinish(Message $message)
    {
        $this->event->finish($message);

        $line = sprintf('`%s` job finished', $message->handler());
        $this->logger->info($line);
    }

    /**
     * Logs that a job asked the consumer to shut down
     *
     * @param Message $message
     */
    private function jobShutdown(Message $message)
    {
        $line = sprintf('shutting down by request of `%s`', $message->handler());
        $this->logger->notice($line);
    }

    /**
     * Logs a job failure and rejects the message
     *
     * @param Message $message
     * @param Exception $exception
     */
    private function jobException(Message $message, Exception $exception)
    {
        $reason = $exception->getMessage();
        $this->logger->error($reason);

        $this->event->reject($message, $exception);
    }
}
161