Completed
Push — master ( 53783e...ee648a )
by Olivier
22s queued 20s
created

src/Swarrot/Consumer.php (4 issues)

<?php

namespace Swarrot;

use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Swarrot\Broker\MessageProvider\MessageProviderInterface;
use Swarrot\Processor\ConfigurableInterface;
use Swarrot\Processor\InitializableInterface;
use Swarrot\Processor\ProcessorInterface;
use Swarrot\Processor\SleepyInterface;
use Swarrot\Processor\TerminableInterface;
use Symfony\Component\OptionsResolver\OptionsResolver;

class Consumer
{
    private $messageProvider;
    private $processor;
    private $optionsResolver;
    private $logger;

    public function __construct(MessageProviderInterface $messageProvider, ProcessorInterface $processor, OptionsResolver $optionsResolver = null, LoggerInterface $logger = null)
    {
        $this->messageProvider = $messageProvider;
        $this->processor = $processor;
        $this->optionsResolver = $optionsResolver ?: new OptionsResolver();
        $this->logger = $logger ?: new NullLogger();
    }

    /**
     * @param array $options Parameters sent to the processor
     */
    public function consume(array $options = []): void
    {
        $queueName = $this->messageProvider->getQueueName();

        $this->logger->debug('Start consuming queue.', [
            'queue' => $queueName,
        ]);

        $this->optionsResolver->setDefaults([
            'poll_interval' => 50000,
            'queue' => $queueName,
        ]);

        if ($this->processor instanceof ConfigurableInterface) {
            $this->processor->setDefaultOptions($this->optionsResolver);
1 ignored issue
The method setDefaultOptions() does not exist on Swarrot\Processor\ProcessorInterface. It seems you are coding against a sub-type of Swarrot\Processor\ProcessorInterface such as Swarrot\Processor\ConfigurableInterface, Swarrot\Processor\Stack\StackedProcessor, Swarrot\Processor\MaxExe...xExecutionTimeProcessor or Swarrot\Processor\Signal...\SignalHandlerProcessor. (Ignorable by annotation)

If this is a false positive, you can also ignore this issue in your code via the ignore-call annotation:

            $this->processor->/** @scrutinizer ignore-call */
                              setDefaultOptions($this->optionsResolver);
        }

        $options = $this->optionsResolver->resolve($options);

        if ($this->processor instanceof InitializableInterface) {
            $this->processor->initialize($options);
1 ignored issue
The method initialize() does not exist on Swarrot\Processor\ProcessorInterface. It seems you are coding against a sub-type of Swarrot\Processor\ProcessorInterface such as Swarrot\Processor\InitializableInterface, Swarrot\Processor\Stack\StackedProcessor, Swarrot\Processor\MaxExe...xExecutionTimeProcessor or Swarrot\Processor\Signal...\SignalHandlerProcessor. (Ignorable by annotation)

If this is a false positive, you can also ignore this issue in your code via the ignore-call annotation:

            $this->processor->/** @scrutinizer ignore-call */
                              initialize($options);
        }

        while (true) {
            while (null !== $message = $this->messageProvider->get()) {
                $result = $this->processor->process($message, $options);
                if (!\is_bool($result)) {
                    @trigger_error('Processors must return a bool since Swarrot 3.7', E_USER_DEPRECATED);
                }
                if (false === $result) {
                    break 2;
                }
            }

            if ($this->processor instanceof SleepyInterface) {
                if (false === $this->processor->sleep($options)) {
1 ignored issue
The method sleep() does not exist on Swarrot\Processor\ProcessorInterface. It seems you are coding against a sub-type of Swarrot\Processor\ProcessorInterface such as Swarrot\Processor\SleepyInterface, Swarrot\Processor\Stack\StackedProcessor, Swarrot\Processor\MaxExe...xExecutionTimeProcessor or Swarrot\Processor\Signal...\SignalHandlerProcessor. (Ignorable by annotation)

If this is a false positive, you can also ignore this issue in your code via the ignore-call annotation:

                if (false === $this->processor->/** @scrutinizer ignore-call */ sleep($options)) {
                    break;
                }
            }

            usleep($options['poll_interval']);
        }

        if ($this->processor instanceof TerminableInterface) {
            $this->processor->terminate($options);
1 ignored issue
The method terminate() does not exist on Swarrot\Processor\ProcessorInterface. It seems you are coding against a sub-type of Swarrot\Processor\ProcessorInterface such as Swarrot\Processor\TerminableInterface or Swarrot\Processor\Stack\StackedProcessor. (Ignorable by annotation)

If this is a false positive, you can also ignore this issue in your code via the ignore-call annotation:

            $this->processor->/** @scrutinizer ignore-call */
                              terminate($options);
        }
    }

    public function getMessageProvider(): MessageProviderInterface
    {
        return $this->messageProvider;
    }

    public function setMessageProvider(MessageProviderInterface $messageProvider): self
    {
        $this->messageProvider = $messageProvider;

        return $this;
    }

    public function getProcessor(): ProcessorInterface
    {
        return $this->processor;
    }

    public function setProcessor(ProcessorInterface $processor): self
    {
        $this->processor = $processor;

        return $this;
    }

    public function getOptionsResolver(): OptionsResolver
    {
        return $this->optionsResolver;
    }

    public function setOptionsResolver(OptionsResolver $optionsResolver): self
    {
        $this->optionsResolver = $optionsResolver;

        return $this;
    }
}
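
All four ignored issues share one pattern: a method from an optional interface is called on a value typed as ProcessorInterface, inside an instanceof check. The block below is a minimal, self-contained sketch of that pattern, not Swarrot's code. The two interfaces are reduced stand-ins that only borrow the Swarrot names, process() takes a plain string here purely for illustration, and RecordingProcessor, PlainProcessor and runOnce() are hypothetical. It only shows that the guarded call runs for processors that implement the optional interface and is skipped for those that do not.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-ins for the Swarrot interfaces, reduced to what this sketch needs.
interface ProcessorInterface
{
    // Assumption: a string message instead of Swarrot's message object.
    public function process(string $message, array $options): bool;
}

interface InitializableInterface
{
    public function initialize(array $options): void;
}

// A processor that opts into the optional initialize() hook and records its calls.
final class RecordingProcessor implements ProcessorInterface, InitializableInterface
{
    public array $calls = [];

    public function initialize(array $options): void
    {
        $this->calls[] = 'initialize';
    }

    public function process(string $message, array $options): bool
    {
        $this->calls[] = 'process:' . $message;

        return true;
    }
}

// A processor that implements no optional hook at all.
final class PlainProcessor implements ProcessorInterface
{
    public function process(string $message, array $options): bool
    {
        return true;
    }
}

function runOnce(ProcessorInterface $processor, string $message, array $options = []): bool
{
    // The declared type is ProcessorInterface, which has no initialize().
    // The instanceof check guarantees at runtime that the method exists
    // before it is called.
    if ($processor instanceof InitializableInterface) {
        $processor->initialize($options);
    }

    return $processor->process($message, $options);
}

$recording = new RecordingProcessor();
runOnce($recording, 'hello');          // initialize() runs, then process()
runOnce(new PlainProcessor(), 'hello'); // initialize() is skipped

echo implode(', ', $recording->calls), PHP_EOL;
```

Run with the PHP CLI (7.4 or later, because of the typed property), this prints `initialize, process:hello`. The sketch performs the check on a local parameter rather than on a property. Whether a given static analyzer treats a property check the same way depends on the tool, so the ignore-call annotation shown in the issues above remains the documented way to silence the warning.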