Completed
Pull Request — 0.4 (#35)
by jean
10:46
created

ConsumeMessageStep::getProvider()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 1
nc 1
nop 1
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Darkilliant\MqProcessBundle\Step;
6
7
use Darkilliant\MqProcessBundle\Message\Message;
8
use Darkilliant\MqProcessBundle\Message\Provider\AmqpMessageProvider;
9
use Darkilliant\MqProcessBundle\Message\Provider\MessageProviderFactory;
10
use Darkilliant\ProcessBundle\State\ProcessState;
11
use Darkilliant\ProcessBundle\Step\AbstractConfigurableStep;
12
use Darkilliant\ProcessBundle\Step\IterableStepInterface;
13
use Symfony\Component\OptionsResolver\OptionsResolver;
14
15
/**
 * Iterable process step that pulls messages from a message provider and
 * exposes each decoded message body as the data of the process state.
 *
 * The provider is obtained from the injected factory on every use, keyed by
 * the configured `queue` option.
 */
class ConsumeMessageStep extends AbstractConfigurableStep implements IterableStepInterface
{
    /** @var Message Message currently being processed; null until one has been fetched */
    private $currentMessage;

    /** @var MessageProviderFactory Factory used to obtain the provider for the configured queue */
    private $providerFactory;

    /**
     * @param MessageProviderFactory $providerFactory factory stored for later provider lookups
     */
    public function __construct(MessageProviderFactory $providerFactory)
    {
        $this->providerFactory = $providerFactory;
    }

    /**
     * Declares the options this step requires and registers a default for
     * every one of them except `queue` and `exchange`, then delegates to the
     * parent implementation.
     */
    public function configureOptionResolver(OptionsResolver $resolver): OptionsResolver
    {
        $requiredOptions = [
            'queue',
            'exchange',
            'client',
            'ack_required',
            'persistant',
            'batch_count',
            'requeue_on_fail',
        ];
        $resolver->setRequired($requiredOptions);

        // Defaults are registered one by one, in this order.
        $defaultValues = [
            'client' => 'amqp_lib',
            'ack_required' => true,
            'persistant' => true,
            'batch_count' => 1,
            'requeue_on_fail' => false,
        ];
        foreach ($defaultValues as $optionName => $defaultValue) {
            $resolver->setDefault($optionName, $defaultValue);
        }

        return parent::configureOptionResolver($resolver);
    }

    /**
     * Logs the start of consumption, records the queue name in the state
     * context and asks the provider to start consuming.
     */
    public function execute(ProcessState $state)
    {
        $state->info('start consume');

        $queueName = $state->getOptions()['queue'];
        $state->setContext('queue', $queueName);

        $provider = $this->getProvider($state);
        $ackRequired = $state->getOptions()['ack_required'];
        $provider->consume($ackRequired);
    }

    /**
     * Always reports a fixed progress value of 50.
     */
    public function getProgress(ProcessState $state)
    {
        // NOTE(review): constant value; presumably real progress is unknown for a queue consumer.
        return 50;
    }

    /**
     * Always reports a fixed item count of 100.
     */
    public function count(ProcessState $state)
    {
        // NOTE(review): constant value; presumably the real queue length is not queried here.
        return 100;
    }

    /**
     * Discards the current message, obtains the next one, JSON-decodes its
     * body into an associative structure, logs it and stores it as the state data.
     */
    public function next(ProcessState $state)
    {
        // Clearing the current message makes getCurrent() fetch a fresh one.
        $this->currentMessage = null;

        $message = $this->getCurrent($state);
        $payload = json_decode($message->getBody(), true);

        $state->info('consume message', ['data' => $payload]);
        $state->setData($payload);
    }

    /**
     * Always reports the iteration as valid.
     */
    public function valid(ProcessState $state)
    {
        return true;
    }

    /**
     * Reports the current message as successfully handled to the provider,
     * but only when the `ack_required` option is enabled.
     */
    public function onSuccessLoop(ProcessState $state)
    {
        if ($state->getOptions()['ack_required']) {
            $provider = $this->getProvider($state);
            $provider->messageOk($this->getCurrent($state));
        }
    }

    /**
     * Reports the current message as failed to the provider, passing along the
     * `requeue_on_fail` option, but only when the `ack_required` option is enabled.
     */
    public function onFailedLoop(ProcessState $state)
    {
        if ($state->getOptions()['ack_required']) {
            $provider = $this->getProvider($state);
            $failedMessage = $this->getCurrent($state);
            $requeue = $state->getOptions()['requeue_on_fail'];

            $provider->messageKo($failedMessage, $requeue);
        }
    }

    /**
     * Returns the name under which the provider is requested from the factory:
     * the value of the `queue` option.
     */
    private function getProviderName(ProcessState $state)
    {
        return $state->getOptions()['queue'];
    }

    /**
     * Asks the factory for the provider matching the configured queue, handing
     * it the full option set of the state.
     */
    private function getProvider(ProcessState $state): AmqpMessageProvider
    {
        $providerName = $this->getProviderName($state);
        $options = $state->getOptions();

        return $this->providerFactory->factory($providerName, $options);
    }

    /**
     * Returns the message currently being processed, fetching one from the
     * provider first when none is held. While the provider has no message to
     * hand out, waits on the channel and tries again.
     */
    private function getCurrent(ProcessState $state): Message
    {
        while (null === $this->currentMessage) {
            $fetched = $this->getProvider($state)->fetchMessage();

            if (null === $fetched) {
                // Nothing available yet: wait on the channel, then retry the fetch.
                $this->getProvider($state)->waitChannel(false);

                continue;
            }

            $this->currentMessage = $fetched;
        }

        return $this->currentMessage;
    }
}
120