Completed
Pull Request — master (#39)
by Aleksandr
06:46
created

Declarator::declareQueues()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 23
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 3
eloc 16
c 1
b 0
f 0
nc 3
nop 1
dl 0
loc 23
rs 9.7333
1
<?php
2
3
namespace OldSound\RabbitMqBundle\Declarations;
4
5
use OldSound\RabbitMqBundle\RabbitMq\Consumer;
6
use PhpAmqpLib\Channel\AMQPChannel;
7
use Psr\Log\LoggerAwareTrait;
8
use Psr\Log\NullLogger;
9
use Symfony\Component\Console\Output\OutputInterface;
10
11
class Declarator
12
{
13
    use LoggerAwareTrait;
14
    /** @var AMQPChannel */
15
    private $channel;
16
    
17
    /**
     * Binds the declarator to the AMQP channel used for every declaration.
     *
     * The logger is initialised to a NullLogger so the logging calls in this
     * class always have a logger to talk to.
     *
     * @param AMQPChannel $channel channel on which declarations are issued
     */
    public function __construct(AMQPChannel $channel)
    {
        $this->logger = new NullLogger();
        $this->channel = $channel;
    }
22
23
    /**
     * Declares every given exchange on the channel, logging each declaration.
     *
     * @param ExchangeDeclaration[] $exchanges
     */
    public function declareExchanges(array $exchanges)
    {
        foreach ($exchanges as $exchange) {
            $this->channel->exchange_declare(
                $exchange->name,
                $exchange->type,
                $exchange->passive,
                $exchange->durable,
                $exchange->autoDelete,
                $exchange->internal,
                $exchange->nowait,
                $exchange->arguments,
                $exchange->ticket,
            );

            // The context array goes to the logger as its second argument.
            // Passing it through sprintf() (with no placeholder in the format)
            // silently discarded it.
            $this->logger->info('Exchange is declared successfully', ['exchange' => $exchange]);
        }
    }
44
45
    /**
     * Declares every given queue and collects the first element of each reply.
     *
     * A declaration for which queue_declare() returns null contributes nothing
     * to the result and is not logged (presumably the nowait case, where no
     * reply is awaited; confirm against php-amqplib).
     *
     * @param QueueDeclaration[] $queues
     * @return string[] first element of every non-null queue_declare() result
     */
    public function declareQueues(array $queues): array
    {
        $results = [];
        foreach ($queues as $queue) {
            $result = $this->channel->queue_declare(
                $queue->name,
                $queue->passive,
                $queue->durable,
                $queue->exclusive,
                $queue->autoDelete,
                $queue->nowait,
                $queue->arguments,
                $queue->ticket,
            );

            if ($result === null) {
                // TODO decide how a declaration without a reply should be reported
                continue;
            }

            $results[] = $result[0];

            // The context array goes to the logger as its second argument.
            // Passing it through sprintf() (with no placeholder in the format)
            // silently discarded it.
            $this->logger->info('Queue is declared successfully', ['queue' => $queue]);
        }

        return $results;
    }
73
74
    /**
     * Declares every given binding, one bind call per routing key.
     *
     * Bindings flagged with destinationIsExchange are bound with
     * exchange_bind(); all others are bound with queue_bind().
     *
     * NOTE(review): static analysis reported $binding->exchange as being typed
     * ExchangeDeclaration while both bind calls expect a string. Other code in
     * this class uses the same property as a string key, so the property's
     * declared type should be confirmed against BindingDeclaration.
     *
     * @param BindingDeclaration[] $bindings
     */
    public function declareBindings(array $bindings)
    {
        foreach ($bindings as $binding) {
            foreach ($binding->routingKeys as $routingKey) {
                if ($binding->destinationIsExchange) {
                    $this->channel->exchange_bind(
                        $binding->destination,
                        $binding->exchange,
                        $routingKey,
                        $binding->nowait,
                        $binding->arguments
                    );
                } else {
                    $this->channel->queue_bind(
                        $binding->destination,
                        $binding->exchange,
                        $routingKey,
                        $binding->nowait,
                        $binding->arguments
                    );
                }
            }

            // Logged once per binding, including bindings without routing keys.
            // The context array goes to the logger as its second argument;
            // passing it through sprintf() silently discarded it.
            $this->logger->info('Binding is declared successfully', ['binding' => $binding]);
        }
    }
105
106
    /**
     * Declares an exchange, the queues its bindings deliver to, and the bindings.
     *
     * The previous queue filter had no `return` in its callback, so it always
     * produced an empty list, and it filtered binding objects rather than queue
     * declarations. Queue declarations are now looked up in the registry by the
     * destination name of every binding whose destination is not an exchange.
     *
     * NOTE(review): assumes getBindingsByExchange() returns the bindings whose
     * source is $exchange, and that $declarationsRegistry->queues is a list of
     * QueueDeclaration objects (as it is used in declareForQueueDeclaration()).
     * Confirm against DeclarationsRegistry.
     *
     * @param ExchangeDeclaration  $exchange             exchange to declare
     * @param DeclarationsRegistry $declarationsRegistry source of bindings and queue declarations
     */
    public function declareForExchange(ExchangeDeclaration $exchange, DeclarationsRegistry $declarationsRegistry)
    {
        $bindings = $declarationsRegistry->getBindingsByExchange($exchange);

        // Collect the names of the queues targeted by the bindings; used as a
        // set so each queue is matched once regardless of binding count.
        $queueNames = [];
        foreach ($bindings as $binding) {
            if (!$binding->destinationIsExchange) {
                $queueNames[$binding->destination] = true;
            }
        }

        $queues = array_filter($declarationsRegistry->queues, function ($queue) use ($queueNames) {
            return isset($queueNames[$queue->name]);
        });

        $this->declareExchanges([$exchange]);
        $this->declareQueues($queues);
        $this->declareBindings($bindings);
    }
117
118
    /**
     * Declares the registered queue(s) with the given name together with the
     * exchanges they are bound to and the corresponding bindings.
     *
     * Only bindings whose destination is a queue (not an exchange) and whose
     * destination equals the queue name are considered. The source exchange of
     * each such binding is looked up in $declarationsRegistry->exchanges by the
     * value of $binding->exchange.
     *
     * @param string               $queueName            name of the queue to declare
     * @param DeclarationsRegistry $declarationsRegistry source of queue, binding and exchange declarations
     */
    public function declareForQueueDeclaration(string $queueName, DeclarationsRegistry $declarationsRegistry)
    {
        $consumerQueues = array_filter($declarationsRegistry->queues, function ($queue) use ($queueName) {
            return $queue->name === $queueName;
        });
        // TODO not found! exception?

        /** @var BindingDeclaration[] $bindings */
        $bindings = [];
        $exchanges = [];
        foreach ($consumerQueues as $queue) {
            foreach ($declarationsRegistry->bindings as $binding) {
                if ($binding->destinationIsExchange || $binding->destination !== $queue->name) {
                    continue;
                }

                // Exchange-to-exchange bindings are skipped above, so only the
                // source exchange needs collecting; the former branch that also
                // collected the destination could never run.
                $bindings[] = $binding;
                $exchanges[] = $binding->exchange;
            }
        }

        // NOTE(review): an exchange name missing from the registry is not handled here.
        $exchanges = array_map(fn ($exchange) => $declarationsRegistry->exchanges[$exchange], array_unique($exchanges));
        $this->declareExchanges($exchanges);
        $this->declareQueues($consumerQueues);
        $this->declareBindings($bindings);
    }
146
    
147
    /**
     * Declares a single queue plus the exchanges and bindings read from it.
     *
     * The exchange of every entry in $queue->bindings is declared first, then
     * the queue itself, then the bindings.
     *
     * NOTE(review): static analysis reported that the `bindings` property does
     * not seem to exist on QueueDeclaration; confirm before relying on this
     * method.
     *
     * @param QueueDeclaration $queue queue to declare
     */
    public function declareForQueue(QueueDeclaration $queue)
    {
        $queueBindings = $queue->bindings;
        $exchanges = array_map(fn ($binding) => $binding->exchange, $queueBindings);

        $this->declareExchanges($exchanges);
        $this->declareQueues([$queue]);
        $this->declareBindings($queueBindings);
    }
157
    
158
    /**
     * Purges the given queue through the channel's queue_purge().
     *
     * @param QueueDeclaration $queue  queue whose name is passed to queue_purge()
     * @param bool             $nowait forwarded unchanged (untyped in the signature); defaults to true
     * @param int|null         $ticket forwarded unchanged
     */
    public function purgeQueue(QueueDeclaration $queue, $nowait = true, ?int $ticket = null)
    {
        $queueName = $queue->name;

        $this->channel->queue_purge($queueName, $nowait, $ticket);
    }
162
    
163
    /**
     * Deletes the given queue through the channel's queue_delete().
     *
     * The parameter names $ifUnsed and $ifEmpry are kept as spelled because
     * they are part of the method's signature.
     *
     * @param QueueDeclaration $queue    queue whose name is passed to queue_delete()
     * @param bool             $ifUnsed  forwarded as the second queue_delete() argument
     * @param bool             $ifEmpry  forwarded as the third queue_delete() argument
     * @param bool             $nowait   forwarded unchanged
     * @param int|null         $ticket   forwarded unchanged
     */
    public function deleteQueue(
        QueueDeclaration $queue,
        bool $ifUnsed = true,
        bool $ifEmpry = false,
        bool $nowait = false,
        ?int $ticket = null
    ) {
        $queueName = $queue->name;

        $this->channel->queue_delete($queueName, $ifUnsed, $ifEmpry, $nowait, $ticket);
    }
172
}