<?php

declare(strict_types=1);

namespace Antidot\Queue\Cli;

use Antidot\Queue\Event\QueueConsumerStarted;
use Enqueue\Consumption\QueueConsumerInterface;
use Interop\Queue\Context;
use Interop\Queue\Message;
use Interop\Queue\Processor;
use InvalidArgumentException;
use Psr\EventDispatcher\EventDispatcherInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

/**
 * Console command that starts consuming messages from one named queue.
 *
 * The command is registered under {@see StartQueueConsumer::NAME} and takes a
 * single required argument, "queue_name". Each received message is handed to
 * the injected processor together with the injected queue context.
 */
class StartQueueConsumer extends Command
{
    /** Name under which the command is registered. */
    public const NAME = 'queue:start';

    /** Exception message used when the "queue_name" argument is not a string. */
    public const INVALID_NAME_MESSAGE = 'Argument "queue_name" must be of type string.';

    private QueueConsumerInterface $consumer;
    private Processor $processor;
    private Context $context;
    private EventDispatcherInterface $eventDispatcher;

    /**
     * @param QueueConsumerInterface   $consumer         Consumer the queue callback is bound to and started on.
     * @param Processor                $messageProcessor Processor invoked for every received message.
     * @param Context                  $context          Queue context passed to the processor with each message.
     * @param EventDispatcherInterface $eventDispatcher  Dispatcher used to announce that consumption is starting.
     */
    public function __construct(
        QueueConsumerInterface $consumer,
        Processor $messageProcessor,
        Context $context,
        EventDispatcherInterface $eventDispatcher
    ) {
        $this->consumer = $consumer;
        $this->processor = $messageProcessor;
        $this->context = $context;
        $this->eventDispatcher = $eventDispatcher;

        // Called last, after all dependencies have been assigned.
        parent::__construct();
    }

    /**
     * Declares the command name, its description and the required "queue_name" argument.
     */
    protected function configure(): void
    {
        $this
            ->setName(self::NAME)
            ->setDescription('Start listening to the given queue name.')
            ->addArgument(
                'queue_name',
                InputArgument::REQUIRED,
                'The queue name we want to consume'
            );
    }

    /**
     * Announces the start of consumption, binds the processor to the requested queue
     * and starts the consumer.
     *
     * @param InputInterface  $input  Source of the "queue_name" argument.
     * @param OutputInterface $output Not used by this command.
     *
     * @return int Always 0 once the consumer call returns.
     *
     * @throws InvalidArgumentException When the "queue_name" argument is not a string.
     */
    protected function execute(InputInterface $input, OutputInterface $output): int
    {
        $queue = $input->getArgument('queue_name');

        // getArgument() is not guaranteed to yield a string (static analysis reports
        // string[] as a possible type), so narrow it before it reaches the event
        // factory and bindCallback(), which expect a queue name.
        if (!is_string($queue)) {
            throw new InvalidArgumentException(self::INVALID_NAME_MESSAGE);
        }

        $this->eventDispatcher->dispatch(QueueConsumerStarted::occur($queue));

        // The closure is static, so the collaborators are captured as locals
        // instead of being reached through $this.
        $processor = $this->processor;
        $context = $this->context;
        $callback = static fn (Message $message) => $processor->process($message, $context);

        $this->consumer->bindCallback($queue, $callback);
        // NOTE(review): consume() presumably blocks until the consumer stops; confirm
        // against the QueueConsumerInterface implementation in use.
        $this->consumer->consume();

        return 0;
    }
}