Completed
Pull Request — 0.4 (#35)
by jean
15:04
created

SendMessageStep::execute()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 4
nc 1
nop 1
dl 0
loc 8
rs 10
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Darkilliant\MqProcessBundle\Step;
6
7
use Darkilliant\MqProcessBundle\Message\Message;
8
use Darkilliant\MqProcessBundle\Message\Publisher\MessagePublisherFactory;
9
use Darkilliant\ProcessBundle\ProcessNotifier\ProgressBarProcessNotifier;
10
use Darkilliant\ProcessBundle\State\ProcessState;
11
use Darkilliant\ProcessBundle\Step\AbstractConfigurableStep;
12
use Darkilliant\ProcessBundle\Step\MonitorableStepInterface;
13
use Symfony\Component\OptionsResolver\OptionsResolver;
14
15
/**
 * Process step that publishes the current process data as a JSON-encoded
 * message through a publisher obtained from the MessagePublisherFactory.
 *
 * The step also exposes count()/getProgress() so that a notifier can display
 * how many of the published messages have left the queue while finalize()
 * waits.
 */
class SendMessageStep extends AbstractConfigurableStep implements MonitorableStepInterface
{
    /** @var MessagePublisherFactory */
    private $publisherFactory;

    /** @var ProgressBarProcessNotifier */
    private $notifier;

    /**
     * Value returned by the publisher's countMessages() on the first call to
     * count(); null until then (presumably an integer — confirm against the
     * publisher implementation).
     */
    private $count;

    /** Last progress value computed by getProgress(). */
    private $countMessageReceived = 0;

    public function __construct(MessagePublisherFactory $publisherFactory, ProgressBarProcessNotifier $notifier)
    {
        $this->publisherFactory = $publisherFactory;
        $this->notifier = $notifier;
    }

    /**
     * Declares the options this step requires and their defaults.
     *
     * 'queue' and 'exchange' have no default and must be supplied by the
     * configuration; the other options fall back to the defaults set below.
     *
     * NOTE(review): 'wait_consume_on_finalize' is declared here but is not read
     * anywhere in this class — confirm whether another component consumes it.
     */
    public function configureOptionResolver(OptionsResolver $resolver): OptionsResolver
    {
        $resolver->setRequired(['queue', 'exchange', 'client', 'wait_consume_on_finalize', 'batch_count', 'persistant']);
        $resolver->setDefault('client', 'amqp_lib');
        $resolver->setDefault('wait_consume_on_finalize', false);
        $resolver->setDefault('persistant', false);
        $resolver->setDefault('batch_count', 1);

        return parent::configureOptionResolver($resolver);
    }

    /**
     * Publishes the state's data as a JSON message and marks the state so that
     * finalize() waits for consumption.
     *
     * @throws \RuntimeException when the state's data cannot be encoded as JSON
     */
    public function execute(ProcessState $state)
    {
        $payload = json_encode($state->getData());
        if (false === $payload) {
            // json_encode() signals failure by returning false; publishing that
            // value would enqueue a meaningless message, so fail loudly instead.
            throw new \RuntimeException(sprintf('Unable to encode process data as JSON: %s', json_last_error_msg()));
        }

        $this->getPublisher($state)->publish(
            new Message($payload),
            $state->getOptions()['batch_count']
        );

        // NOTE(review): this is set unconditionally; the 'wait_consume_on_finalize'
        // option is not consulted here — confirm this is intended.
        $state->setContext('wait_consume', true);
    }

    /**
     * Returns the publisher's message count, captured once on first call and
     * cached for the lifetime of this step instance.
     */
    public function count(ProcessState $state)
    {
        if (null === $this->count) {
            $this->count = $this->getPublisher($state)->countMessages();
        }

        return $this->count;
    }

    /**
     * Returns the difference between the cached total and the publisher's
     * current message count, and stores it for the wait loop in finalize().
     */
    public function getProgress(ProcessState $state)
    {
        // Go through count() so the total is initialised even when getProgress()
        // is called first; subtracting from an unset total would yield a
        // negative progress value.
        $total = $this->count($state);

        return $this->countMessageReceived = $total - $this->getPublisher($state)->countMessages();
    }

    /**
     * Finalizes the publisher and, when the state was flagged by execute(),
     * blocks until the recorded progress reaches the cached total.
     */
    public function finalize(ProcessState $state)
    {
        // TODO: Caution, not all of the information is necessarily available here
        $this->getPublisher($state)->finalize();

        if (!$state->getContext('wait_consume')) {
            return;
        }

        $this->notifier->onStartIterableProcess($state, $this);
        // countMessageReceived is only updated by getProgress(), which this
        // method never calls directly: the loop relies on the notifier invoking
        // it (presumably from onUpdateIterableProcess — confirm).
        while ($this->countMessageReceived < $this->count) {
            $this->notifier->onUpdateIterableProcess($state, $this);
            sleep(1);
        }
    }

    /**
     * Builds the publisher name as "<exchange>_<queue>" from the state's options.
     */
    private function getPublisherName(ProcessState $state): string
    {
        $options = $state->getOptions();

        return $options['exchange'].'_'.$options['queue'];
    }

    /**
     * Asks the factory for the publisher matching this state's exchange/queue,
     * passing the state's options along.
     */
    private function getPublisher(ProcessState $state)
    {
        return $this->publisherFactory->factory($this->getPublisherName($state), $state->getOptions());
    }
}
96