Completed
Push — master ( 27a538...2907e3 )
by Alexandre
7s
created

QueueService   C

Complexity

Total Complexity 55

Size/Duplication

Total Lines 380
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 9

Importance

Changes 25
Bugs 1 Features 10
Metric Value
wmc 55
c 25
b 1
f 10
lcom 1
cbo 9
dl 0
loc 380
rs 6.8

24 Methods

Rating   Name   Duplication   Size   Complexity  
A setUp() 0 4 1
A attach() 0 8 1
A receive() 0 8 3
A push() 0 13 3
A highPriority() 0 6 1
A create() 0 4 1
A delete() 0 4 1
A showMessages() 0 4 1
A flush() 0 4 1
A countMessages() 0 4 1
A count() 0 4 1
A setCommand() 0 4 1
A setOutput() 0 4 1
A getOutput() 0 4 1
A isEnabled() 0 4 1
A isRunning() 0 4 1
A setProcessTimeout() 0 4 1
B listen() 0 24 5
A stop() 0 4 1
C loop() 0 29 7
A handle() 0 10 3
B run() 0 40 5
D getUnseralizedBody() 0 38 10
A __construct() 0 20 3

How to fix   Complexity   

Complex Class

Complex classes like QueueService often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes. You can also have a look at the cohesion graph to spot any un-connected, or weakly-connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use QueueService, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
/*
4
 * This file is part of HeriJobQueueBundle.
5
 *
6
 * (c) Alexandre Mogère
7
 *
8
 * This source file is subject to the MIT license that is bundled
9
 * with this source code in the file LICENSE.
10
 */
