Passed
Pull Request — 0.4 (#35)
by jean
03:56
created

ConsumeMessageStep::getProvider()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 1
nc 1
nop 1
dl 0
loc 3
ccs 2
cts 2
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Darkilliant\MqProcessBundle\Step;
6
7
use Darkilliant\MqProcessBundle\Message\Message;
8
use Darkilliant\MqProcessBundle\Message\Provider\MessageProviderFactory;
9
use Darkilliant\MqProcessBundle\Message\Provider\MessageProviderInterface;
10
use Darkilliant\ProcessBundle\State\ProcessState;
11
use Darkilliant\ProcessBundle\Step\AbstractConfigurableStep;
12
use Darkilliant\ProcessBundle\Step\IterableStepInterface;
13
use Symfony\Component\OptionsResolver\OptionsResolver;
14
15
class ConsumeMessageStep extends AbstractConfigurableStep implements IterableStepInterface
16
{
17
    /** @var Message */
18
    private $currentMessage;
19
20
    /** @var MessageProviderFactory */
21
    private $providerFactory;
22
23
    /**
24
     * @throws \AMQPConnectionException
25
     */
26 11
    public function __construct(MessageProviderFactory $providerFactory)
27
    {
28 11
        $this->providerFactory = $providerFactory;
29 11
    }
30
31 1
    public function configureOptionResolver(OptionsResolver $resolver): OptionsResolver
32
    {
33 1
        $resolver->setRequired(['queue', 'exchange', 'client', 'ack_required', 'persistant', 'batch_count', 'requeue_on_fail']);
34 1
        $resolver->setDefault('client', 'amqp_lib');
35 1
        $resolver->setDefault('ack_required', true);
36 1
        $resolver->setDefault('persistant', true);
37 1
        $resolver->setDefault('batch_count', 1);
38 1
        $resolver->setDefault('requeue_on_fail', false);
39
40 1
        return parent::configureOptionResolver($resolver);
41
    }
42
43 1
    public function execute(ProcessState $state)
44
    {
45 1
        $state->info('start consume');
46 1
        $state->setContext('queue', $state->getOptions()['queue']);
47 1
        $this->getProvider($state)->consume($state->getOptions()['ack_required']);
48 1
    }
49
50 1
    public function getProgress(ProcessState $state)
51
    {
52 1
        return 50;
53
    }
54
55 1
    public function count(ProcessState $state)
56
    {
57 1
        return 100;
58
    }
59
60 3
    public function next(ProcessState $state)
61
    {
62 3
        $this->currentMessage = null;
63
64 3
        $raw = $this->getCurrent($state)->getBody();
65
66 3
        $data = json_decode($raw, true);
67 3
        $state->info('consume message', ['data' => $data]);
68 3
        $state->setData($data);
69 3
    }
70
71 1
    public function valid(ProcessState $state)
72
    {
73 1
        return true;
74
    }
75
76 2
    public function onSuccessLoop(ProcessState $state)
77
    {
78 2
        if (!$state->getOptions()['ack_required']) {
79 1
            return;
80
        }
81
82 1
        $this->getProvider($state)->messageOk($this->getCurrent($state));
83 1
    }
84
85 2
    public function onFailedLoop(ProcessState $state)
86
    {
87 2
        if (!$state->getOptions()['ack_required']) {
88 1
            return;
89
        }
90
91 1
        $this->getProvider($state)->messageKo($this->getCurrent($state), $state->getOptions()['requeue_on_fail']);
92 1
    }
93
94 5
    private function getProviderName(ProcessState $state)
95
    {
96 5
        $options = $state->getOptions();
97
98 5
        return $options['queue'];
99
    }
100
101 5
    private function getProvider(ProcessState $state): MessageProviderInterface
102
    {
103 5
        return $this->providerFactory->factory($this->getProviderName($state), $state->getOptions());
104
    }
105
106 4
    private function getCurrent(ProcessState $state): Message
107
    {
108 4
        while (null === $this->currentMessage) {
109 4
            $message = $this->getProvider($state)->fetchMessage();
110 4
            if (null !== $message) {
111 4
                return $this->currentMessage = $message;
112
            }
113
114 1
            $this->getProvider($state)->waitChannel(false);
115
        }
116
117 1
        return $this->currentMessage;
118
    }
119
}
120