Test Failed
Pull Request — master (#39)
by Aleksandr
05:36
created

Declarator::declareBindings()   B

Complexity

Conditions 7
Paths 9

Size

Total Lines 44
Code Lines 32

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 7
eloc 32
c 1
b 0
f 0
nc 9
nop 1
dl 0
loc 44
rs 8.4746
1
<?php
2
3
namespace OldSound\RabbitMqBundle\Declarations;
4
5
use OldSound\RabbitMqBundle\RabbitMq\Consumer;
6
use PhpAmqpLib\Channel\AMQPChannel;
7
use Psr\Log\LoggerAwareTrait;
8
use Psr\Log\NullLogger;
9
use Symfony\Component\Console\Output\OutputInterface;
10
11
class Declarator
12
{
13
    use LoggerAwareTrait;
14
    /** @var AMQPChannel */
15
    private $channel;
16
    
17
    /**
     * Stores the channel used for all declarations and installs a no-op logger.
     *
     * @param AMQPChannel $channel Channel on which exchanges, queues and bindings are declared.
     */
    public function __construct(AMQPChannel $channel)
    {
        // Start with a NullLogger so the logging calls in this class never hit an unset logger.
        $this->logger = new NullLogger();
        $this->channel = $channel;
    }
22
23
    /**
     * Declares every given exchange on the channel and logs each declaration.
     *
     * @param ExchangeDeclaration[] $exchanges Exchanges to declare, in iteration order.
     */
    public function declareExchanges(array $exchanges)
    {
        foreach ($exchanges as $exchange) {
            $this->channel->exchange_declare(
                $exchange->name,
                $exchange->type,
                $exchange->passive,
                $exchange->durable,
                $exchange->autoDelete,
                $exchange->internal,
                $exchange->nowait,
                $exchange->arguments,
                $exchange->ticket,
            );

            // The context array belongs to the logger call itself; wrapping the message in
            // sprintf() silently discarded it because the format string has no placeholders.
            $this->logger->info('Exchange is declared successfully', ['exchange' => $exchange]);
        }
    }
44
45
    /**
     * Declares every given queue on the channel and collects the first element of each reply.
     *
     * Queues whose declaration yields no reply (null) are skipped: nothing is collected
     * and nothing is logged for them.
     *
     * @param QueueDeclaration[] $queues Queues to declare, in iteration order.
     * @return string[] First element of each non-null declare reply
     *                  (presumably the queue name - confirm against php-amqplib).
     */
    public function declareQueues(array $queues): array
    {
        $results = [];
        foreach ($queues as $queue) {
            $result = $this->channel->queue_declare(
                $queue->name,
                $queue->passive,
                $queue->durable,
                $queue->exclusive,
                $queue->autoDelete,
                $queue->nowait,
                $queue->arguments,
                $queue->ticket,
            );

            if (null === $result) {
                // TODO: decide how a declaration without a reply should be reported.
                continue;
            }

            $results[] = $result[0];
            // The context array belongs to the logger call itself; wrapping the message in
            // sprintf() silently discarded it because the format string has no placeholders.
            $this->logger->info('Queue is declared successfully', ['queue' => $queue]);
        }

        return $results;
    }
73
74
    /**
     * Declares every given binding on the channel and logs each declaration.
     *
     * A binding is declared once per routing key. When the binding has no routing keys
     * it is declared once with an empty routing key. Bindings whose destination is an
     * exchange use exchange_bind(); all other bindings use queue_bind().
     *
     * NOTE(review): static analysis reported that $binding->exchange may be an
     * ExchangeDeclaration rather than the string name the channel expects - confirm
     * against BindingDeclaration.
     *
     * @param BindingDeclaration[] $bindings Bindings to declare, in iteration order.
     */
    public function declareBindings(array $bindings)
    {
        foreach ($bindings as $binding) {
            // No routing keys means "bind once with the empty routing key".
            $routingKeys = [] === $binding->routingKeys ? [''] : $binding->routingKeys;

            foreach ($routingKeys as $routingKey) {
                if ($binding->destinationIsExchange) {
                    // Exchange-to-exchange binding. The empty-routing-key case previously
                    // called queue_bind() here, which treated the destination exchange
                    // as a queue.
                    $this->channel->exchange_bind(
                        $binding->destination,
                        $binding->exchange,
                        $routingKey,
                        $binding->nowait,
                        $binding->arguments
                    );
                } else {
                    $this->channel->queue_bind(
                        $binding->destination,
                        $binding->exchange,
                        $routingKey,
                        $binding->nowait,
                        $binding->arguments
                    );
                }
            }

            // The context array belongs to the logger call itself; wrapping the message in
            // sprintf() silently discarded it because the format string has no placeholders.
            $this->logger->info('Binding is declared successfully', ['binding' => $binding]);
        }
    }
123
124
    /**
     * Declares an exchange together with the queues bound to it and the bindings themselves.
     *
     * The bindings come from the registry's getBindingsByExchange(). The queues are the
     * registry's queue declarations whose name is the destination of one of those
     * bindings (bindings that target another exchange are ignored for this step).
     *
     * NOTE(review): the previous filter callback had no return statement, so no queue was
     * ever declared here, and it would have handed binding objects to declareQueues().
     * Resolving queues through $declarationsRegistry->queues by name is the presumed
     * intent - confirm against DeclarationsRegistry.
     *
     * @param ExchangeDeclaration  $exchange             Exchange to declare.
     * @param DeclarationsRegistry $declarationsRegistry Source of the bindings and queue declarations.
     */
    public function declareForExchange(ExchangeDeclaration $exchange, DeclarationsRegistry $declarationsRegistry)
    {
        $bindings = $declarationsRegistry->getBindingsByExchange($exchange);

        // Collect the names of all queues that are a destination of this exchange's bindings.
        $boundQueueNames = [];
        foreach ($bindings as $binding) {
            if (!$binding->destinationIsExchange) {
                $boundQueueNames[$binding->destination] = true;
            }
        }

        $queues = array_filter(
            $declarationsRegistry->queues,
            static fn ($queue) => isset($boundQueueNames[$queue->name])
        );

        // Order matters: the exchange and queues must exist before they can be bound.
        $this->declareExchanges([$exchange]);
        $this->declareQueues($queues);
        $this->declareBindings($bindings);
    }
135
136
    /**
     * Declares the named queue(s) together with the exchanges they are bound to and the bindings.
     *
     * Queues are looked up in the registry by exact name. For each match, the registry
     * bindings whose destination is that queue are collected, and the exchanges they
     * reference are resolved through $declarationsRegistry->exchanges.
     *
     * @param string               $queueName            Name of the queue to declare.
     * @param DeclarationsRegistry $declarationsRegistry Source of queue, binding and exchange declarations.
     */
    public function declareForQueueDeclaration(string $queueName, DeclarationsRegistry $declarationsRegistry)
    {
        // TODO not found! exception? An unknown name currently results in nothing being declared.
        $consumerQueues = array_filter(
            $declarationsRegistry->queues,
            static fn ($queue) => $queue->name === $queueName
        );

        /** @var BindingDeclaration[] $bindings */
        $bindings = [];
        $exchangeNames = [];
        foreach ($consumerQueues as $queue) {
            $queueBindings = array_filter(
                $declarationsRegistry->bindings,
                static fn ($binding) => !$binding->destinationIsExchange && $binding->destination === $queue->name
            );
            $bindings = array_merge($bindings, $queueBindings);

            // Only queue-destination bindings pass the filter above, so the source exchange
            // is the only exchange each binding contributes (the former
            // destinationIsExchange branch here could never run).
            foreach ($queueBindings as $binding) {
                $exchangeNames[] = $binding->exchange;
            }
        }

        // Resolve each distinct exchange reference to its declaration in the registry.
        $exchanges = array_map(
            fn ($exchange) => $declarationsRegistry->exchanges[$exchange],
            array_unique($exchangeNames)
        );

        // Order matters: exchanges and queues must exist before they can be bound.
        $this->declareExchanges($exchanges);
        $this->declareQueues($consumerQueues);
        $this->declareBindings($bindings);
    }
164
    
165
    /**
     * Declares the exchanges referenced by a queue's bindings, then the queue, then the bindings.
     *
     * NOTE(review): static analysis reported that the `bindings` property does not seem to
     * exist on QueueDeclaration - confirm before relying on this method.
     * NOTE(review): each binding's `exchange` is passed straight to declareExchanges(), so it
     * is presumably an ExchangeDeclaration here - verify against BindingDeclaration.
     *
     * @param QueueDeclaration $queue Queue to declare along with its bindings.
     */
    public function declareForQueue(QueueDeclaration $queue)
    {
        $exchanges = array_map(
            static fn ($binding) => $binding->exchange,
            $queue->bindings
        );

        // Order matters: exchanges and the queue must exist before they can be bound.
        $this->declareExchanges($exchanges);
        $this->declareQueues([$queue]);
        $this->declareBindings($queue->bindings);
    }
175
    
176
    /**
     * Purges the given queue on the channel.
     *
     * @param QueueDeclaration $queue  Queue whose name is passed to queue_purge().
     * @param bool             $nowait Forwarded to queue_purge(); defaults to true.
     * @param int|null         $ticket Forwarded to queue_purge().
     */
    public function purgeQueue(QueueDeclaration $queue, $nowait = true, ?int $ticket = null)
    {
        $queueName = $queue->name;

        $this->channel->queue_purge($queueName, $nowait, $ticket);
    }
180
    
181
    /**
     * Deletes the given queue on the channel.
     *
     * The parameter names $ifUnsed and $ifEmpry are misspelled ("if unused" / "if empty")
     * but are kept as-is because callers may pass them as named arguments.
     *
     * @param QueueDeclaration $queue    Queue whose name is passed to queue_delete().
     * @param bool             $ifUnsed  Forwarded as the second argument of queue_delete(); defaults to true.
     * @param bool             $ifEmpry  Forwarded as the third argument of queue_delete(); defaults to false.
     * @param bool             $nowait   Forwarded to queue_delete(); defaults to false.
     * @param int|null         $ticket   Forwarded to queue_delete().
     */
    public function deleteQueue(
        QueueDeclaration $queue,
        bool $ifUnsed = true,
        bool $ifEmpry = false,
        bool $nowait = false,
        ?int $ticket = null
    ) {
        $queueName = $queue->name;

        $this->channel->queue_delete($queueName, $ifUnsed, $ifEmpry, $nowait, $ticket);
    }
190
}