QueueConsumeCommand   A
last analyzed

Complexity

Total Complexity 5

Size/Duplication

Total Lines 61
Duplicated Lines 0 %

Coupling/Cohesion

Components 2
Dependencies 7

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
wmc 5
lcom 2
cbo 7
dl 0
loc 61
ccs 0
cts 40
cp 0
rs 10
c 0
b 0
f 0

3 Methods

Rating   Name   Duplication   Size   Complexity  
A configure() 0 11 1
B execute() 0 32 3
A getConsumerTag() 0 4 1
1
<?php
2
3
namespace TreeHouse\QueueBundle\Command;
4
5
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
6
use Symfony\Component\Console\Input\InputArgument;
7
use Symfony\Component\Console\Input\InputInterface;
8
use Symfony\Component\Console\Input\InputOption;
9
use Symfony\Component\Console\Output\OutputInterface;
10
use TreeHouse\Queue\Consumer\ConsumerInterface;
11
use TreeHouse\QueueBundle\Consumer\Consumer;
12
use TreeHouse\QueueBundle\Consumer\Limiter\MemoryLimiter;
13
use TreeHouse\QueueBundle\Consumer\Limiter\MessagesLimiter;
14
15
class QueueConsumeCommand extends ContainerAwareCommand
16
{
17
    /**
18
     * @inheritdoc
19
     */
20
    protected function configure()
21
    {
22
        $this->setName('queue:consume');
23
        $this->addArgument('queue', InputArgument::REQUIRED, 'The name of the queue to consume from');
24
        $this->addOption('batch-size', 'b', InputOption::VALUE_OPTIONAL, 'Batch size', 50);
25
        $this->addOption('limit', 'l', InputOption::VALUE_OPTIONAL, 'Maximum number of messages to consume. Set to 0 for indefinite consuming.', 0);
26
        $this->addOption('max-memory', 'm', InputOption::VALUE_OPTIONAL, 'Maximum amount of memory to use (in MB). The consumer will try to stop before this limit is reached. Set to 0 for indefinite consuming.', 0);
27
        $this->addOption('max-time', 't', InputOption::VALUE_OPTIONAL, 'Maximum execution time in seconds. Set to 0 for indefinite consuming', 0);
28
        $this->addOption('wait', 'w', InputOption::VALUE_OPTIONAL, 'Time in microseconds to wait before consuming the next message', 0);
29
        $this->addOption('min-duration', 'd', InputOption::VALUE_OPTIONAL, 'Duration that this command must run for', 15);
30
    }
31
32
    /**
33
     * @inheritdoc
34
     */
35
    protected function execute(InputInterface $input, OutputInterface $output)
36
    {
37
        $name = $input->getArgument('queue');
38
39
        /** @var ConsumerInterface $delegate */
40
        $delegate = $this->getContainer()->get(sprintf('tree_house.queue.consumer.%s', $name));
41
        $consumer = (new Consumer($delegate, $output, $this->getConsumerTag()))
42
            ->waitBetweenMessages((int) $input->getOption('wait'))
43
            ->flushAfter((int) $input->getOption('batch-size'))
44
            ->mustRunFor((int) $input->getOption('min-duration'))
45
        ;
46
47
        if ($limit = (int) $input->getOption('limit')) {
48
            $consumer->addLimiter(new MessagesLimiter($limit));
49
        }
50
51
        if ($maxMemory = (int) $input->getOption('max-memory')) {
52
            $consumer->addLimiter(new MemoryLimiter($maxMemory * 1024 * 1024));
53
        }
54
55
        $output->writeln(sprintf('Consuming from <info>%s</info> queue', $name));
56
57
        $consumer->consume();
58
59
        $output->writeln(
60
            sprintf(
61
                'Consumed <info>%d</info> messages in <info>%s seconds</info>',
62
                $consumer->getProcessed(),
63
                $consumer->getDuration()
64
            )
65
        );
66
    }
67
68
    /**
69
     * @return string
70
     */
71
    private function getConsumerTag()
72
    {
73
        return sprintf('%s-%s', $this->getName(), uniqid());
74
    }
75
}
76