antidot-framework /
message-queue
| 1 | <?php |
||||
| 2 | |||||
| 3 | declare(strict_types=1); |
||||
| 4 | |||||
| 5 | namespace Antidot\Queue\Cli; |
||||
| 6 | |||||
| 7 | use Antidot\Queue\Event\QueueConsumerStarted; |
||||
| 8 | use Enqueue\Consumption\QueueConsumerInterface; |
||||
| 9 | use Interop\Queue\Context; |
||||
| 10 | use Interop\Queue\Message; |
||||
| 11 | use Interop\Queue\Processor; |
||||
| 12 | use InvalidArgumentException; |
||||
| 13 | use Psr\EventDispatcher\EventDispatcherInterface; |
||||
| 14 | use Symfony\Component\Console\Command\Command; |
||||
| 15 | use Symfony\Component\Console\Input\InputArgument; |
||||
| 16 | use Symfony\Component\Console\Input\InputInterface; |
||||
| 17 | use Symfony\Component\Console\Output\OutputInterface; |
||||
| 18 | |||||
| 19 | class StartQueueConsumer extends Command |
||||
| 20 | { |
||||
| 21 | public const NAME = 'queue:start'; |
||||
| 22 | public const INVALID_NAME_MESSAGE = 'Argument "queue_name" must be of type string.'; |
||||
| 23 | private QueueConsumerInterface $consumer; |
||||
| 24 | private Processor $processor; |
||||
| 25 | private Context $context; |
||||
| 26 | private EventDispatcherInterface $eventDispatcher; |
||||
| 27 | |||||
| 28 | 4 | public function __construct( |
|||
| 29 | QueueConsumerInterface $consumer, |
||||
| 30 | Processor $messageProcessor, |
||||
| 31 | Context $context, |
||||
| 32 | EventDispatcherInterface $eventDispatcher |
||||
| 33 | ) { |
||||
| 34 | 4 | $this->consumer = $consumer; |
|||
| 35 | 4 | $this->processor = $messageProcessor; |
|||
| 36 | 4 | $this->context = $context; |
|||
| 37 | 4 | $this->eventDispatcher = $eventDispatcher; |
|||
| 38 | 4 | parent::__construct(); |
|||
| 39 | 4 | } |
|||
| 40 | |||||
| 41 | 4 | protected function configure(): void |
|||
| 42 | { |
||||
| 43 | 4 | $this->setName(self::NAME) |
|||
| 44 | 4 | ->setDescription('Start listening to the given queue name.') |
|||
| 45 | 4 | ->addArgument( |
|||
| 46 | 4 | 'queue_name', |
|||
| 47 | 4 | InputArgument::REQUIRED, |
|||
| 48 | 4 | 'The queue name we want to consume' |
|||
| 49 | ); |
||||
| 50 | 4 | } |
|||
| 51 | |||||
| 52 | 2 | protected function execute(InputInterface $input, OutputInterface $output): int |
|||
| 53 | { |
||||
| 54 | 2 | $queue = $input->getArgument('queue_name'); |
|||
| 55 | 2 | if (false === is_string($queue)) { |
|||
| 56 | 1 | throw new InvalidArgumentException(self::INVALID_NAME_MESSAGE); |
|||
| 57 | } |
||||
| 58 | 1 | $this->eventDispatcher->dispatch(QueueConsumerStarted::occur($queue)); |
|||
|
0 ignored issues
–
show
Bug
introduced
by
Loading history...
|
|||||
| 59 | 1 | $processor = $this->processor; |
|||
| 60 | 1 | $context = $this->context; |
|||
| 61 | 1 | $callback = static fn(Message $message) => $processor->process($message, $context); |
|||
| 62 | 1 | $this->consumer->bindCallback($queue, $callback); |
|||
|
0 ignored issues
–
show
It seems like
$queue can also be of type string[]; however, parameter $queueName of Enqueue\Consumption\Queu...terface::bindCallback() does only seem to accept Interop\Queue\Queue|string, maybe add an additional type check?
(
Ignorable by Annotation
)
If this is a false-positive, you can also ignore this issue in your code via the
Loading history...
|
|||||
| 63 | 1 | $this->consumer->consume(); |
|||
| 64 | |||||
| 65 | 1 | return 0; |
|||
| 66 | } |
||||
| 67 | } |
||||
| 68 |