11
12
namespace Heri\Bundle\JobQueueBundle\Service;
13
14
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
15
use Symfony\Component\HttpKernel\Log\LoggerInterface;
16
use Symfony\Component\Console\Input\ArrayInput;
17
use Symfony\Component\Console\Output\OutputInterface;
18
use Symfony\Component\Console\Output\ConsoleOutput;
19
use Symfony\Component\Console\Output\StreamOutput;
20
use Symfony\Component\Process\Process;
21
use Heri\Bundle\JobQueueBundle\Exception\CommandFindException;
22
use Heri\Bundle\JobQueueBundle\Exception\InvalidUnserializedMessageException;
23
24
class QueueService
25
{
26
    const MICROSECONDS_PER_SECOND = 1000000;
27
28
    const PRIORITY_HIGH = 1;
29
30
    /**
31
     * var ZendQueue\Adapter\AbstractAdapter.
32
     */
33
    public $adapter;
34
35
    /**
36
     * var LoggerInterface.
37
     */
38
    protected $logger;
39
40
    /**
41
     * var ContainerAwareCommand.
42
     */
43
    protected $command;
44
45
    /**
46
     * var OutputInterface.
47
     */
48
    protected $output;
49
50
    /**
51
     * var \ZendQueue\Queue.
52
     */
53
    protected $queue;
54
55
    /**
56
     * var array.
57
     */
58
    protected $config;
59
60
    /**
61
     * var bool.
62
     */
63
    protected $running;
64
65
    /**
66
     * var integer.
67
     */
68
    protected $processTimeout = 60;
69
70
    /**
71
     * @param LoggerInterface $logger
72
     * @param array           $config
73
     */
74
    public function __construct(LoggerInterface $logger, array $config = array())
75
    {
76
        $this->logger = $logger;
77
        $this->config = $config;
78
79
        $this->processTimeout = isset($this->config['process_timeout']) ? $this->config['process_timeout'] : null;
80
81
        $this->running = true;
82
        $this->output = new ConsoleOutput();
83
84
        if (php_sapi_name() == 'cli') {
85
            pcntl_signal(SIGTERM, function () {
86
                $this->stop();
87
            });
88
89
            pcntl_signal(SIGINT, function () {
90
                $this->stop();
91
            });
92
        }
93
    }
94
95
    public function setUp($config)
96
    {
97
        $this->config = $config;
98
    }
99
100
    /**
101
     * @param string $name
102
     *
103
     * @return $this
104
     */
105
    public function attach($name)
106
    {
107
        $this->queue = new \ZendQueue\Queue($this->adapter, [
108
            'name' => $name,
109
        ]);
110
111
        return $this;
112
    }
113
114
    /**
115
     * @param int $maxMessages
116
     * @param int $timeout
117
     */
118
    public function receive($maxMessages = 1, $timeout = null)
119
    {
120
        $messages = $this->queue->receive($maxMessages, $timeout, $this->queue);
0 ignored issues
show
Unused Code introduced by
The call to Queue::receive() has too many arguments starting with $this->queue.

This check compares calls to functions or methods with their respective definitions. If the call has more arguments than are defined, it raises an issue.

If a function is defined several times with a different number of parameters, the check may pick up the wrong definition and report false positives. One codebase where this has been known to happen is Wordpress.

In this case you can add the @ignore PhpDoc annotation to the duplicate definition and it will be ignored.

Loading history...
121
122
        if ($messages && $messages->count() > 0) {
123
            $this->handle($messages);
124
        }
125
    }
126
127
    /**
128
     * @param array $args
129
     * @param int   $priority
0 ignored issues
show
Bug introduced by
There is no parameter named $priority. Was it maybe removed?

This check looks for PHPDoc comments describing methods or function parameters that do not exist on the corresponding method or function.

Consider the following example. The parameter $italy is not defined by the method finale(...).

/**
 * @param array $germany
 * @param array $island
 * @param array $italy
 */
function finale($germany, $island) {
    return "2:1";
}

The most likely cause is that the parameter was removed, but the annotation was not.

Loading history...
130
     */
131
    public function push(array $args)
132
    {
133
        if (!is_null($this->queue)) {
134
            if (class_exists('Zend\Json\Encoder')) {
135
                $body = \Zend\Json\Encoder::encode($args);
136
            } else {
137
                $body = json_encode($args);
138
            }
139
140
            $this->queue->send($body);
141
            $this->output->writeLn("<fg=green> [x] [{$this->queue->getName()}] {$args['command']} sent</>");
142
        }
143
    }
144
145
    /**
146
     * @param string $name
0 ignored issues
show
Bug introduced by
There is no parameter named $name. Was it maybe removed?

This check looks for PHPDoc comments describing methods or function parameters that do not exist on the corresponding method or function.

Consider the following example. The parameter $italy is not defined by the method finale(...).

/**
 * @param array $germany
 * @param array $island
 * @param array $italy
 */
function finale($germany, $island) {
    return "2:1";
}

The most likely cause is that the parameter was removed, but the annotation was not.

Loading history...
147
     *
148
     * @return $this
149
     */
150
    public function highPriority()
151
    {
152
        $this->adapter->setPriority(self::PRIORITY_HIGH);
153
154
        return $this;
155
    }
156
157
    /**
158
     * @param string $name
159
     * @param int    $timeout
160
     */
161
    public function create($name, $timeout = null)
162
    {
163
        return $this->adapter->create($name, $timeout);
164
    }
165
166
    /**
167
     * @param string $name
168
     *
169
     * @return bool
170
     */
171
    public function delete($name)
172
    {
173
        return $this->adapter->delete($name);
174
    }
175
176
    /**
177
     * @param string $queueName
178
     */
179
    public function showMessages($queueName)
180
    {
181
        return $this->adapter->showMessages($queueName);
182
    }
183
184
    /**
185
     * @return bool
186
     */
187
    public function flush()
188
    {
189
        return $this->adapter->flush();
190
    }
191
192
    /**
193
     * @return int
194
     */
195
    public function countMessages()
196
    {
197
        return $this->adapter->count();
198
    }
199
200
    /**
201
     * @return int
202
     */
203
    public function count()
204
    {
205
        return $this->adapter->count();
206
    }
207
208
    /**
209
     * @param ContainerAwareCommand $command
210
     */
211
    public function setCommand(ContainerAwareCommand $command)
212
    {
213
        $this->command = $command;
214
    }
215
216
    /**
217
     * @param OutputInterface $output
218
     */
219
    public function setOutput(OutputInterface $output)
220
    {
221
        $this->output = $output;
222
    }
223
224
    /**
225
     * @return OutputInterface
226
     */
227
    public function getOutput()
228
    {
229
        return $this->output;
230
    }
231
232
    public function isEnabled()
233
    {
234
        return $this->config['enabled'];
235
    }
236
237
    public function isRunning()
238
    {
239
        return $this->running;
240
    }
241
242
    public function setProcessTimeout($processTimeout)
243
    {
244
        $this->processTimeout = $processTimeout;
245
    }
246
247
    public function listen($name = null, $sleep = 0, $work = true)
248
    {
249
        if ($work) {
250
            $this->loop($name);
251
        } else {
252
            // event loop
253
            if (class_exists('React\EventLoop\Factory')) {
254
                $loop = \React\EventLoop\Factory::create();
255
                $loop->addPeriodicTimer($sleep, function (\React\EventLoop\Timer\Timer $timer) use ($name) {
256
                    $this->loop($name);
257
                    // stop closure loop on SIGINT
258
                    if (!$this->isRunning()) {
259
                        $timer->getLoop()->stop();
260
                    }
261
                });
262
                $loop->run();
263
            } else {
264
                do {
265
                    $this->loop($name);
266
                    usleep($sleep * self::MICROSECONDS_PER_SECOND);
267
                } while ($this->running);
268
            }
269
        }
270
    }
271
272
    protected function stop()
273
    {
274
        $this->running = false;
275
    }
276
277
    protected function loop($name = null)
278
    {
279
        $listQueues = [];
280
281
        if (php_sapi_name() == 'cli') {
282
            pcntl_signal_dispatch();
283
        }
284
        if (!$this->isRunning()) {
285
            return;
286
        }
287
288
        if ($name) {
289
            $listQueues[] = $name;
290
        } else {
291
            $listQueues = $this->config['queues'];
292
        }
293
294
        foreach ($listQueues as $name) {
295
            $this->attach($name);
296
            $this->receive($this->config['max_messages']);
297
298
            if (php_sapi_name() == 'cli') {
299
                pcntl_signal_dispatch();
300
            }
301
            if (!$this->isRunning()) {
302
                return;
303
            }
304
        }
305
    }
306
307
    /**
308
     * @param MessageIterator $messages
309
     */
310
    protected function handle(\ZendQueue\Message\MessageIterator $messages)
311
    {
312
        if (!$this->output instanceof OutputInterface) {
313
            $this->output = new StreamOutput(fopen('php://memory', 'w', false));
314
        }
315
316
        foreach ($messages as $message) {
317
            $this->run($message);
318
        }
319
    }
320
321
    protected function run($message)
322
    {
323
        if (property_exists($this->adapter, 'logger')) {
324
            $this->adapter->logger = $this->logger;
325
        }
326
327
        try {
328
            list(
329
                $commandName,
330
                $arguments
331
            ) = $this->getUnseralizedBody($message);
332
333
            $this->output->writeLn("<fg=yellow> [x] [{$this->queue->getName()}] {$commandName} received</> ");
334
335
            if (!isset($this->command)) {
336
                $process = new Process(sprintf('%s %s %s %s',
337
                    '/usr/bin/php', 'app/console', $commandName,
338
                    implode(' ', $arguments)
339
                ));
340
                $process->setTimeout($this->processTimeout);
341
                $process->run();
342
343
                if (!$process->isSuccessful()) {
344
                    throw new \Exception($process->getErrorOutput());
345
                }
346
347
                print $process->getOutput();
348
            } else {
349
                $input = new ArrayInput(array_merge([''], $arguments));
350
                $command = $this->command->getApplication()->find($commandName);
351
                $command->run($input, $this->output);
352
            }
353
354
            $this->queue->deleteMessage($message);
355
            $this->output->writeLn("<fg=green> [x] [{$this->queue->getName()}] {$commandName} done</>");
356
        } catch (\Exception $e) {
357
            $this->output->writeLn("<fg=white;bg=red> [!] [{$this->queue->getName()}] FAILURE: {$e->getMessage()}</>");
358
            $this->adapter->logException($message, $e);
359
        }
360
    }
361
362
    /**
363
     * @param ZendQueue\Message $message
364
     */
365
    protected function getUnseralizedBody(\ZendQueue\Message $message)
366
    {
367
        if (class_exists('Zend\Json\Json')) {
368
            $body = \Zend\Json\Json::decode($message->body, true);
369
        } else {
370
            $body = json_decode($message->body, true);
371
        }
372
373
        $arguments = [];
374
        $args = [];
375
        if (isset($body['argument'])) {
376
            $args = $body['argument'];
377
        } elseif (isset($body['arguments'])) {
378
            $args = $body['arguments'];
379
        }
380
381
        if (!isset($body['command']) || $body['command'] == '') {
382
            throw new InvalidUnserializedMessageException('Command name not found');
383
        }
384
385
        $commandName = $body['command'];
386
        foreach ($args as $key => $value) {
387
            if (is_string($key) && preg_match('/^--/', $key)) {
388
                if (is_bool($value)) {
389
                    $arguments[] = "{$key}";
390
                } else {
391
                    $arguments[] = "{$key}={$value}";
392
                }
393
            } else {
394
                $arguments[] = $value;
395
            }
396
        }
397
398
        return [
399
            $commandName,
400
            $arguments,
401
        ];
402
    }
403
}
404