Completed
Pull Request — 0.4 (#35)
by jean
03:28
created

ConsumeMessageStep   A

Complexity

Total Complexity 17

Size/Duplication

Total Lines 105
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
eloc 36
dl 0
loc 105
ccs 0
cts 73
cp 0
rs 10
c 0
b 0
f 0
wmc 17

12 Methods

Rating   Name   Duplication   Size   Complexity  
A getProvider() 0 3 1
A valid() 0 9 2
A count() 0 3 1
A getCurrent() 0 12 3
A __construct() 0 3 1
A execute() 0 4 1
A configureOptionResolver() 0 7 1
A getProgress() 0 3 1
A getProviderName() 0 5 1
A onFailedLoop() 0 7 2
A next() 0 9 1
A onSuccessLoop() 0 7 2
1
<?php
2
3
namespace Darkilliant\MqProcessBundle\Step;
4
5
use Darkilliant\MqProcessBundle\Message\Message;
6
use Darkilliant\MqProcessBundle\Message\Provider\AmqpMessageProvider;
7
use Darkilliant\MqProcessBundle\Message\Provider\MessageProviderFactory;
8
use Darkilliant\ProcessBundle\State\ProcessState;
9
use Darkilliant\ProcessBundle\Step\AbstractConfigurableStep;
10
use Darkilliant\ProcessBundle\Step\IterableStepInterface;
11
use Symfony\Component\OptionsResolver\OptionsResolver;
12
13
/**
 * Iterable process step that consumes messages from a queue and exposes each
 * decoded message body as the process state's data.
 *
 * Required options: `queue`, `exchange`, `client` (defaults to `amqp_lib`) and
 * `ack_required` (defaults to `true`).
 */
class ConsumeMessageStep extends AbstractConfigurableStep implements IterableStepInterface
{
    /** @var Message|null Message fetched from the provider but not yet handed out by next() */
    private $currentMessage;

    /** @var Message|null Message handed out by the last next() call, not yet acked/nacked */
    private $processedMessage;

    /** @var MessageProviderFactory */
    private $providerFactory;

    /**
     * @param MessageProviderFactory $providerFactory Factory used to resolve the provider for the configured queue
     */
    public function __construct(MessageProviderFactory $providerFactory)
    {
        $this->providerFactory = $providerFactory;
    }

    /**
     * Declares the options of this step, then delegates to the parent resolver configuration.
     */
    public function configureOptionResolver(OptionsResolver $resolver): OptionsResolver
    {
        $resolver->setRequired(['queue', 'exchange', 'client', 'ack_required']);
        $resolver->setDefault('client', 'amqp_lib');
        $resolver->setDefault('ack_required', true);

        return parent::configureOptionResolver($resolver);
    }

    /**
     * Logs the start of consumption and asks the provider to consume, passing the `ack_required` option.
     */
    public function execute(ProcessState $state)
    {
        $state->info('start consume');
        $this->getProvider($state)->consume($state->getOptions()['ack_required']);
    }

    /**
     * Returns a constant progress value.
     *
     * NOTE(review): presumably a placeholder because the queue has no known length - confirm.
     */
    public function getProgress(ProcessState $state)
    {
        return 50;
    }

    /**
     * Returns a constant item count.
     *
     * NOTE(review): presumably a placeholder because the queue has no known length - confirm.
     */
    public function count(ProcessState $state)
    {
        return 100;
    }

    /**
     * Decodes the pending message body as JSON (associative arrays), logs it and stores it as the state data.
     *
     * The message is kept in $processedMessage so that onSuccessLoop()/onFailedLoop() settle this
     * very message; $currentMessage is cleared so the next getCurrent() call fetches a new one.
     * json_decode() yields null for a body that is not valid JSON; that null is stored as data.
     */
    public function next(ProcessState $state)
    {
        $message = $this->getCurrent($state);

        $data = json_decode($message->getBody(), true);
        $state->info('product', ['data' => $data]);
        $state->setData($data);

        $this->processedMessage = $message;
        $this->currentMessage = null;
    }

    /**
     * Tells whether a message is available.
     *
     * getCurrent() only returns once a message has been fetched, so this waits for
     * the next message and then returns true.
     */
    public function valid(ProcessState $state)
    {
        return null !== $this->getCurrent($state);
    }

    /**
     * Reports the processed message as successfully handled when `ack_required` is enabled.
     */
    public function onSuccessLoop(ProcessState $state)
    {
        $message = $this->takeProcessedMessage();

        if (!$state->getOptions()['ack_required']) {
            return;
        }

        $this->getProvider($state)->messageOk($message ?? $this->getCurrent($state));
    }

    /**
     * Reports the processed message as failed when `ack_required` is enabled.
     */
    public function onFailedLoop(ProcessState $state)
    {
        $message = $this->takeProcessedMessage();

        if (!$state->getOptions()['ack_required']) {
            return;
        }

        $this->getProvider($state)->messageKo($message ?? $this->getCurrent($state));
    }

    /**
     * Returns the message handed out by the last next() call and forgets it, so a
     * message is settled at most once. Returns null when no message is awaiting settlement,
     * in which case callers fall back to getCurrent().
     *
     * @return Message|null
     */
    private function takeProcessedMessage()
    {
        $message = $this->processedMessage;
        $this->processedMessage = null;

        return $message;
    }

    /**
     * The provider is looked up under the name of the configured queue.
     */
    private function getProviderName(ProcessState $state)
    {
        $options = $state->getOptions();

        return $options['queue'];
    }

    /**
     * Resolves the message provider for the configured queue through the factory.
     */
    private function getProvider(ProcessState $state): AmqpMessageProvider
    {
        return $this->providerFactory->factory($this->getProviderName($state), $state->getOptions());
    }

    /**
     * Returns the pending message, fetching one from the provider when none is cached.
     *
     * Loops until the provider returns a message, so this call does not return
     * while the queue stays empty.
     */
    private function getCurrent(ProcessState $state): Message
    {
        if (null !== $this->currentMessage) {
            return $this->currentMessage;
        }

        // Resolved once per call instead of twice per loop iteration.
        $provider = $this->getProvider($state);

        while (null === ($message = $provider->fetchMessage())) {
            // NOTE(review): presumably waits on the channel for incoming deliveries - confirm.
            $provider->waitChannel(false);
        }

        return $this->currentMessage = $message;
    }
}
120