Completed
Push — master ( 6b91a0...a7e12a )
by Keith
07:17 queued 05:40
created

QueueReceiveCommand::pollQueue()   B

Complexity

Conditions 4
Paths 3

Size

Total Lines 23
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 20

Importance

Changes 1
Bugs 0 Features 0
Metric Value
dl 0
loc 23
ccs 0
cts 19
cp 0
rs 8.7972
c 1
b 0
f 0
cc 4
eloc 13
nc 3
nop 2
crap 20
1
<?php
2
3
/**
4
 * Copyright 2014 Underground Elephant
5
 *
6
 * Licensed under the Apache License, Version 2.0 (the "License");
7
 * you may not use this file except in compliance with the License.
8
 * You may obtain a copy of the License at
9
 *
10
 *     http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
17
 *
18
 * @package     qpush-bundle
19
 * @copyright   Underground Elephant 2014
20
 * @license     Apache License, Version 2.0
21
 */
22
23
namespace Uecode\Bundle\QPushBundle\Command;
24
25
use Symfony\Component\Console\Command\Command;
26
use Symfony\Component\Console\Input\InputArgument;
27
use Symfony\Component\Console\Input\InputInterface;
28
use Symfony\Component\Console\Output\OutputInterface;
29
use Symfony\Component\DependencyInjection\ContainerAwareInterface;
30
use Symfony\Component\DependencyInjection\ContainerInterface;
31
use Uecode\Bundle\QPushBundle\Event\Events;
32
use Uecode\Bundle\QPushBundle\Event\MessageEvent;
33
34
/**
 * Console command (`uecode:qpush:receive`) that polls either one named queue
 * or every queue returned by the `uecode_qpush` service, dispatching a
 * MessageEvent for each message received.
 *
 * @author Keith Kirk <[email protected]>
 */
class QueueReceiveCommand extends Command implements ContainerAwareInterface
{
    /**
     * @var ContainerInterface
     *
     * @api
     */
    protected $container;

    /**
     * Output of the current run; assigned in execute() so that pollQueue()
     * can report its results.
     *
     * @var OutputInterface
     */
    protected $output;

    /**
     * Sets the Container associated with this Command.
     *
     * @param ContainerInterface $container A ContainerInterface instance
     *
     * @api
     */
    public function setContainer(ContainerInterface $container = null)
    {
        $this->container = $container;
    }

    /**
     * Declares the command name, its description and the optional `name`
     * argument used to restrict polling to a single queue.
     */
    protected function configure()
    {
        $this
            ->setName('uecode:qpush:receive')
            ->setDescription('Polls the configured Queues')
            ->addArgument(
                'name',
                InputArgument::OPTIONAL,
                'Name of a specific queue to poll',
                null
            )
        ;
    }

    /**
     * Polls the queue given by the `name` argument, or every queue when the
     * argument is omitted.
     *
     * @param InputInterface  $input  Console input holding the optional `name` argument
     * @param OutputInterface $output Console output used for status messages
     *
     * @return int Exit status: the result of pollQueue() when a name is given, 0 otherwise
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $this->output = $output;
        $registry = $this->container->get('uecode_qpush');

        $name = $input->getArgument('name');

        if (null !== $name) {
            return $this->pollQueue($registry, $name);
        }

        foreach ($registry->all() as $queue) {
            $this->pollQueue($registry, $queue->getName());
        }

        return 0;
    }

    /**
     * Receives the pending messages of one queue and dispatches a
     * MessageEvent for each of them.
     *
     * @param mixed  $registry The `uecode_qpush` service; must provide has() and get()
     * @param string $name     Name of the queue to poll
     *
     * @return int 0 once the queue has been polled, 1 when the queue is unknown
     */
    private function pollQueue($registry, $name)
    {
        if (!$registry->has($name)) {
            $this->output->writeln(
                sprintf("The [%s] queue you have specified does not exists!", $name)
            );

            // writeln() returns nothing, so returning its result would hand
            // null back to the console instead of a failing exit status.
            return 1;
        }

        $dispatcher = $this->container->get('event_dispatcher');
        $messages   = $registry->get($name)->receive();

        // Count while dispatching rather than calling sizeof() afterwards:
        // receive() may yield a falsy, non-countable value, for which
        // sizeof() is wrong (false counts as 1) or throws on PHP 8.
        $fetched = 0;

        if ($messages) {
            foreach ($messages as $message) {
                $messageEvent = new MessageEvent($name, $message);
                $dispatcher->dispatch(Events::Message($name), $messageEvent);
                $fetched++;
            }
        }

        $msg = "<info>Finished polling %s Queue, %d messages fetched.</info>";
        $this->output->writeln(sprintf($msg, $name, $fetched));

        return 0;
    }
}
116