Completed
Pull Request — 0.4 (#35)
by jean
11:34
created

SendMessageStep::configureOptionResolver()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 9
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 6
nc 1
nop 1
dl 0
loc 9
rs 10
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Darkilliant\MqProcessBundle\Step;
6
7
use Darkilliant\MqProcessBundle\Message\Publisher\MessagePublisherFactory;
8
use Darkilliant\ProcessBundle\ProcessNotifier\ProgressBarProcessNotifier;
9
use Darkilliant\ProcessBundle\State\ProcessState;
10
use Darkilliant\ProcessBundle\Step\AbstractConfigurableStep;
11
use Darkilliant\ProcessBundle\Step\MonitorableStepInterface;
12
use Symfony\Component\OptionsResolver\OptionsResolver;
13
14
/**
 * Process step that JSON-encodes the current process data and hands it to a
 * message publisher obtained from the MessagePublisherFactory.
 *
 * The publisher is looked up by a name built from the `exchange` and `queue`
 * options. When the `wait_consume_on_finalize` option is truthy, finalize()
 * keeps notifying progress once per second until the progress recorded by
 * getProgress() reaches the total captured by count().
 */
class SendMessageStep extends AbstractConfigurableStep implements MonitorableStepInterface
{
    /** @var MessagePublisherFactory Factory used to obtain the publisher for the configured exchange/queue */
    private $publisherFactory;

    /** @var ProgressBarProcessNotifier Notifier driven while waiting in finalize() */
    private $notifier;

    /** @var mixed|null Message count reported by the publisher the first time count() is called; null until then */
    private $count;

    /** @var int|mixed Last value computed by getProgress(): cached count minus the publisher's current message count */
    private $countMessageReceived = 0;

    /**
     * @param MessagePublisherFactory    $publisherFactory factory providing the message publisher
     * @param ProgressBarProcessNotifier $notifier         notifier used to report progress during finalize()
     */
    public function __construct(MessagePublisherFactory $publisherFactory, ProgressBarProcessNotifier $notifier)
    {
        $this->publisherFactory = $publisherFactory;
        $this->notifier = $notifier;
    }

    /**
     * Declares the options accepted by this step.
     *
     * `queue` and `exchange` have no default and must be supplied; the other
     * required options fall back to the defaults registered below.
     *
     * @param OptionsResolver $resolver resolver to configure
     *
     * @return OptionsResolver the value returned by the parent implementation
     */
    public function configureOptionResolver(OptionsResolver $resolver): OptionsResolver
    {
        // NOTE: "persistant" is the option name as spelled in the public
        // configuration; it is kept as-is so existing configurations keep working.
        $resolver->setRequired(['queue', 'exchange', 'client', 'wait_consume_on_finalize', 'batch_count', 'persistant']);

        $defaults = [
            'client' => 'amqp_lib',
            'wait_consume_on_finalize' => false,
            'persistant' => false,
            'batch_count' => 1,
        ];
        foreach ($defaults as $option => $value) {
            $resolver->setDefault($option, $value);
        }

        return parent::configureOptionResolver($resolver);
    }

    /**
     * Publishes the state's data as a JSON document.
     *
     * @param ProcessState $state current process state; its data is the message payload
     *
     * @throws \RuntimeException when the data cannot be encoded as JSON
     */
    public function execute(ProcessState $state)
    {
        $payload = json_encode($state->getData());

        // json_encode() returns false on failure; publishing that value would
        // either send a bogus message or fail later with an unrelated error.
        if (false === $payload) {
            throw new \RuntimeException(sprintf('Unable to encode process data as JSON: %s', json_last_error_msg()));
        }

        $this->getPublisher($state)->publish($payload, $state->getOptions()['batch_count']);
    }

    /**
     * Returns the publisher's message count, queried once and then cached.
     *
     * @param ProcessState $state current process state (used to resolve the publisher)
     *
     * @return mixed the value returned by the publisher's countMessages() on the first call
     */
    public function count(ProcessState $state)
    {
        if (null === $this->count) {
            $this->count = $this->getPublisher($state)->countMessages();
        }

        return $this->count;
    }

    /**
     * Computes the progress as the cached count minus the publisher's current
     * message count, stores it for the wait loop in finalize() and returns it.
     *
     * @param ProcessState $state current process state (used to resolve the publisher)
     *
     * @return mixed the freshly computed progress value
     */
    public function getProgress(ProcessState $state)
    {
        $remaining = $this->getPublisher($state)->countMessages();
        $this->countMessageReceived = $this->count - $remaining;

        return $this->countMessageReceived;
    }

    /**
     * Finalizes the publisher and, if `wait_consume_on_finalize` is enabled,
     * waits until the recorded progress reaches the cached count.
     *
     * @param ProcessState $state current process state
     */
    public function finalize(ProcessState $state)
    {
        $this->getPublisher($state)->finalize();

        if (!$state->getOptions()['wait_consume_on_finalize']) {
            return;
        }

        $this->notifier->onStartIterableProcess($state, $this);

        // NOTE(review): this loop only terminates if something refreshes
        // $countMessageReceived; presumably the notifier calls count() and
        // getProgress() on this step during its callbacks — confirm.
        while ($this->countMessageReceived < $this->count) {
            $this->notifier->onUpdateIterableProcess($state, $this);
            sleep(1);
        }
    }

    /**
     * Builds the publisher name as "<exchange>_<queue>" from the state's options.
     *
     * @param ProcessState $state current process state
     *
     * @return string publisher name
     */
    private function getPublisherName(ProcessState $state)
    {
        $options = $state->getOptions();

        return $options['exchange'].'_'.$options['queue'];
    }

    /**
     * Asks the factory for the publisher matching the state's options.
     *
     * @param ProcessState $state current process state
     *
     * @return mixed whatever the factory returns for the computed name and options
     */
    private function getPublisher(ProcessState $state)
    {
        return $this->publisherFactory->factory($this->getPublisherName($state), $state->getOptions());
    }
}
92