1 | <?php |
||
2 | |||
3 | namespace OldSound\RabbitMqBundle\Command; |
||
4 | |||
5 | use OldSound\RabbitMqBundle\RabbitMq\BatchConsumer; |
||
6 | use PhpAmqpLib\Exception\AMQPTimeoutException; |
||
7 | use Symfony\Component\Console\Input\InputArgument; |
||
8 | use Symfony\Component\Console\Input\InputInterface; |
||
9 | use Symfony\Component\Console\Input\InputOption; |
||
10 | use Symfony\Component\Console\Output\OutputInterface; |
||
11 | |||
final class BatchConsumerCommand extends BaseRabbitMqCommand
{
    /**
     * Batch consumer fetched from the container by initConsumer().
     *
     * @var BatchConsumer
     */
    protected $consumer;

    /**
     * Stops the consumer.
     *
     * Registered by execute() as the handler for SIGTERM and SIGINT. Does
     * nothing when no BatchConsumer has been initialised yet.
     */
    public function stopConsumer()
    {
        if ($this->consumer instanceof BatchConsumer) {
            // Process current message, then halt consumer
            $this->consumer->forceStopConsumer();

            // Halt consumer if waiting for a new message from the queue
            try {
                $this->consumer->stopConsuming();
            } catch (AMQPTimeoutException $e) {
                // Deliberately ignored: stopping is best effort, and a timeout
                // raised here must not make the signal handler itself fail.
            }
        }
    }

    /**
     * Declares the command name, its "name" argument and its options.
     */
    protected function configure()
    {
        parent::configure();

        $this
            ->setName('rabbitmq:batch:consumer')
            ->addArgument('name', InputArgument::REQUIRED, 'Consumer Name')
            ->addOption('batches', 'b', InputOption::VALUE_OPTIONAL, 'Number of batches to consume', 0)
            ->addOption('route', 'r', InputOption::VALUE_OPTIONAL, 'Routing Key', '')
            ->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process', null)
            ->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging')
            ->addOption('without-signals', 'w', InputOption::VALUE_NONE, 'Disable catching of system signals')
            ->setDescription('Executes a Batch Consumer')
        ;
    }

    /**
     * Executes the current command.
     *
     * Defines the AMQP_WITHOUT_SIGNALS and AMQP_DEBUG constants from the
     * command options (unless they are already defined), registers
     * stopConsumer() for SIGTERM/SIGINT when signals are enabled and the
     * pcntl extension is loaded, then initialises the consumer and consumes.
     *
     * @param InputInterface  $input  An InputInterface instance
     * @param OutputInterface $output An OutputInterface instance
     *
     * @return integer The value returned by the consumer's consume() call
     *                 (0 if everything went fine, or an error code)
     *
     * @throws \InvalidArgumentException When the number of batches to consume is less than 0
     * @throws \BadFunctionCallException When signals are enabled (no -w) and the pcntl extension
     *                                   is loaded, but pcntl_signal() is not callable
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        if (defined('AMQP_WITHOUT_SIGNALS') === false) {
            define('AMQP_WITHOUT_SIGNALS', $input->getOption('without-signals'));
        }

        if (!AMQP_WITHOUT_SIGNALS && extension_loaded('pcntl')) {
            if (!function_exists('pcntl_signal')) {
                throw new \BadFunctionCallException("Function 'pcntl_signal' is referenced in the php.ini 'disable_functions' and can't be called.");
            }

            // Objects are passed by handle, so no reference to $this is needed.
            pcntl_signal(SIGTERM, array($this, 'stopConsumer'));
            pcntl_signal(SIGINT, array($this, 'stopConsumer'));
        }

        if (defined('AMQP_DEBUG') === false) {
            define('AMQP_DEBUG', (bool) $input->getOption('debug'));
        }

        $batchAmountTarget = (int) $input->getOption('batches');

        // 0 is the option's default and is accepted; only negatives are rejected.
        if (0 > $batchAmountTarget) {
            throw new \InvalidArgumentException("The -b option should be greater than or equal to 0");
        }

        $this->initConsumer($input);

        return $this->consumer->consume($batchAmountTarget);
    }

    /**
     * Fetches the consumer service named by the "name" argument and applies
     * the memory-limit and route options to it.
     *
     * The memory limit is only applied when the option is a string of digits
     * representing a value greater than 0; otherwise it is left untouched.
     *
     * @param InputInterface $input
     */
    protected function initConsumer(InputInterface $input)
    {
        $this->consumer = $this->getContainer()
            ->get(sprintf($this->getConsumerService(), $input->getArgument('name')));

        $memoryLimit = $input->getOption('memory-limit');

        if (null !== $memoryLimit &&
            ctype_digit((string) $memoryLimit) &&
            (int) $memoryLimit > 0
        ) {
            $this->consumer->setMemoryLimit($memoryLimit);
        }
        $this->consumer->setRoutingKey($input->getOption('route'));
    }

    /**
     * Returns the sprintf() pattern for the consumer service id; initConsumer()
     * fills the %s placeholder with the "name" argument.
     *
     * @return string
     */
    protected function getConsumerService()
    {
        return 'old_sound_rabbit_mq.%s_batch';
    }
}
||
114 |