1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
/** |
4
|
|
|
* This file is part of HeriJobQueueBundle. |
5
|
|
|
* |
6
|
|
|
* (c) Alexandre Mogère |
7
|
|
|
* |
8
|
|
|
* This source file is subject to the MIT license that is bundled |
9
|
|
|
* with this source code in the file LICENSE. |
10
|
|
|
*/ |
11
|
|
|
|
12
|
|
|
namespace Heri\Bundle\JobQueueBundle\Service; |
13
|
|
|
|
14
|
|
|
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand; |
15
|
|
|
use Symfony\Component\Console\Input\ArrayInput; |
16
|
|
|
use Symfony\Component\Console\Output\OutputInterface; |
17
|
|
|
use Symfony\Component\Console\Output\ConsoleOutput; |
18
|
|
|
use Symfony\Component\Console\Output\StreamOutput; |
19
|
|
|
use Symfony\Component\Process\Process; |
20
|
|
|
use Psr\Log\LoggerInterface; |
21
|
|
|
use Heri\Bundle\JobQueueBundle\Exception\InvalidUnserializedMessageException; |
22
|
|
|
|
23
|
|
|
class QueueService |
24
|
|
|
{ |
25
|
|
|
const MICROSECONDS_PER_SECOND = 1000000; |
26
|
|
|
|
27
|
|
|
const PRIORITY_HIGH = 1; |
28
|
|
|
|
29
|
|
|
/** |
30
|
|
|
* var ZendQueue\Adapter\AbstractAdapter. |
31
|
|
|
*/ |
32
|
|
|
public $adapter; |
33
|
|
|
|
34
|
|
|
/** |
35
|
|
|
* var LoggerInterface. |
36
|
|
|
*/ |
37
|
|
|
protected $logger; |
38
|
|
|
|
39
|
|
|
/** |
40
|
|
|
* var ContainerAwareCommand. |
41
|
|
|
*/ |
42
|
|
|
protected $command; |
43
|
|
|
|
44
|
|
|
/** |
45
|
|
|
* var OutputInterface. |
46
|
|
|
*/ |
47
|
|
|
protected $output; |
48
|
|
|
|
49
|
|
|
/** |
50
|
|
|
* var \ZendQueue\Queue. |
51
|
|
|
*/ |
52
|
|
|
protected $queue; |
53
|
|
|
|
54
|
|
|
/** |
55
|
|
|
* var array. |
56
|
|
|
*/ |
57
|
|
|
protected $config; |
58
|
|
|
|
59
|
|
|
/** |
60
|
|
|
* var bool. |
61
|
|
|
*/ |
62
|
|
|
protected $running; |
63
|
|
|
|
64
|
|
|
/** |
65
|
|
|
* var integer. |
66
|
|
|
*/ |
67
|
|
|
protected $processTimeout = null; |
68
|
|
|
|
69
|
|
|
/** |
70
|
|
|
* @param LoggerInterface $logger |
71
|
|
|
* @param array $config |
72
|
|
|
*/ |
73
|
|
|
public function __construct(LoggerInterface $logger, array $config = []) |
74
|
|
|
{ |
75
|
|
|
$this->logger = $logger; |
76
|
|
|
|
77
|
|
|
$this->setUp($config); |
78
|
|
|
|
79
|
|
|
$this->running = true; |
80
|
|
|
$this->output = new ConsoleOutput(); |
81
|
|
|
|
82
|
|
|
if (php_sapi_name() == 'cli') { |
83
|
|
|
pcntl_signal(SIGTERM, function () { |
84
|
|
|
$this->stop(); |
85
|
|
|
}); |
86
|
|
|
|
87
|
|
|
pcntl_signal(SIGINT, function () { |
88
|
|
|
$this->stop(); |
89
|
|
|
}); |
90
|
|
|
} |
91
|
|
|
} |
92
|
|
|
|
93
|
|
|
public function setUp($config) |
94
|
|
|
{ |
95
|
|
|
$this->config = $config; |
96
|
|
|
|
97
|
|
|
$this->processTimeout = isset($this->config['process_timeout']) ? $this->config['process_timeout'] : null; |
98
|
|
|
} |
99
|
|
|
|
100
|
|
|
/** |
101
|
|
|
* @param string $name |
102
|
|
|
* |
103
|
|
|
* @return $this |
104
|
|
|
*/ |
105
|
|
|
public function attach($name) |
106
|
|
|
{ |
107
|
|
|
$this->queue = new \ZendQueue\Queue($this->adapter, [ |
108
|
|
|
'name' => $name, |
109
|
|
|
]); |
110
|
|
|
|
111
|
|
|
return $this; |
112
|
|
|
} |
113
|
|
|
|
114
|
|
|
/** |
115
|
|
|
* @param int $maxMessages |
116
|
|
|
* @param int $timeout |
117
|
|
|
*/ |
118
|
|
|
public function receive($maxMessages = 1, $timeout = null) |
119
|
|
|
{ |
120
|
|
|
$messages = $this->queue->receive($maxMessages, $timeout, $this->queue); |
|
|
|
|
121
|
|
|
|
122
|
|
|
if ($messages && $messages->count() > 0) { |
123
|
|
|
$this->handle($messages); |
124
|
|
|
} |
125
|
|
|
} |
126
|
|
|
|
127
|
|
|
/** |
128
|
|
|
* @param array $args |
129
|
|
|
* @param int $priority |
|
|
|
|
130
|
|
|
*/ |
131
|
|
|
public function push(array $args) |
132
|
|
|
{ |
133
|
|
|
if (!is_null($this->queue)) { |
134
|
|
|
if (class_exists('Zend\Json\Encoder')) { |
135
|
|
|
$body = \Zend\Json\Encoder::encode($args); |
136
|
|
|
} else { |
137
|
|
|
$body = json_encode($args); |
138
|
|
|
} |
139
|
|
|
|
140
|
|
|
$this->queue->send($body); |
141
|
|
|
$this->output->writeLn("<fg=green> [x] [{$this->queue->getName()}] {$args['command']} sent</>"); |
142
|
|
|
} |
143
|
|
|
} |
144
|
|
|
|
145
|
|
|
/** |
146
|
|
|
* @param string $name |
|
|
|
|
147
|
|
|
* |
148
|
|
|
* @return $this |
149
|
|
|
*/ |
150
|
|
|
public function highPriority() |
151
|
|
|
{ |
152
|
|
|
$this->adapter->setPriority(self::PRIORITY_HIGH); |
153
|
|
|
|
154
|
|
|
return $this; |
155
|
|
|
} |
156
|
|
|
|
157
|
|
|
/** |
158
|
|
|
* @param string $name |
159
|
|
|
* @param int $timeout |
160
|
|
|
*/ |
161
|
|
|
public function create($name, $timeout = null) |
162
|
|
|
{ |
163
|
|
|
return $this->adapter->create($name, $timeout); |
164
|
|
|
} |
165
|
|
|
|
166
|
|
|
/** |
167
|
|
|
* @param string $name |
168
|
|
|
* |
169
|
|
|
* @return bool |
170
|
|
|
*/ |
171
|
|
|
public function delete($name) |
172
|
|
|
{ |
173
|
|
|
return $this->adapter->delete($name); |
174
|
|
|
} |
175
|
|
|
|
176
|
|
|
/** |
177
|
|
|
* @param string $queueName |
178
|
|
|
*/ |
179
|
|
|
public function showMessages($queueName) |
180
|
|
|
{ |
181
|
|
|
return $this->adapter->showMessages($queueName); |
182
|
|
|
} |
183
|
|
|
|
184
|
|
|
/** |
185
|
|
|
* @return bool |
186
|
|
|
*/ |
187
|
|
|
public function retry() |
188
|
|
|
{ |
189
|
|
|
if (!method_exists($this->adapter, 'retry')) { |
190
|
|
|
return false; |
191
|
|
|
} |
192
|
|
|
|
193
|
|
|
return $this->adapter->retry(); |
194
|
|
|
} |
195
|
|
|
|
196
|
|
|
/** |
197
|
|
|
* @paraam int $id |
198
|
|
|
* |
199
|
|
|
* @return bool |
200
|
|
|
*/ |
201
|
|
|
public function forget($id) |
202
|
|
|
{ |
203
|
|
|
if (!method_exists($this->adapter, 'forget')) { |
204
|
|
|
return false; |
205
|
|
|
} |
206
|
|
|
|
207
|
|
|
return $this->adapter->forget($id); |
208
|
|
|
} |
209
|
|
|
|
210
|
|
|
/** |
211
|
|
|
* @return bool |
212
|
|
|
*/ |
213
|
|
|
public function flush() |
214
|
|
|
{ |
215
|
|
|
return $this->adapter->flush(); |
216
|
|
|
} |
217
|
|
|
|
218
|
|
|
/** |
219
|
|
|
* @return int |
220
|
|
|
*/ |
221
|
|
|
public function countMessages() |
222
|
|
|
{ |
223
|
|
|
return $this->adapter->count(); |
224
|
|
|
} |
225
|
|
|
|
226
|
|
|
/** |
227
|
|
|
* @return int |
228
|
|
|
*/ |
229
|
|
|
public function count() |
230
|
|
|
{ |
231
|
|
|
return $this->adapter->count(); |
232
|
|
|
} |
233
|
|
|
|
234
|
|
|
/** |
235
|
|
|
* @param ContainerAwareCommand $command |
236
|
|
|
*/ |
237
|
|
|
public function setCommand(ContainerAwareCommand $command) |
238
|
|
|
{ |
239
|
|
|
$this->command = $command; |
240
|
|
|
} |
241
|
|
|
|
242
|
|
|
/** |
243
|
|
|
* @param OutputInterface $output |
244
|
|
|
*/ |
245
|
|
|
public function setOutput(OutputInterface $output) |
246
|
|
|
{ |
247
|
|
|
$this->output = $output; |
248
|
|
|
} |
249
|
|
|
|
250
|
|
|
/** |
251
|
|
|
* @return OutputInterface |
252
|
|
|
*/ |
253
|
|
|
public function getOutput() |
254
|
|
|
{ |
255
|
|
|
return $this->output; |
256
|
|
|
} |
257
|
|
|
|
258
|
|
|
public function isEnabled() |
259
|
|
|
{ |
260
|
|
|
return $this->config['enabled']; |
261
|
|
|
} |
262
|
|
|
|
263
|
|
|
public function isRunning() |
264
|
|
|
{ |
265
|
|
|
return $this->running; |
266
|
|
|
} |
267
|
|
|
|
268
|
|
|
public function setProcessTimeout($processTimeout) |
269
|
|
|
{ |
270
|
|
|
$this->processTimeout = $processTimeout; |
271
|
|
|
} |
272
|
|
|
|
273
|
|
|
public function listen($name = null, $sleep = 0, $work = true) |
274
|
|
|
{ |
275
|
|
|
if ($work) { |
276
|
|
|
$this->loop($name); |
277
|
|
|
} else { |
278
|
|
|
// event loop |
279
|
|
|
if (class_exists('React\EventLoop\Factory')) { |
280
|
|
|
$loop = \React\EventLoop\Factory::create(); |
281
|
|
|
$loop->addPeriodicTimer($sleep, function (\React\EventLoop\Timer\Timer $timer) use ($name) { |
282
|
|
|
$this->loop($name); |
283
|
|
|
// stop closure loop on SIGINT |
284
|
|
|
if (!$this->isRunning()) { |
285
|
|
|
$timer->getLoop()->stop(); |
286
|
|
|
} |
287
|
|
|
}); |
288
|
|
|
$loop->run(); |
289
|
|
|
} else { |
290
|
|
|
do { |
291
|
|
|
$this->loop($name); |
292
|
|
|
usleep($sleep * self::MICROSECONDS_PER_SECOND); |
293
|
|
|
} while ($this->running); |
294
|
|
|
} |
295
|
|
|
} |
296
|
|
|
} |
297
|
|
|
|
298
|
|
|
protected function stop() |
299
|
|
|
{ |
300
|
|
|
$this->running = false; |
301
|
|
|
} |
302
|
|
|
|
303
|
|
|
protected function loop($name = null) |
304
|
|
|
{ |
305
|
|
|
$listQueues = []; |
306
|
|
|
|
307
|
|
|
if (php_sapi_name() == 'cli') { |
308
|
|
|
pcntl_signal_dispatch(); |
309
|
|
|
} |
310
|
|
|
if (!$this->isRunning()) { |
311
|
|
|
return; |
312
|
|
|
} |
313
|
|
|
|
314
|
|
|
if ($name) { |
315
|
|
|
$listQueues[] = $name; |
316
|
|
|
} else { |
317
|
|
|
$listQueues = $this->config['queues']; |
318
|
|
|
} |
319
|
|
|
|
320
|
|
|
foreach ($listQueues as $name) { |
321
|
|
|
$this->attach($name); |
322
|
|
|
$this->receive($this->config['max_messages']); |
323
|
|
|
|
324
|
|
|
if (php_sapi_name() == 'cli') { |
325
|
|
|
pcntl_signal_dispatch(); |
326
|
|
|
} |
327
|
|
|
if (!$this->isRunning()) { |
328
|
|
|
return; |
329
|
|
|
} |
330
|
|
|
} |
331
|
|
|
} |
332
|
|
|
|
333
|
|
|
/** |
334
|
|
|
* @param MessageIterator $messages |
335
|
|
|
*/ |
336
|
|
|
protected function handle(\ZendQueue\Message\MessageIterator $messages) |
337
|
|
|
{ |
338
|
|
|
if (!$this->output instanceof OutputInterface) { |
339
|
|
|
$this->output = new StreamOutput(fopen('php://memory', 'w', false)); |
340
|
|
|
} |
341
|
|
|
|
342
|
|
|
foreach ($messages as $message) { |
343
|
|
|
$this->run($message); |
344
|
|
|
} |
345
|
|
|
} |
346
|
|
|
|
347
|
|
|
protected function run($message) |
348
|
|
|
{ |
349
|
|
|
if (property_exists($this->adapter, 'logger')) { |
350
|
|
|
$this->adapter->logger = $this->logger; |
351
|
|
|
} |
352
|
|
|
|
353
|
|
|
try { |
354
|
|
|
list( |
355
|
|
|
$commandName, |
356
|
|
|
$arguments |
357
|
|
|
) = $this->getUnseralizedBody($message); |
358
|
|
|
|
359
|
|
|
$this->output->writeLn("<fg=yellow> [x] [{$this->queue->getName()}] {$commandName} received</> "); |
360
|
|
|
|
361
|
|
|
if (!isset($this->command)) { |
362
|
|
|
$process = new Process(sprintf('%s %s %s %s', |
363
|
|
|
'/usr/bin/php', 'app/console', $commandName, |
364
|
|
|
implode(' ', $arguments) |
365
|
|
|
)); |
366
|
|
|
$process->setTimeout($this->processTimeout); |
367
|
|
|
$process->run(); |
368
|
|
|
|
369
|
|
|
if (!$process->isSuccessful()) { |
370
|
|
|
throw new \Exception($process->getErrorOutput()); |
371
|
|
|
} |
372
|
|
|
|
373
|
|
|
echo $process->getOutput(); |
374
|
|
|
} else { |
375
|
|
|
$input = new ArrayInput(array_merge([''], $arguments)); |
376
|
|
|
$command = $this->command->getApplication()->find($commandName); |
377
|
|
|
$command->run($input, $this->output); |
378
|
|
|
} |
379
|
|
|
|
380
|
|
|
$this->queue->deleteMessage($message); |
381
|
|
|
$this->output->writeLn("<fg=green> [x] [{$this->queue->getName()}] {$commandName} done</>"); |
382
|
|
|
} catch (\Exception $e) { |
383
|
|
|
$this->output->writeLn("<fg=white;bg=red> [!] [{$this->queue->getName()}] FAILURE: {$e->getMessage()}</>"); |
384
|
|
|
$this->adapter->logException($message, $e); |
385
|
|
|
} |
386
|
|
|
} |
387
|
|
|
|
388
|
|
|
/** |
389
|
|
|
* @param ZendQueue\Message $message |
390
|
|
|
*/ |
391
|
|
|
protected function getUnseralizedBody(\ZendQueue\Message $message) |
392
|
|
|
{ |
393
|
|
|
if (class_exists('Zend\Json\Json')) { |
394
|
|
|
$body = \Zend\Json\Json::decode($message->body, true); |
395
|
|
|
} else { |
396
|
|
|
$body = json_decode($message->body, true); |
397
|
|
|
} |
398
|
|
|
|
399
|
|
|
$arguments = []; |
400
|
|
|
$args = []; |
401
|
|
|
if (isset($body['argument'])) { |
402
|
|
|
$args = $body['argument']; |
403
|
|
|
} elseif (isset($body['arguments'])) { |
404
|
|
|
$args = $body['arguments']; |
405
|
|
|
} |
406
|
|
|
|
407
|
|
|
if (!isset($body['command']) || $body['command'] == '') { |
408
|
|
|
throw new InvalidUnserializedMessageException('Command name not found'); |
409
|
|
|
} |
410
|
|
|
|
411
|
|
|
$commandName = $body['command']; |
412
|
|
|
foreach ($args as $key => $value) { |
413
|
|
|
if (is_string($key) && preg_match('/^--/', $key)) { |
414
|
|
|
if (is_bool($value)) { |
415
|
|
|
$arguments[] = "{$key}"; |
416
|
|
|
} else { |
417
|
|
|
$arguments[] = "{$key}={$value}"; |
418
|
|
|
} |
419
|
|
|
} else { |
420
|
|
|
$arguments[] = $value; |
421
|
|
|
} |
422
|
|
|
} |
423
|
|
|
|
424
|
|
|
return [ |
425
|
|
|
$commandName, |
426
|
|
|
$arguments, |
427
|
|
|
]; |
428
|
|
|
} |
429
|
|
|
} |
430
|
|
|
|
This check compares calls to functions or methods with their respective definitions. If the call has more arguments than are defined, it raises an issue.
If a function is defined several times with a different number of parameters, the check may pick up the wrong definition and report false positives. One codebase where this has been known to happen is Wordpress.
In this case you can add the
@ignore
PhpDoc annotation to the duplicate definition and it will be ignored.