1 | <?php |
||
10 | class Consumer |
||
11 | { |
||
12 | protected $router; |
||
13 | protected $dispatcher; |
||
14 | protected $shutdown = false; |
||
15 | protected $pause = false; |
||
16 | protected $configured = false; |
||
17 | protected $options = [ |
||
18 | 'max-runtime' => PHP_INT_MAX, |
||
19 | 'max-messages' => null, |
||
20 | 'stop-when-empty' => false, |
||
21 | 'stop-on-error' => false, |
||
22 | ]; |
||
23 | |||
24 | /** |
||
25 | * @param Router $router |
||
26 | * @param EventDispatcherInterface $dispatcher |
||
27 | */ |
||
28 | 11 | public function __construct(Router $router, EventDispatcherInterface $dispatcher) |
|
33 | |||
/**
 * Runs the consumer against a queue by calling Consumer::tick() over and
 * over until tick() reports that it should not be run again.
 *
 * bind() is invoked once before the loop starts; it is documented as
 * setting up the unix signal handlers.
 *
 * @param Queue $queue   queue passed to every tick() call
 * @param array $options options passed unchanged to every tick() call
 */
public function consume(Queue $queue, array $options = [])
{
    // NOTE(review): presumably required so handlers registered by bind()
    // get a chance to run while the loop is busy -- confirm against bind().
    declare(ticks=1);

    $this->bind();

    // All the work happens inside tick(); its return value only tells us
    // whether another iteration is wanted.
    do {
        $keepRunning = $this->tick($queue, $options);
    } while ($keepRunning);
}
||
50 | |||
51 | /** |
||
52 | * Returns true to indicate it should be run again or false to indicate
||
53 | * it should not be run again. |
||
54 | * |
||
55 | * @param Queue $queue |
||
56 | * @param array $options |
||
57 | * |
||
58 | * @return bool |
||
59 | */ |
||
60 | 10 | public function tick(Queue $queue, array $options = []) |
|
90 | |||
91 | /** |
||
92 | * Mark Consumer as shutdown. |
||
93 | */ |
||
94 | 1 | public function shutdown() |
|
98 | |||
99 | /** |
||
100 | * Pause consuming. |
||
101 | */ |
||
102 | 1 | public function pause() |
|
106 | |||
107 | /** |
||
108 | * Resume consuming. |
||
109 | */ |
||
110 | 1 | public function resume() |
|
114 | |||
/**
 * Handles a single envelope taken from a queue.
 *
 * Steps, in order: dispatch BernardEvents::INVOKE, route the envelope to a
 * receiver, hand the envelope's message to that receiver, acknowledge the
 * envelope on the queue and dispatch BernardEvents::ACKNOWLEDGE. Anything
 * thrown along the way is passed to rejectDispatch() together with the
 * envelope and the queue.
 *
 * Until a real extension point for "invoked" behaviour exists, this method
 * can be wrapped to hook into message processing.
 *
 * @param Envelope $envelope envelope to process
 * @param Queue    $queue    queue the envelope is acknowledged on
 *
 * @throws \Exception  rejectDispatch() is documented as possibly throwing
 * @throws \Throwable  rejectDispatch() is documented as possibly throwing
 */
public function invoke(Envelope $envelope, Queue $queue)
{
    try {
        $invokeEvent = new EnvelopeEvent($envelope, $queue);
        $this->dispatcher->dispatch(BernardEvents::INVOKE, $invokeEvent);

        $this->router->route($envelope)->receive($envelope->getMessage());

        // Reaching this point means the receiver did not throw.
        $queue->acknowledge($envelope);

        // A fresh event instance is used on purpose; the INVOKE one is not reused.
        $acknowledgeEvent = new EnvelopeEvent($envelope, $queue);
        $this->dispatcher->dispatch(BernardEvents::ACKNOWLEDGE, $acknowledgeEvent);
    } catch (\Exception $failure) {
        $this->rejectDispatch($failure, $envelope, $queue);
    } catch (\Throwable $failure) {
        // Kept alongside the \Exception clause: rejectDispatch()'s docblock
        // mentions PHP 5.x compatibility, where \Throwable does not exist.
        $this->rejectDispatch($failure, $envelope, $queue);
    }
}
|
143 | |||
144 | /** |
||
145 | * @param array $options |
||
146 | */ |
||
147 | 10 | protected function configure(array $options) |
|
157 | |||
158 | /** |
||
159 | * Setup signal handlers for unix signals. |
||
160 | * |
||
161 | * If the process control extension does not exist (e.g. on Windows), ignore the signal handlers. |
||
162 | * The difference is that when terminating the consumer, running processes will not stop gracefully |
||
163 | * and will terminate immediately. |
||
164 | */ |
||
165 | protected function bind() |
||
175 | |||
176 | /** |
||
177 | * @param \Throwable|\Exception $exception note that the type-hint is missing due to PHP 5.x compat |
||
178 | * @param Envelope $envelope |
||
179 | * @param Queue $queue |
||
180 | * |
||
181 | * @throws \Exception |
||
182 | * @throws \Throwable |
||
183 | */ |
||
184 | 2 | private function rejectDispatch($exception, Envelope $envelope, Queue $queue) |
|
196 | } |
||
197 |
This check looks for `while` loops that have no statements or where all statements have been commented out. This may be the result of changes made for debugging, or the code may simply be obsolete. Consider removing the loop.