Passed
Pull Request — 0.4 (#35)
by jean
03:52
created

ConsumeMessageStep::getProviderName()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 5
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
eloc 2
nc 1
nop 1
dl 0
loc 5
ccs 0
cts 4
cp 0
crap 2
rs 10
c 0
b 0
f 0
1
<?php
2
3
namespace Darkilliant\MqProcessBundle\Step;
4
5
use Darkilliant\MqProcessBundle\Message\Message;
6
use Darkilliant\MqProcessBundle\Message\Provider\AmqpMessageProvider;
7
use Darkilliant\MqProcessBundle\Message\Provider\MessageProviderFactory;
8
use Darkilliant\ProcessBundle\State\ProcessState;
9
use Darkilliant\ProcessBundle\Step\AbstractConfigurableStep;
10
use Darkilliant\ProcessBundle\Step\IterableStepInterface;
11
use Symfony\Component\OptionsResolver\OptionsResolver;
12
13
class ConsumeMessageStep extends AbstractConfigurableStep implements IterableStepInterface
14
{
15
    /** @var Message */
16
    private $currentMessage;
17
18
    /** @var MessageProviderFactory */
19
    private $providerFactory;
20
21
    /**
22
     * @throws \AMQPConnectionException
23
     */
24
    public function __construct(MessageProviderFactory $providerFactory)
25
    {
26
        $this->providerFactory = $providerFactory;
27
    }
28
29
    public function configureOptionResolver(OptionsResolver $resolver): OptionsResolver
30
    {
31
        $resolver->setRequired(['queue', 'exchange', 'client', 'ack_required', 'persistant', 'batch_count']);
32
        $resolver->setDefault('client', 'amqp_lib');
33
        $resolver->setDefault('ack_required', true);
34
        $resolver->setDefault('persistant', true);
35
        $resolver->setDefault('batch_count', 1);
36
37
        return parent::configureOptionResolver($resolver);
38
    }
39
40
    public function execute(ProcessState $state)
41
    {
42
        $state->info('start consume');
43
        $state->setContext('queue', $state->getOptions()['queue']);
44
        $this->getProvider($state)->consume($state->getOptions()['ack_required']);
45
    }
46
47
    public function getProgress(ProcessState $state)
48
    {
49
        return 50;
50
    }
51
52
    public function count(ProcessState $state)
53
    {
54
        return 100;
55
    }
56
57
    public function next(ProcessState $state)
58
    {
59
        $this->currentMessage = null;
60
61
        $raw = $this->getCurrent($state)->getBody();
62
63
        $data = json_decode($raw, true);
64
        $state->info('consume message', ['data' => $data]);
65
        $state->setData($data);
66
    }
67
68
    public function valid(ProcessState $state)
69
    {
70
        return true;
71
    }
72
73
    public function onSuccessLoop(ProcessState $state)
74
    {
75
        if (!$state->getOptions()['ack_required']) {
76
            return;
77
        }
78
79
        $this->getProvider($state)->messageOk($this->getCurrent($state));
80
    }
81
82
    public function onFailedLoop(ProcessState $state)
83
    {
84
        if (!$state->getOptions()['ack_required']) {
85
            return;
86
        }
87
88
        $this->getProvider($state)->messageKo($this->getCurrent($state));
89
    }
90
91
    private function getProviderName(ProcessState $state)
92
    {
93
        $options = $state->getOptions();
94
95
        return $options['queue'];
96
    }
97
98
    private function getProvider(ProcessState $state): AmqpMessageProvider
99
    {
100
        return $this->providerFactory->factory($this->getProviderName($state), $state->getOptions());
101
    }
102
103
    private function getCurrent(ProcessState $state): Message
104
    {
105
        while (null === $this->currentMessage) {
106
            $message = $this->getProvider($state)->fetchMessage();
107
            if (null !== $message) {
108
                return $this->currentMessage = $message;
109
            }
110
111
            $this->getProvider($state)->waitChannel(false);
112
        }
113
114
        return $this->currentMessage;
115
    }
116
}
117