Completed
Pull Request — 0.4 (#35)
by jean
03:28
created

SendMessageStep   A

Complexity

Total Complexity 11

Size/Duplication

Total Lines 76
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
eloc 30
dl 0
loc 76
ccs 0
cts 52
cp 0
rs 10
c 0
b 0
f 0
wmc 11

8 Methods

Rating   Name   Duplication   Size   Complexity  
A getPublisherName() 0 5 1
A configureOptionResolver() 0 9 1
A finalize() 0 12 3
A execute() 0 5 1
A __construct() 0 4 1
A count() 0 7 2
A getProgress() 0 3 1
A getPublisher() 0 3 1
1
<?php
2
3
namespace Darkilliant\MqProcessBundle\Step;
4
5
use Darkilliant\MqProcessBundle\Message\Publisher\MessagePublisherFactory;
6
use Darkilliant\ProcessBundle\ProcessNotifier\ProgressBarProcessNotifier;
7
use Darkilliant\ProcessBundle\State\ProcessState;
8
use Darkilliant\ProcessBundle\Step\AbstractConfigurableStep;
9
use Darkilliant\ProcessBundle\Step\MonitorableStepInterface;
10
use Symfony\Component\OptionsResolver\OptionsResolver;
11
12
/**
 * Process step that publishes the current process data as a JSON message
 * through a publisher obtained from the MessagePublisherFactory.
 *
 * When the "wait_consume_on_finalize" option is enabled, finalize() blocks
 * until the number of messages reported by the publisher has dropped by the
 * amount that was counted when the wait started, reporting progress through
 * the notifier in the meantime.
 */
class SendMessageStep extends AbstractConfigurableStep implements MonitorableStepInterface
{
    /** @var MessagePublisherFactory */
    private $publisherFactory;

    /** @var ProgressBarProcessNotifier */
    private $notifier;

    /**
     * Value returned by the publisher's countMessages() the first time
     * count() was called; null until then.
     *
     * @var int|null presumably an integer message count - confirm against the publisher
     */
    private $count;

    /**
     * Last progress value computed by getProgress().
     *
     * @var int
     */
    private $countMessageReceived = 0;

    /**
     * @param MessagePublisherFactory    $publisherFactory builds the publisher used to send and count messages
     * @param ProgressBarProcessNotifier $notifier         notified while finalize() waits for consumption
     */
    public function __construct(MessagePublisherFactory $publisherFactory, ProgressBarProcessNotifier $notifier)
    {
        $this->publisherFactory = $publisherFactory;
        $this->notifier = $notifier;
    }

    /**
     * Declares the options of this step.
     *
     * "queue" and "exchange" have no default and must be provided; the other
     * required options fall back to the defaults set below.
     *
     * @param OptionsResolver $resolver
     *
     * @return OptionsResolver
     */
    public function configureOptionResolver(OptionsResolver $resolver): OptionsResolver
    {
        $resolver->setRequired(['queue', 'exchange', 'client', 'wait_consume_on_finalize', 'batch_count', 'persistant']);
        $resolver->setDefault('client', 'amqp_lib');
        $resolver->setDefault('wait_consume_on_finalize', false);
        $resolver->setDefault('persistant', false);
        $resolver->setDefault('batch_count', 1);

        return parent::configureOptionResolver($resolver);
    }

    /**
     * Publishes the state's data, JSON-encoded, with the configured batch count.
     *
     * @param ProcessState $state
     *
     * @throws \RuntimeException when the data cannot be encoded as JSON
     */
    public function execute(ProcessState $state)
    {
        $payload = json_encode($state->getData());

        // json_encode() returns false on failure; publishing that value would
        // silently send a meaningless message instead of reporting the problem.
        if (false === $payload) {
            throw new \RuntimeException(sprintf('Unable to encode message data as JSON: %s', json_last_error_msg()));
        }

        $this->getPublisher($state)->publish($payload, $state->getOptions()['batch_count']);
    }

    /**
     * Returns the publisher's message count, memoised on first call.
     *
     * Later calls return the same snapshot even if the publisher's count has
     * changed since.
     *
     * @param ProcessState $state
     *
     * @return int|null whatever countMessages() returned on the first call
     */
    public function count(ProcessState $state)
    {
        if (null === $this->count) {
            $this->count = $this->getPublisher($state)->countMessages();
        }

        return $this->count;
    }

    /**
     * Computes and stores the difference between the snapshot taken by
     * count() and the publisher's current message count.
     *
     * @param ProcessState $state
     *
     * @return int
     */
    public function getProgress(ProcessState $state)
    {
        // Go through count() rather than reading the property so the snapshot
        // is initialised even when getProgress() is the first method called;
        // subtracting from null would yield a negative progress.
        return $this->countMessageReceived = $this->count($state) - $this->getPublisher($state)->countMessages();
    }

    /**
     * Finalizes the publisher and, when "wait_consume_on_finalize" is set,
     * waits until the progress reaches the counted total.
     *
     * @param ProcessState $state
     */
    public function finalize(ProcessState $state)
    {
        $this->getPublisher($state)->finalize();

        if (!$state->getOptions()['wait_consume_on_finalize']) {
            return;
        }

        // Make sure the total is known before comparing against it: with a
        // null total the loop condition below would be false straight away.
        $total = $this->count($state);

        $this->notifier->onStartIterableProcess($state, $this);
        while ($this->countMessageReceived < $total) {
            // $countMessageReceived is only written by getProgress(); refresh
            // it here so the loop terminates whether or not the notifier
            // polls getProgress() itself.
            $this->getProgress($state);
            $this->notifier->onUpdateIterableProcess($state, $this);
            sleep(1);
        }
    }

    /**
     * Builds the publisher name as "<exchange>_<queue>" from the state's options.
     *
     * @param ProcessState $state
     *
     * @return string
     */
    private function getPublisherName(ProcessState $state): string
    {
        $options = $state->getOptions();

        return $options['exchange'].'_'.$options['queue'];
    }

    /**
     * Asks the factory for the publisher matching the state's exchange/queue
     * and options.
     *
     * @param ProcessState $state
     *
     * @return mixed the object returned by MessagePublisherFactory::factory();
     *               used here via publish(), countMessages() and finalize()
     */
    private function getPublisher(ProcessState $state)
    {
        return $this->publisherFactory->factory($this->getPublisherName($state), $state->getOptions());
    }
}
90