Passed
Pull Request — 0.4 (#35)
by jean
03:13
created

SendMessageStep::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 2
nc 1
nop 2
dl 0
loc 4
ccs 3
cts 3
cp 1
crap 1
rs 10
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Darkilliant\MqProcessBundle\Step;
6
7
use Darkilliant\MqProcessBundle\Message\Message;
8
use Darkilliant\MqProcessBundle\Message\Publisher\MessagePublisherFactory;
9
use Darkilliant\MqProcessBundle\Message\Publisher\MessagePublisherInterface;
10
use Darkilliant\ProcessBundle\ProcessNotifier\ProgressBarProcessNotifier;
11
use Darkilliant\ProcessBundle\State\ProcessState;
12
use Darkilliant\ProcessBundle\Step\AbstractConfigurableStep;
13
use Darkilliant\ProcessBundle\Step\MonitorableStepInterface;
14
use Symfony\Component\OptionsResolver\OptionsResolver;
15
16
/**
 * Process step that publishes the current process data as a JSON message
 * through a message publisher, and reports how the published messages are
 * being consumed.
 *
 * The publisher is obtained from the factory using a name built from the
 * "exchange" and "queue" options of the process state.
 */
class SendMessageStep extends AbstractConfigurableStep implements MonitorableStepInterface
{
    /** @var MessagePublisherFactory */
    private $publisherFactory;

    /** @var ProgressBarProcessNotifier */
    private $notifier;

    /**
     * Message count captured on the first call to count(); null until then.
     *
     * @var int|null
     */
    private $count;

    /**
     * Last progress value computed by getProgress().
     *
     * @var int
     */
    private $countMessageReceived = 0;

    /**
     * @param MessagePublisherFactory    $publisherFactory Used to obtain the publisher for the configured exchange/queue
     * @param ProgressBarProcessNotifier $notifier         Notified from finalize() while waiting on the message count
     */
    public function __construct(MessagePublisherFactory $publisherFactory, ProgressBarProcessNotifier $notifier)
    {
        $this->publisherFactory = $publisherFactory;
        $this->notifier = $notifier;
    }

    /**
     * Declares the options of this step.
     *
     * "queue" and "exchange" are required and have no default, so they must be
     * supplied; "client", "persistant" and "batch_count" are required but fall
     * back to the defaults set below.
     *
     * @param OptionsResolver $resolver Resolver to configure
     *
     * @return OptionsResolver The resolver as returned by the parent implementation
     */
    public function configureOptionResolver(OptionsResolver $resolver): OptionsResolver
    {
        // NOTE: the key is spelled "persistant" (sic); it is part of the
        // configuration contract and must not be renamed here.
        $resolver->setRequired(['queue', 'exchange', 'client', 'batch_count', 'persistant']);
        $resolver->setDefault('client', 'amqp_lib');
        $resolver->setDefault('persistant', false);
        $resolver->setDefault('batch_count', 1);

        return parent::configureOptionResolver($resolver);
    }

    /**
     * Publishes the state data, JSON-encoded, as a single message.
     *
     * The "batch_count" option is forwarded to the publisher as second argument.
     *
     * @param ProcessState $state Current process state (data and resolved options)
     *
     * @throws \RuntimeException When the state data cannot be encoded as JSON
     */
    public function execute(ProcessState $state)
    {
        $publisher = $this->getPublisher($state);

        $payload = json_encode($state->getData());
        if (false === $payload) {
            // json_encode() returns false on failure (e.g. malformed UTF-8);
            // fail with an explicit reason instead of handing false to Message.
            throw new \RuntimeException(sprintf('Unable to encode message data as JSON: %s', json_last_error_msg()));
        }

        $publisher->publish(new Message($payload), $state->getOptions()['batch_count']);
    }

    /**
     * Returns the message count reported by the publisher.
     *
     * The value is read from the publisher on the first call only and then
     * served from the cached property.
     *
     * @param ProcessState $state Current process state
     *
     * @return mixed Whatever MessagePublisherInterface::countMessages() returned on the first call
     *               (presumably an int; verify against the interface)
     */
    public function count(ProcessState $state)
    {
        if (null === $this->count) {
            $this->count = $this->getPublisher($state)->countMessages();
        }

        return $this->count;
    }

    /**
     * Computes the progress as the cached total minus the publisher's current
     * message count, stores it and returns it.
     *
     * The total is obtained through count() so that it is initialised even when
     * getProgress() is called first; when the total is already cached this
     * performs exactly one countMessages() call, as before.
     *
     * @param ProcessState $state Current process state
     *
     * @return mixed Difference between the cached total and the current message count
     */
    public function getProgress(ProcessState $state)
    {
        $total = $this->count($state);
        $this->countMessageReceived = $total - $this->getPublisher($state)->countMessages();

        return $this->countMessageReceived;
    }

    /**
     * Finalizes the publisher, then notifies progress once per second until the
     * stored progress reaches the cached total.
     *
     * NOTE(review): the loop condition only changes when getProgress() runs;
     * this method never calls it directly, so it presumably relies on the
     * notifier invoking count()/getProgress() on this step — verify against
     * ProgressBarProcessNotifier. If the total was never initialised (null),
     * the loop body is not entered.
     *
     * @param ProcessState $state Current process state
     */
    public function finalize(ProcessState $state)
    {
        $this->getPublisher($state)->finalize();

        $this->notifier->onStartIterableProcess($state, $this);
        while ($this->countMessageReceived < $this->count) {
            $this->notifier->onUpdateIterableProcess($state, $this);
            sleep(1);
        }
    }

    /**
     * Builds the publisher name as "<exchange>_<queue>" from the state options.
     */
    private function getPublisherName(ProcessState $state): string
    {
        $options = $state->getOptions();

        return $options['exchange'].'_'.$options['queue'];
    }

    /**
     * Asks the factory for the publisher matching the state's exchange/queue,
     * passing the full option set along.
     *
     * NOTE(review): this is called on every execute()/count()/getProgress();
     * whether the factory reuses an instance per name is not visible here.
     */
    private function getPublisher(ProcessState $state): MessagePublisherInterface
    {
        return $this->publisherFactory->factory($this->getPublisherName($state), $state->getOptions());
    }
}
89