1 | <?php |
||||||
2 | |||||||
3 | namespace Swarrot; |
||||||
4 | |||||||
5 | use Psr\Log\LoggerInterface; |
||||||
6 | use Psr\Log\NullLogger; |
||||||
7 | use Swarrot\Broker\MessageProvider\MessageProviderInterface; |
||||||
8 | use Swarrot\Processor\ConfigurableInterface; |
||||||
9 | use Swarrot\Processor\InitializableInterface; |
||||||
10 | use Swarrot\Processor\ProcessorInterface; |
||||||
11 | use Swarrot\Processor\SleepyInterface; |
||||||
12 | use Swarrot\Processor\TerminableInterface; |
||||||
13 | use Symfony\Component\OptionsResolver\OptionsResolver; |
||||||
14 | |||||||
/**
 * Drives a processor with the messages returned by a message provider.
 *
 * The consumer repeatedly asks the provider for messages, passes each one to
 * the processor, and invokes the optional processor lifecycle hooks
 * (configure, initialize, sleep, terminate) when the processor implements the
 * corresponding interface.
 */
class Consumer
{
    /** @var MessageProviderInterface */
    private $messageProvider;

    /** @var ProcessorInterface */
    private $processor;

    /** @var OptionsResolver */
    private $optionsResolver;

    /** @var LoggerInterface */
    private $logger;

    /**
     * @param MessageProviderInterface $messageProvider Source of the messages to consume
     * @param ProcessorInterface       $processor       Processor invoked for every message
     * @param OptionsResolver|null     $optionsResolver Resolver used to validate the options; a fresh one is created when omitted
     * @param LoggerInterface|null     $logger          Logger; a NullLogger is used when omitted
     */
    public function __construct(
        MessageProviderInterface $messageProvider,
        ProcessorInterface $processor,
        ?OptionsResolver $optionsResolver = null,
        ?LoggerInterface $logger = null
    ) {
        $this->messageProvider = $messageProvider;
        $this->processor = $processor;
        $this->optionsResolver = $optionsResolver ?? new OptionsResolver();
        $this->logger = $logger ?? new NullLogger();
    }

    /**
     * Consumes the provider's queue until the processor asks to stop.
     *
     * The loop ends when process() returns false, or when a SleepyInterface
     * processor returns false from sleep() after the provider ran out of
     * messages. Otherwise the consumer pauses for "poll_interval" (passed to
     * usleep(), i.e. microseconds) before polling the provider again.
     *
     * @param array $options Parameters sent to the processor
     */
    public function consume(array $options = []): void
    {
        $queueName = $this->messageProvider->getQueueName();

        $this->logger->debug('Start consuming queue.', [
            'queue' => $queueName,
        ]);

        $this->optionsResolver->setDefaults([
            'poll_interval' => 50000,
            'queue' => $queueName,
        ]);

        // Let the processor register its own option defaults before resolving.
        if ($this->processor instanceof ConfigurableInterface) {
            $this->processor->setDefaultOptions($this->optionsResolver);
        }

        $options = $this->optionsResolver->resolve($options);

        if ($this->processor instanceof InitializableInterface) {
            $this->processor->initialize($options);
        }

        while (true) {
            // Drain every message the provider currently has to offer.
            while (null !== ($message = $this->messageProvider->get())) {
                $result = $this->processor->process($message, $options);
                if (!\is_bool($result)) {
                    @trigger_error('Processors must return a bool since Swarrot 3.7', E_USER_DEPRECATED);
                }
                if (false === $result) {
                    // The processor asked to stop: leave both loops.
                    break 2;
                }
            }

            // No message left: a sleepy processor may decide to stop here.
            if ($this->processor instanceof SleepyInterface) {
                if (false === $this->processor->sleep($options)) {
                    break;
                }
            }

            usleep($options['poll_interval']);
        }

        if ($this->processor instanceof TerminableInterface) {
            $this->processor->terminate($options);
        }
    }

    /**
     * Returns the provider the messages are read from.
     */
    public function getMessageProvider(): MessageProviderInterface
    {
        return $this->messageProvider;
    }

    /**
     * Replaces the message provider.
     *
     * @return $this
     */
    public function setMessageProvider(MessageProviderInterface $messageProvider): self
    {
        $this->messageProvider = $messageProvider;

        return $this;
    }

    /**
     * Returns the processor invoked for every message.
     */
    public function getProcessor(): ProcessorInterface
    {
        return $this->processor;
    }

    /**
     * Replaces the processor.
     *
     * @return $this
     */
    public function setProcessor(ProcessorInterface $processor): self
    {
        $this->processor = $processor;

        return $this;
    }

    /**
     * Returns the resolver used to resolve the options given to consume().
     */
    public function getOptionsResolver(): OptionsResolver
    {
        return $this->optionsResolver;
    }

    /**
     * Replaces the options resolver.
     *
     * @return $this
     */
    public function setOptionsResolver(OptionsResolver $optionsResolver): self
    {
        $this->optionsResolver = $optionsResolver;

        return $this;
    }
}
||||||
117 |