Completed
Push — master ( 53783e...ee648a )
by Olivier
22s queued 20s
created

Consumer::consume()   B

Complexity

Conditions 10
Paths 48

Size

Total Lines 45
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Importance

Changes 4
Bugs 0 Features 0
Metric Value
cc 10
eloc 24
c 4
b 0
f 0
nc 48
nop 1
dl 0
loc 45
rs 7.6666

How to fix   Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace Swarrot;
4
5
use Psr\Log\LoggerInterface;
6
use Psr\Log\NullLogger;
7
use Swarrot\Broker\MessageProvider\MessageProviderInterface;
8
use Swarrot\Processor\ConfigurableInterface;
9
use Swarrot\Processor\InitializableInterface;
10
use Swarrot\Processor\ProcessorInterface;
11
use Swarrot\Processor\SleepyInterface;
12
use Swarrot\Processor\TerminableInterface;
13
use Symfony\Component\OptionsResolver\OptionsResolver;
14
15
/**
 * Pulls messages from a message provider and hands each one to a processor
 * until the processor signals that consumption must stop.
 */
class Consumer
{
    /** @var MessageProviderInterface */
    private $messageProvider;

    /** @var ProcessorInterface */
    private $processor;

    /** @var OptionsResolver */
    private $optionsResolver;

    /** @var LoggerInterface */
    private $logger;

    /**
     * @param MessageProviderInterface $messageProvider Source of the messages to consume
     * @param ProcessorInterface       $processor       Handler invoked for every message
     * @param OptionsResolver|null     $optionsResolver Resolver for consume() options; a fresh one is created when null
     * @param LoggerInterface|null     $logger          Logger; a NullLogger is used when null
     */
    public function __construct(MessageProviderInterface $messageProvider, ProcessorInterface $processor, ?OptionsResolver $optionsResolver = null, ?LoggerInterface $logger = null)
    {
        $this->messageProvider = $messageProvider;
        $this->processor = $processor;
        // Explicit nullable types + "??" instead of relying on implicit
        // nullability (deprecated in PHP 8.4) and on truthiness.
        $this->optionsResolver = $optionsResolver ?? new OptionsResolver();
        $this->logger = $logger ?? new NullLogger();
    }

    /**
     * Consumes the queue until the processor asks to stop.
     *
     * Steps, in order: log the start, resolve the options, initialize the
     * processor (if it is initializable), run the polling loop, terminate the
     * processor (if it is terminable).
     *
     * @param array $options Parameters sent to the processor
     */
    public function consume(array $options = []): void
    {
        $queueName = $this->messageProvider->getQueueName();

        $this->logger->debug('Start consuming queue.', [
            'queue' => $queueName,
        ]);

        $options = $this->resolveOptions($options, $queueName);

        if ($this->processor instanceof InitializableInterface) {
            $this->processor->initialize($options);
        }

        $this->poll($options);

        if ($this->processor instanceof TerminableInterface) {
            $this->processor->terminate($options);
        }
    }

    public function getMessageProvider(): MessageProviderInterface
    {
        return $this->messageProvider;
    }

    /**
     * @return $this
     */
    public function setMessageProvider(MessageProviderInterface $messageProvider): self
    {
        $this->messageProvider = $messageProvider;

        return $this;
    }

    public function getProcessor(): ProcessorInterface
    {
        return $this->processor;
    }

    /**
     * @return $this
     */
    public function setProcessor(ProcessorInterface $processor): self
    {
        $this->processor = $processor;

        return $this;
    }

    public function getOptionsResolver(): OptionsResolver
    {
        return $this->optionsResolver;
    }

    /**
     * @return $this
     */
    public function setOptionsResolver(OptionsResolver $optionsResolver): self
    {
        $this->optionsResolver = $optionsResolver;

        return $this;
    }

    /**
     * Registers the consumer defaults, lets a configurable processor add its own, then resolves the options.
     *
     * @param array $options   Raw options given to consume()
     * @param mixed $queueName Queue name reported by the message provider, used as default for "queue"
     *
     * @return array The resolved options
     */
    private function resolveOptions(array $options, $queueName): array
    {
        $this->optionsResolver->setDefaults([
            // Passed to usleep() between two polls, hence expressed in microseconds.
            'poll_interval' => 50000,
            'queue' => $queueName,
        ]);

        if ($this->processor instanceof ConfigurableInterface) {
            $this->processor->setDefaultOptions($this->optionsResolver);
        }

        return $this->optionsResolver->resolve($options);
    }

    /**
     * Alternates between draining the provider and sleeping until the processor requests a stop.
     *
     * @param array $options Resolved options
     */
    private function poll(array $options): void
    {
        while (true) {
            if (!$this->processAvailableMessages($options)) {
                return;
            }

            // A sleepy processor is notified that no message is left and may
            // stop the consumer by returning false.
            if ($this->processor instanceof SleepyInterface && false === $this->processor->sleep($options)) {
                return;
            }

            usleep($options['poll_interval']);
        }
    }

    /**
     * Processes messages until the provider returns null or the processor returns false.
     *
     * @param array $options Resolved options
     *
     * @return bool false when the processor returned false (consumption must stop), true otherwise
     */
    private function processAvailableMessages(array $options): bool
    {
        while (null !== $message = $this->messageProvider->get()) {
            $result = $this->processor->process($message, $options);

            if (!\is_bool($result)) {
                // Silenced on purpose: the deprecation is only surfaced to
                // error handlers that opt in to E_USER_DEPRECATED notices.
                @trigger_error('Processors must return a bool since Swarrot 3.7', E_USER_DEPRECATED);
            }

            if (false === $result) {
                return false;
            }
        }

        return true;
    }
}
117