Passed
Pull Request — 0.4 (#35)
by jean
03:52
created

ConsumeMessageStep   A

Complexity

Total Complexity 16

Size/Duplication

Total Lines 102
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
eloc 36
dl 0
loc 102
ccs 0
cts 72
cp 0
rs 10
c 0
b 0
f 0
wmc 16

12 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 3 1
A getProvider() 0 3 1
A valid() 0 3 1
A count() 0 3 1
A getCurrent() 0 12 3
A execute() 0 5 1
A configureOptionResolver() 0 9 1
A getProgress() 0 3 1
A getProviderName() 0 5 1
A onFailedLoop() 0 7 2
A next() 0 9 1
A onSuccessLoop() 0 7 2
1
<?php
2
3
namespace Darkilliant\MqProcessBundle\Step;
4
5
use Darkilliant\MqProcessBundle\Message\Message;
6
use Darkilliant\MqProcessBundle\Message\Provider\AmqpMessageProvider;
7
use Darkilliant\MqProcessBundle\Message\Provider\MessageProviderFactory;
8
use Darkilliant\ProcessBundle\State\ProcessState;
9
use Darkilliant\ProcessBundle\Step\AbstractConfigurableStep;
10
use Darkilliant\ProcessBundle\Step\IterableStepInterface;
11
use Symfony\Component\OptionsResolver\OptionsResolver;
12
13
class ConsumeMessageStep extends AbstractConfigurableStep implements IterableStepInterface
14
{
15
    /** @var Message */
16
    private $currentMessage;
17
18
    /** @var MessageProviderFactory */
19
    private $providerFactory;
20
21
    /**
22
     * @throws \AMQPConnectionException
23
     */
24
    public function __construct(MessageProviderFactory $providerFactory)
25
    {
26
        $this->providerFactory = $providerFactory;
27
    }
28
29
    public function configureOptionResolver(OptionsResolver $resolver): OptionsResolver
30
    {
31
        $resolver->setRequired(['queue', 'exchange', 'client', 'ack_required', 'persistant', 'batch_count']);
32
        $resolver->setDefault('client', 'amqp_lib');
33
        $resolver->setDefault('ack_required', true);
34
        $resolver->setDefault('persistant', true);
35
        $resolver->setDefault('batch_count', 1);
36
37
        return parent::configureOptionResolver($resolver);
38
    }
39
40
    public function execute(ProcessState $state)
41
    {
42
        $state->info('start consume');
43
        $state->setContext('queue', $state->getOptions()['queue']);
44
        $this->getProvider($state)->consume($state->getOptions()['ack_required']);
45
    }
46
47
    public function getProgress(ProcessState $state)
48
    {
49
        return 50;
50
    }
51
52
    public function count(ProcessState $state)
53
    {
54
        return 100;
55
    }
56
57
    public function next(ProcessState $state)
58
    {
59
        $this->currentMessage = null;
60
61
        $raw = $this->getCurrent($state)->getBody();
62
63
        $data = json_decode($raw, true);
64
        $state->info('consume message', ['data' => $data]);
65
        $state->setData($data);
66
    }
67
68
    public function valid(ProcessState $state)
69
    {
70
        return true;
71
    }
72
73
    public function onSuccessLoop(ProcessState $state)
74
    {
75
        if (!$state->getOptions()['ack_required']) {
76
            return;
77
        }
78
79
        $this->getProvider($state)->messageOk($this->getCurrent($state));
80
    }
81
82
    public function onFailedLoop(ProcessState $state)
83
    {
84
        if (!$state->getOptions()['ack_required']) {
85
            return;
86
        }
87
88
        $this->getProvider($state)->messageKo($this->getCurrent($state));
89
    }
90
91
    private function getProviderName(ProcessState $state)
92
    {
93
        $options = $state->getOptions();
94
95
        return $options['queue'];
96
    }
97
98
    private function getProvider(ProcessState $state): AmqpMessageProvider
99
    {
100
        return $this->providerFactory->factory($this->getProviderName($state), $state->getOptions());
101
    }
102
103
    private function getCurrent(ProcessState $state): Message
104
    {
105
        while (null === $this->currentMessage) {
106
            $message = $this->getProvider($state)->fetchMessage();
107
            if (null !== $message) {
108
                return $this->currentMessage = $message;
109
            }
110
111
            $this->getProvider($state)->waitChannel(false);
112
        }
113
114
        return $this->currentMessage;
115
    }
116
}
117