1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace Bernard; |
4
|
|
|
|
5
|
|
|
use Symfony\Component\EventDispatcher\EventDispatcherInterface; |
6
|
|
|
use Bernard\Event\EnvelopeEvent; |
7
|
|
|
use Bernard\Event\PingEvent; |
8
|
|
|
use Bernard\Event\RejectEnvelopeEvent; |
9
|
|
|
|
10
|
|
|
class Consumer |
11
|
|
|
{ |
12
|
|
|
protected $router; |
13
|
|
|
protected $dispatcher; |
14
|
|
|
protected $shutdown = false; |
15
|
|
|
protected $pause = false; |
16
|
|
|
protected $configured = false; |
17
|
|
|
protected $options = [ |
18
|
|
|
'max-runtime' => PHP_INT_MAX, |
19
|
|
|
'max-messages' => null, |
20
|
|
|
'stop-when-empty' => false, |
21
|
|
|
'stop-on-error' => false, |
22
|
|
|
]; |
23
|
|
|
|
24
|
|
|
/** |
25
|
|
|
* @param Router $router |
26
|
|
|
* @param EventDispatcherInterface $dispatcher |
27
|
|
|
*/ |
28
|
11 |
|
public function __construct(Router $router, EventDispatcherInterface $dispatcher) |
29
|
|
|
{ |
30
|
11 |
|
$this->router = $router; |
31
|
11 |
|
$this->dispatcher = $dispatcher; |
32
|
11 |
|
} |
33
|
|
|
|
34
|
|
|
/** |
35
|
|
|
* Starts an infinite loop calling Consumer::tick();. |
36
|
|
|
* |
37
|
|
|
* @param Queue $queue |
38
|
|
|
* @param array $options |
39
|
|
|
*/ |
40
|
|
|
public function consume(Queue $queue, array $options = []) |
41
|
|
|
{ |
42
|
|
|
declare(ticks=1); |
43
|
|
|
|
44
|
|
|
$this->bind(); |
45
|
|
|
|
46
|
|
|
while ($this->tick($queue, $options)) { |
|
|
|
|
47
|
|
|
// NO op |
48
|
|
|
} |
49
|
|
|
} |
50
|
|
|
|
51
|
|
|
/** |
52
|
|
|
* Returns true do indicate it should be run again or false to indicate |
53
|
|
|
* it should not be run again. |
54
|
|
|
* |
55
|
|
|
* @param Queue $queue |
56
|
|
|
* @param array $options |
57
|
|
|
* |
58
|
|
|
* @return bool |
59
|
|
|
*/ |
60
|
10 |
|
public function tick(Queue $queue, array $options = []) |
61
|
|
|
{ |
62
|
10 |
|
$this->configure($options); |
63
|
|
|
|
64
|
10 |
|
if ($this->shutdown) { |
65
|
1 |
|
return false; |
66
|
|
|
} |
67
|
|
|
|
68
|
9 |
|
if (microtime(true) > $this->options['max-runtime']) { |
69
|
1 |
|
return false; |
70
|
|
|
} |
71
|
|
|
|
72
|
8 |
|
if ($this->pause) { |
73
|
1 |
|
return true; |
74
|
|
|
} |
75
|
|
|
|
76
|
8 |
|
$this->dispatcher->dispatch(BernardEvents::PING, new PingEvent($queue)); |
77
|
|
|
|
78
|
8 |
|
if (!$envelope = $queue->dequeue()) { |
79
|
2 |
|
return !$this->options['stop-when-empty']; |
80
|
|
|
} |
81
|
|
|
|
82
|
7 |
|
$this->invoke($envelope, $queue); |
83
|
|
|
|
84
|
6 |
|
if (null === $this->options['max-messages']) { |
85
|
5 |
|
return true; |
86
|
|
|
} |
87
|
|
|
|
88
|
1 |
|
return (bool) --$this->options['max-messages']; |
89
|
|
|
} |
90
|
|
|
|
91
|
|
|
/** |
92
|
|
|
* Mark Consumer as shutdown. |
93
|
|
|
*/ |
94
|
1 |
|
public function shutdown() |
95
|
|
|
{ |
96
|
1 |
|
$this->shutdown = true; |
97
|
1 |
|
} |
98
|
|
|
|
99
|
|
|
/** |
100
|
|
|
* Pause consuming. |
101
|
|
|
*/ |
102
|
1 |
|
public function pause() |
103
|
|
|
{ |
104
|
1 |
|
$this->pause = true; |
105
|
1 |
|
} |
106
|
|
|
|
107
|
|
|
/** |
108
|
|
|
* Resume consuming. |
109
|
|
|
*/ |
110
|
1 |
|
public function resume() |
111
|
|
|
{ |
112
|
1 |
|
$this->pause = false; |
113
|
1 |
|
} |
114
|
|
|
|
115
|
|
|
/** |
116
|
|
|
* Until there is a real extension point to doing invoked stuff, this can be used |
117
|
|
|
* by wrapping the invoke method. |
118
|
|
|
* |
119
|
|
|
* @param Envelope $envelope |
120
|
|
|
* @param Queue $queue |
121
|
|
|
* |
122
|
|
|
* @throws \Exception |
123
|
|
|
* @throws \Throwable |
124
|
|
|
*/ |
125
|
8 |
|
public function invoke(Envelope $envelope, Queue $queue) |
126
|
|
|
{ |
127
|
|
|
try { |
128
|
8 |
|
$this->dispatcher->dispatch(BernardEvents::INVOKE, new EnvelopeEvent($envelope, $queue)); |
129
|
|
|
|
130
|
8 |
|
$receiver = $this->router->route($envelope); |
131
|
7 |
|
$receiver->receive($envelope->getMessage()); |
132
|
|
|
|
133
|
|
|
// We successfully processed the message. |
134
|
6 |
|
$queue->acknowledge($envelope); |
135
|
|
|
|
136
|
6 |
|
$this->dispatcher->dispatch(BernardEvents::ACKNOWLEDGE, new EnvelopeEvent($envelope, $queue)); |
137
|
8 |
|
} catch (\Throwable $error) { |
|
|
|
|
138
|
|
|
$this->rejectDispatch($error, $envelope, $queue); |
139
|
2 |
|
} catch (\Exception $exception) { |
140
|
2 |
|
$this->rejectDispatch($exception, $envelope, $queue); |
141
|
|
|
} |
142
|
7 |
|
} |
143
|
|
|
|
144
|
|
|
/** |
145
|
|
|
* @param array $options |
146
|
|
|
*/ |
147
|
10 |
|
protected function configure(array $options) |
148
|
|
|
{ |
149
|
10 |
|
if ($this->configured) { |
150
|
3 |
|
return $this->options; |
151
|
|
|
} |
152
|
|
|
|
153
|
10 |
|
$this->options = array_filter($options) + $this->options; |
154
|
10 |
|
$this->options['max-runtime'] += microtime(true); |
155
|
10 |
|
$this->configured = true; |
156
|
10 |
|
} |
157
|
|
|
|
158
|
|
|
/** |
159
|
|
|
* Setup signal handlers for unix signals. |
160
|
|
|
* |
161
|
|
|
* If the process control extension does not exist (e.g. on Windows), ignore the signal handlers. |
162
|
|
|
* The difference is that when terminating the consumer, running processes will not stop gracefully |
163
|
|
|
* and will terminate immediately. |
164
|
|
|
*/ |
165
|
|
|
protected function bind() |
166
|
|
|
{ |
167
|
|
|
if (function_exists('pcntl_signal')) { |
168
|
|
|
pcntl_signal(SIGTERM, [$this, 'shutdown']); |
169
|
|
|
pcntl_signal(SIGINT, [$this, 'shutdown']); |
170
|
|
|
pcntl_signal(SIGQUIT, [$this, 'shutdown']); |
171
|
|
|
pcntl_signal(SIGUSR2, [$this, 'pause']); |
172
|
|
|
pcntl_signal(SIGCONT, [$this, 'resume']); |
173
|
|
|
} |
174
|
|
|
} |
175
|
|
|
|
176
|
|
|
/** |
177
|
|
|
* @param \Throwable|\Exception $exception note that the type-hint is missing due to PHP 5.x compat |
178
|
|
|
* @param Envelope $envelope |
179
|
|
|
* @param Queue $queue |
180
|
|
|
* |
181
|
|
|
* @throws \Exception |
182
|
|
|
* @throws \Throwable |
183
|
|
|
*/ |
184
|
2 |
|
private function rejectDispatch($exception, Envelope $envelope, Queue $queue) |
185
|
|
|
{ |
186
|
|
|
// Make sure the exception is not interfering. |
187
|
|
|
// Previously failing jobs handling have been moved to a middleware. |
188
|
|
|
// |
189
|
|
|
// Emit an event to let others log that exception |
190
|
2 |
|
$this->dispatcher->dispatch(BernardEvents::REJECT, new RejectEnvelopeEvent($envelope, $queue, $exception)); |
191
|
|
|
|
192
|
2 |
|
if ($this->options['stop-on-error']) { |
193
|
1 |
|
throw $exception; |
194
|
|
|
} |
195
|
1 |
|
} |
196
|
|
|
} |
197
|
|
|
|
This check looks for
while
loops that have no statements or where all statements have been commented out. This may be the result of changes for debugging or the code may simply be obsolete.Consider removing the loop.