1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Equip\Queue; |
4
|
|
|
|
5
|
|
|
use Equip\Queue\Driver\DriverInterface; |
6
|
|
|
use Equip\Queue\Exception\HandlerException; |
7
|
|
|
use Equip\Queue\Serializer\JsonSerializer; |
8
|
|
|
use Equip\Queue\Serializer\MessageSerializerInterface; |
9
|
|
|
use Exception; |
10
|
|
|
|
11
|
|
|
/**
 * Pulls packets from a queue driver, deserializes them into messages and
 * dispatches each message to the handler registered under the message's
 * handler name, reporting the outcome through the Event object.
 */
class Worker
{
    /**
     * Source of raw packets (see tick(), which calls pop() on it).
     *
     * @var DriverInterface
     */
    private $driver;

    /**
     * Receives acknowledge()/reject() notifications for processed messages.
     *
     * @var Event
     */
    private $event;

    /**
     * Turns a raw packet into a message object.
     *
     * @var MessageSerializerInterface
     */
    private $serializer;

    /**
     * Map of handler name => callable, looked up by the name that
     * $message->handler() returns.
     *
     * @var array
     */
    private $handlers;

    /**
     * @param DriverInterface $driver
     * @param Event $event
     * @param MessageSerializerInterface $serializer Falls back to a new JsonSerializer when omitted
     * @param array $handlers Map of handler name => callable
     */
    public function __construct(
        DriverInterface $driver,
        Event $event,
        MessageSerializerInterface $serializer = null,
        array $handlers = []
    ) {
        $this->driver = $driver;
        $this->event = $event;
        $this->serializer = $serializer ?: new JsonSerializer();
        $this->handlers = $handlers;
    }

    /**
     * Consumes messages off of the queue
     *
     * Keeps processing one packet at a time until tick() reports that the
     * loop should stop, which only happens when a handler returns exactly
     * `false`. Empty pops, unknown handlers and rejected messages all keep
     * the loop running.
     *
     * @param string $queue
     *
     * @throws HandlerException If a registered handler is not callable
     */
    public function consume($queue)
    {
        // The work happens inside the loop body rather than in the loop
        // condition, so the loop is never an empty statement.
        do {
            $keepConsuming = $this->tick($queue);
        } while ($keepConsuming);
    }

    /**
     * Handles fetching messages from the queue
     *
     * Pops a single packet, dispatches it to its handler and notifies the
     * event object of the outcome.
     *
     * @param string $queue
     *
     * @return bool `false` when the handler asked to stop consuming by
     *              returning exactly `false`; `true` in every other case.
     *
     * @throws HandlerException If the registered handler is not callable
     */
    private function tick($queue)
    {
        $packet = $this->driver->pop($queue);
        if (empty($packet)) {
            // Nothing usable was popped; try again on the next tick.
            // NOTE(review): empty() also treats the string "0" as empty —
            // confirm no driver can return that as a valid packet.
            return true;
        }

        $message = $this->serializer->deserialize($packet);

        // Resolved outside the try block on purpose: a non-callable handler
        // raises HandlerException to the caller instead of being rejected.
        $handler = $this->getHandler($message->handler(), $this->handlers);
        if (!$handler) {
            // No handler registered under this name: the message is skipped
            // without acknowledge() or reject() being called.
            return true;
        }

        try {
            $result = call_user_func($handler, $message);

            // The message is acknowledged before the stop signal is checked,
            // so the message that stops the worker still gets acknowledged.
            $this->event->acknowledge($message);

            if ($result === false) {
                return false;
            }
        } catch (Exception $exception) {
            // Covers exceptions from the handler and from acknowledge().
            $this->event->reject($message, $exception);
        }

        return true;
    }

    /**
     * Looks up the callable registered for a handler name.
     *
     * @param string $handler Handler name to look up
     * @param array $router Map of handler name => callable
     *
     * @return null|callable `null` when no entry is set for the name
     * @throws HandlerException If handler is not callable
     */
    private function getHandler($handler, array $router = [])
    {
        if (!isset($router[$handler])) {
            return null;
        }

        $callable = $router[$handler];
        if (!is_callable($callable)) {
            throw HandlerException::invalidHandler($handler);
        }

        return $callable;
    }
}
118
|
|
|
|
This check looks for `while` loops that have no statements or where all statements have been commented out. This may be the result of changes made for debugging, or the code may simply be obsolete. Consider removing the loop.