Completed
Push — master ( d39b35...b61033 )
by Alexandre
02:49
created

QueueService::push()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 13
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 5
Bugs 0 Features 3
Metric Value
c 5
b 0
f 3
dl 0
loc 13
rs 9.4285
cc 3
eloc 8
nc 3
nop 1
1
<?php
2
3
/**
4
 * This file is part of HeriJobQueueBundle.
5
 *
6
 * (c) Alexandre Mogère
7
 *
8
 * This source file is subject to the MIT license that is bundled
9
 * with this source code in the file LICENSE.
10
 */
11
12
namespace Heri\Bundle\JobQueueBundle\Service;
13
14
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
15
use Symfony\Component\Console\Input\ArrayInput;
16
use Symfony\Component\Console\Output\OutputInterface;
17
use Symfony\Component\Console\Output\ConsoleOutput;
18
use Symfony\Component\Console\Output\StreamOutput;
19
use Symfony\Component\Process\Process;
20
use Psr\Log\LoggerInterface;
21
use Heri\Bundle\JobQueueBundle\Exception\InvalidUnserializedMessageException;
22
23
class QueueService
24
{
25
    const MICROSECONDS_PER_SECOND = 1000000;
26
27
    const PRIORITY_HIGH = 1;
28
29
    /**
30
     * @var \ZendQueue\Adapter\AbstractAdapter
31
     */
32
    public $adapter;
33
34
    /**
35
     * @var LoggerInterface
36
     */
37
    protected $logger;
38
39
    /**
40
     * @var ContainerAwareCommand
41
     */
42
    protected $command;
43
44
    /**
45
     * @var OutputInterface
46
     */
47
    protected $output;
48
49
    /**
50
     * @var \ZendQueue\Queue
51
     */
52
    protected $queue;
53
54
    /**
55
     * @var array
56
     */
57
    protected $config;
58
59
    /**
60
     * @var bool
61
     */
62
    protected $running;
63
64
    /**
65
     * @var int|null
66
     */
67
    protected $processTimeout = null;
68
69
    /**
     * Stores the logger and the configuration, then prepares console output
     * and signal handling so that a running listener can be stopped.
     *
     * @param LoggerInterface $logger logger kept by the service
     * @param array           $config configuration, forwarded to setUp()
     */
    public function __construct(LoggerInterface $logger, array $config = [])
    {
        $this->logger = $logger;

        $this->setUp($config);

        $this->running = true;
        $this->output = new ConsoleOutput();

        // The pcntl extension is optional, so install the handlers only when
        // the function really exists; calling it blindly is a fatal error on
        // CLI builds compiled without pcntl.
        if (php_sapi_name() === 'cli' && function_exists('pcntl_signal')) {
            $stopListening = function () {
                $this->stop();
            };

            // Both signals simply flag the service as no longer running.
            pcntl_signal(SIGTERM, $stopListening);
            pcntl_signal(SIGINT, $stopListening);
        }
    }
92
93
    /**
     * Replaces the service configuration and refreshes the process timeout.
     *
     * @param array $config configuration; its optional "process_timeout" entry
     *                      becomes the process timeout (null when not set)
     */
    public function setUp($config)
    {
        $this->config = $config;

        $timeout = null;
        if (isset($this->config['process_timeout'])) {
            $timeout = $this->config['process_timeout'];
        }

        $this->processTimeout = $timeout;
    }
99
100
    /**
     * Binds the service to the queue called $name by creating a new
     * \ZendQueue\Queue on top of the current adapter.
     *
     * @param string $name name of the queue to work with
     *
     * @return $this
     */
    public function attach($name)
    {
        $options = ['name' => $name];
        $this->queue = new \ZendQueue\Queue($this->adapter, $options);

        return $this;
    }
113
114
    /**
     * Fetches messages from the attached queue and hands them to handle().
     *
     * @param int      $maxMessages maximum number of messages to fetch
     * @param int|null $timeout     timeout forwarded to the queue
     */
    public function receive($maxMessages = 1, $timeout = null)
    {
        // Queue::receive() only accepts the message limit and the timeout; the
        // queue instance formerly passed as a third argument was surplus.
        $messages = $this->queue->receive($maxMessages, $timeout);

        if ($messages && $messages->count() > 0) {
            $this->handle($messages);
        }
    }
126
127
    /**
     * Encodes a job as JSON and sends it to the attached queue.
     *
     * Nothing is sent when no queue has been attached yet.
     *
     * @param array $args job description; its "command" entry is shown in the
     *                    confirmation line written to the output
     */
    public function push(array $args)
    {
        if (is_null($this->queue)) {
            return;
        }

        if (class_exists('Zend\Json\Encoder')) {
            $body = \Zend\Json\Encoder::encode($args);
        } else {
            $body = json_encode($args);
        }

        $this->queue->send($body);

        // A job pushed without a "command" entry must not raise an
        // undefined-index notice while the confirmation line is built.
        $commandName = isset($args['command']) ? $args['command'] : '';
        $this->output->writeLn("<fg=green> [x] [{$this->queue->getName()}] {$commandName} sent</>");
    }
144
145
    /**
     * Sets the adapter priority to self::PRIORITY_HIGH.
     *
     * @return $this
     */
    public function highPriority()
    {
        $this->adapter->setPriority(self::PRIORITY_HIGH);

        return $this;
    }
156
157
    /**
     * Asks the adapter to create the queue $name.
     *
     * @param string   $name    name of the queue to create
     * @param int|null $timeout timeout forwarded to the adapter
     *
     * @return mixed whatever the adapter's create() returns
     */
    public function create($name, $timeout = null)
    {
        $created = $this->adapter->create($name, $timeout);

        return $created;
    }
165
166
    /**
     * Asks the adapter to delete the queue $name.
     *
     * @param string $name name of the queue to delete
     *
     * @return bool result reported by the adapter
     */
    public function delete($name)
    {
        $deleted = $this->adapter->delete($name);

        return $deleted;
    }
175
176
    /**
     * Returns what the adapter's showMessages() reports for a queue.
     *
     * @param string $queueName name of the queue to inspect
     *
     * @return mixed whatever the adapter's showMessages() returns
     */
    public function showMessages($queueName)
    {
        $messages = $this->adapter->showMessages($queueName);

        return $messages;
    }
183
184
    /**
     * Delegates to the adapter's retry() when the adapter provides one.
     *
     * @return bool false when the adapter has no retry() method, otherwise
     *              the adapter's result
     */
    public function retry()
    {
        if (method_exists($this->adapter, 'retry')) {
            return $this->adapter->retry();
        }

        return false;
    }
195
196
    /**
     * Delegates to the adapter's forget() when the adapter provides one.
     *
     * @param int $id identifier forwarded to the adapter
     *
     * @return bool false when the adapter has no forget() method, otherwise
     *              the adapter's result
     */
    public function forget($id)
    {
        if (method_exists($this->adapter, 'forget')) {
            return $this->adapter->forget($id);
        }

        return false;
    }
209
210
    /**
     * Delegates to the adapter's flush().
     *
     * @return bool result reported by the adapter
     */
    public function flush()
    {
        $flushed = $this->adapter->flush();

        return $flushed;
    }
217
218
    /**
     * Returns the count reported by the adapter (same source as count()).
     *
     * @return int value of the adapter's count()
     */
    public function countMessages()
    {
        $total = $this->adapter->count();

        return $total;
    }
225
226
    /**
     * Returns the count reported by the adapter.
     *
     * @return int value of the adapter's count()
     */
    public function count()
    {
        $total = $this->adapter->count();

        return $total;
    }
233
234
    /**
     * Registers the console command whose application is used by run() to
     * look up and execute queued commands in-process.
     *
     * @param ContainerAwareCommand $command command giving access to the application
     */
    public function setCommand(ContainerAwareCommand $command)
    {
        $this->command = $command;
    }
241
242
    /**
     * Replaces the output the service writes its status lines to.
     *
     * @param OutputInterface $output new output
     */
    public function setOutput(OutputInterface $output)
    {
        $this->output = $output;
    }
249
250
    /**
     * Returns the output the service currently writes to.
     *
     * @return OutputInterface
     */
    public function getOutput()
    {
        $current = $this->output;

        return $current;
    }
257
258
    /**
     * Returns the "enabled" entry of the configuration.
     *
     * @return mixed the configured value, or false when the entry is missing
     */
    public function isEnabled()
    {
        // The constructor accepts an empty configuration, so the key may be
        // absent; reading it directly would raise an undefined-index notice.
        return isset($this->config['enabled']) ? $this->config['enabled'] : false;
    }
262
263
    /**
     * Tells whether the service is still flagged as running, i.e. stop() has
     * not been called since construction.
     *
     * @return bool
     */
    public function isRunning()
    {
        $state = $this->running;

        return $state;
    }
267
268
    /**
     * Overrides the timeout applied to the processes started by run().
     *
     * @param int|null $processTimeout value later passed to Process::setTimeout()
     */
    public function setProcessTimeout($processTimeout)
    {
        $this->processTimeout = $processTimeout;
    }
272
273
    /**
     * Consumes messages, either in a single pass or continuously.
     *
     * @param string|null $name  queue to consume; when empty, loop() falls
     *                           back to the configured queues
     * @param int         $sleep seconds between two passes in continuous mode
     * @param bool        $work  true for a single pass, false to keep listening
     */
    public function listen($name = null, $sleep = 0, $work = true)
    {
        // Single pass requested: run it and leave.
        if ($work) {
            $this->loop($name);

            return;
        }

        // Without React, fall back to a plain polling loop.
        if (!class_exists('React\EventLoop\Factory')) {
            do {
                $this->loop($name);
                usleep($sleep * self::MICROSECONDS_PER_SECOND);
            } while ($this->running);

            return;
        }

        // React is available: schedule the pass on a periodic timer.
        $eventLoop = \React\EventLoop\Factory::create();
        $tick = function (\React\EventLoop\Timer\Timer $timer) use ($name) {
            $this->loop($name);
            // Stop the event loop once the service is no longer running.
            if (!$this->isRunning()) {
                $timer->getLoop()->stop();
            }
        };
        $eventLoop->addPeriodicTimer($sleep, $tick);
        $eventLoop->run();
    }
297
298
    /**
     * Flags the service as stopped; the listen loops check this flag and exit.
     */
    protected function stop()
    {
        $this->running = false;
    }
302
303
    /**
     * Performs one pass over the queues: attaches each one and receives its
     * messages.
     *
     * Pending signals are dispatched before the pass and after every queue,
     * and the pass is abandoned as soon as the service is no longer running.
     *
     * @param string|null $name single queue to process; when empty, the
     *                          configured "queues" list is used
     */
    protected function loop($name = null)
    {
        $this->dispatchSignals();
        if (!$this->isRunning()) {
            return;
        }

        if ($name) {
            $listQueues = [$name];
        } else {
            // The configuration may be empty: treat a missing list as "no queues".
            $listQueues = isset($this->config['queues']) ? $this->config['queues'] : [];
        }

        // Same default as receive() when the limit is not configured.
        $maxMessages = isset($this->config['max_messages']) ? $this->config['max_messages'] : 1;

        foreach ($listQueues as $queueName) {
            $this->attach($queueName);
            $this->receive($maxMessages);

            $this->dispatchSignals();
            if (!$this->isRunning()) {
                return;
            }
        }
    }

    /**
     * Runs pending signal handlers when on the CLI with pcntl available.
     */
    private function dispatchSignals()
    {
        // pcntl is an optional extension; calling it blindly is a fatal error
        // on builds compiled without it.
        if (php_sapi_name() === 'cli' && function_exists('pcntl_signal_dispatch')) {
            pcntl_signal_dispatch();
        }
    }
332
333
    /**
     * Runs every message of the given batch.
     *
     * @param \ZendQueue\Message\MessageIterator $messages messages to process
     */
    protected function handle(\ZendQueue\Message\MessageIterator $messages)
    {
        // Fall back to an in-memory stream when no usable output is set.
        $hasOutput = $this->output instanceof OutputInterface;
        if (!$hasOutput) {
            $memoryStream = fopen('php://memory', 'w', false);
            $this->output = new StreamOutput($memoryStream);
        }

        foreach ($messages as $queuedMessage) {
            $this->run($queuedMessage);
        }
    }
346
347
    /**
     * Executes the command described by a message and deletes the message
     * from the queue on success.
     *
     * When no parent command has been set, the job runs in a separate PHP
     * process (app/console). Otherwise it is looked up in the parent command's
     * application and run in-process. Any exception is reported on the output
     * and handed to the adapter's logException(); the message is then left in
     * the queue.
     *
     * @param \ZendQueue\Message $message message to process
     */
    protected function run($message)
    {
        if (property_exists($this->adapter, 'logger')) {
            $this->adapter->logger = $this->logger;
        }

        try {
            list($commandName, $arguments) = $this->getUnseralizedBody($message);

            $this->output->writeLn("<fg=yellow> [x] [{$this->queue->getName()}] {$commandName} received</> ");

            if (!isset($this->command)) {
                // The command name and its arguments come from the queue
                // payload and end up on a shell command line: quote each part
                // so that spaces or shell metacharacters cannot split an
                // argument or inject another command.
                $process = new Process(sprintf('%s %s %s %s',
                    '/usr/bin/php', 'app/console', escapeshellarg($commandName),
                    implode(' ', array_map('escapeshellarg', $arguments))
                ));
                $process->setTimeout($this->processTimeout);
                $process->run();

                if (!$process->isSuccessful()) {
                    throw new \Exception($process->getErrorOutput());
                }

                echo $process->getOutput();
            } else {
                $input = new ArrayInput(array_merge([''], $arguments));
                $command = $this->command->getApplication()->find($commandName);
                $command->run($input, $this->output);
            }

            // Only reached when the command did not throw.
            $this->queue->deleteMessage($message);
            $this->output->writeLn("<fg=green> [x] [{$this->queue->getName()}] {$commandName} done</>");
        } catch (\Exception $e) {
            $this->output->writeLn("<fg=white;bg=red> [!] [{$this->queue->getName()}] FAILURE: {$e->getMessage()}</>");
            $this->adapter->logException($message, $e);
        }
    }
387
388
    /**
     * Decodes the JSON body of a message into a command name and its
     * argument list.
     *
     * Arguments are read from the "argument" entry or, failing that, from
     * "arguments". Entries whose key starts with "--" become options:
     * "--key=value", the bare "--key" for boolean true, and nothing at all
     * for boolean false. Every other entry is passed through as its value.
     *
     * @param \ZendQueue\Message $message message whose body holds the JSON job
     *
     * @return array two-element list: the command name, then the argument list
     *
     * @throws InvalidUnserializedMessageException when the body has no command name
     */
    protected function getUnseralizedBody(\ZendQueue\Message $message)
    {
        if (class_exists('Zend\Json\Json')) {
            $body = \Zend\Json\Json::decode($message->body, true);
        } else {
            $body = json_decode($message->body, true);
        }

        $args = [];
        if (isset($body['argument'])) {
            $args = $body['argument'];
        } elseif (isset($body['arguments'])) {
            $args = $body['arguments'];
        }

        if (!isset($body['command']) || $body['command'] == '') {
            throw new InvalidUnserializedMessageException('Command name not found');
        }

        $commandName = $body['command'];

        $arguments = [];
        // The cast lets a payload carrying a single scalar argument through.
        foreach ((array) $args as $key => $value) {
            $isOption = is_string($key) && strpos($key, '--') === 0;

            if (!$isOption) {
                $arguments[] = $value;
            } elseif ($value === true) {
                $arguments[] = $key;
            } elseif ($value !== false) {
                $arguments[] = "{$key}={$value}";
            }
            // A false flag is dropped: emitting "--key" would switch it on.
        }

        return [
            $commandName,
            $arguments,
        ];
    }
429
}
430