Passed
Push — master ( 40871a...dd5f4c )
by Mr
07:11
created

RabbitMq3Migration::declareExchange()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 20
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
eloc 10
nc 1
nop 8
dl 0
loc 20
ccs 0
cts 20
cp 0
crap 2
rs 9.9332
c 0
b 0
f 0

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand; their parameter lists also tend to become inconsistent as soon as you need more or different data.

There are several approaches to avoid long parameter lists:

1
<?php declare(strict_types=1);
2
/**
3
 * This file is part of the daikon-cqrs/rabbitmq3-adapter project.
4
 *
5
 * For the full copyright and license information, please view the LICENSE
6
 * file that was distributed with this source code.
7
 */
8
9
namespace Daikon\RabbitMq3\Migration;
10
11
use Daikon\Dbal\Migration\Migration;
12
13
/**
 * Base class for migrations that create or remove RabbitMQ topology
 * (exchanges, queues, bindings and shovels).
 *
 * Each helper formats a vhost-scoped URI under /api/... and sends it through
 * the connection returned by $this->connector->getConnection(). The connector
 * property is not declared in this class.
 *
 * NOTE(review): the URI shapes look like RabbitMQ's management HTTP API, so the
 * connector is presumably the admin/HTTP one — confirm. Vhost and object names
 * are inserted into the URI verbatim, so they are presumably expected to be
 * URL-safe (or already percent-encoded) — verify against the connector settings.
 */
abstract class RabbitMq3Migration extends Migration
{
    /**
     * Declares the exchange that holds the migration list.
     *
     * @param string $exchange Name of the exchange to declare.
     */
    protected function createMigrationList(string $exchange): void
    {
        // Flags, in order: passive=false, durable=true, autoDelete=false, internal=true.
        $this->declareExchange($exchange, 'topic', false, true, false, true);
    }

    /**
     * Builds the message pipeline around a topic exchange.
     *
     * Three companion fanout exchanges are declared, each with a bound queue of
     * the same name:
     *  - "<exchange>.waiting":  its queue names $exchange as dead-letter exchange.
     *  - "<exchange>.unrouted": set as the main exchange's alternate-exchange; its
     *    queue names the repub exchange as dead-letter exchange and uses
     *    $repubInterval as x-message-ttl.
     *  - "<exchange>.repub":    its queue is the source of a shovel whose
     *    destination is the main exchange.
     *
     * @param string $exchange      Name of the main topic exchange.
     * @param int    $repubInterval Value sent as x-message-ttl for the unrouted
     *                              queue (RabbitMQ documents this argument in
     *                              milliseconds).
     */
    protected function createMessagePipeline(string $exchange, int $repubInterval = 30000): void
    {
        // Every companion queue shares the name of its companion exchange.
        [$waiting, $unrouted, $repub] = $this->pipelineNames($exchange);

        // Setup the default exchange and queue pipelines
        $this->declareExchange($unrouted, 'fanout', false, true, false, true); // internal
        $this->declareExchange($repub, 'fanout', false, true, false, true); // internal
        $this->declareExchange($waiting, 'fanout', false, true, false);
        $this->declareExchange($exchange, 'topic', false, true, false, false, false, [
            'alternate-exchange' => $unrouted
        ]);

        // Waiting queue, bound to the waiting exchange.
        $this->declareQueue($waiting, false, true, false, false, false, [
            'x-dead-letter-exchange' => $exchange
        ]);
        $this->bindQueue($waiting, $waiting);

        // Unrouted queue, bound to the unrouted exchange.
        $this->declareQueue($unrouted, false, true, false, false, false, [
            'x-dead-letter-exchange' => $repub,
            'x-message-ttl' => $repubInterval
        ]);
        $this->bindQueue($unrouted, $unrouted);

        // Repub queue, bound to the repub exchange and shoveled to the main exchange.
        $this->declareQueue($repub, false, true, false, false);
        $this->bindQueue($repub, $repub);

        $this->createShovel($repub, $exchange, $repub);
    }

    /**
     * Removes everything createMessagePipeline() declares for $exchange.
     *
     * The shovel is deleted first, then the four exchanges, then the three queues.
     *
     * @param string $exchange Name of the main topic exchange.
     */
    protected function deleteMessagePipeline(string $exchange): void
    {
        [$waiting, $unrouted, $repub] = $this->pipelineNames($exchange);

        $this->deleteShovel($repub);

        foreach ([$waiting, $unrouted, $repub, $exchange] as $exchangeName) {
            $this->deleteExchange($exchangeName);
        }

        // Queues carry the same names as their companion exchanges.
        foreach ([$waiting, $unrouted, $repub] as $queueName) {
            $this->deleteQueue($queueName);
        }
    }

    /**
     * Sends a PUT for /api/exchanges/<vhost>/<exchange> with the given options
     * JSON-encoded as the request body.
     *
     * @param string $exchange   Exchange name, used as the last URI segment.
     * @param string $type       Sent as "type" (callers in this class use 'topic' and 'fanout').
     * @param bool   $passive    Sent as "passive".
     * @param bool   $durable    Sent as "durable".
     * @param bool   $autoDelete Sent as "auto_delete".
     * @param bool   $internal   Sent as "internal".
     * @param bool   $noWait     Sent as "nowait".
     * @param array  $arguments  Sent as "arguments".
     */
    protected function declareExchange(
        string $exchange,
        string $type,
        bool $passive = false,
        bool $durable = false,
        bool $autoDelete = true,
        bool $internal = false,
        bool $noWait = false,
        array $arguments = []
    ): void {
        $uri = $this->vhostUri('/api/exchanges/%s/%s', $exchange);
        $options = $this->jsonBody([
            'type' => $type,
            'passive' => $passive,
            'durable' => $durable,
            'auto_delete' => $autoDelete,
            'internal' => $internal,
            'nowait' => $noWait,
            'arguments' => $arguments
        ]);

        $this->connector->getConnection()->put($uri, $options);
    }

    /**
     * Sends a POST for /api/bindings/<vhost>/e/<source>/e/<dest>, i.e. an
     * exchange-to-exchange binding request.
     *
     * @param string $source     Source exchange name.
     * @param string $dest       Destination exchange name.
     * @param string $routingKey Sent as "routing_key".
     * @param bool   $noWait     Sent as "nowait".
     * @param array  $arguments  Sent as "arguments".
     */
    protected function bindExchange(
        string $source,
        string $dest,
        string $routingKey = '',
        bool $noWait = false,
        array $arguments = []
    ): void {
        $uri = $this->vhostUri('/api/bindings/%s/e/%s/e/%s', $source, $dest);
        $options = $this->jsonBody([
            'routing_key' => $routingKey,
            'nowait' => $noWait,
            'arguments' => $arguments
        ]);

        $this->connector->getConnection()->post($uri, $options);
    }

    /**
     * Sends a DELETE for /api/exchanges/<vhost>/<exchange>.
     *
     * @param string $exchange Name of the exchange to delete.
     */
    protected function deleteExchange(string $exchange): void
    {
        $this->connector->getConnection()->delete(
            $this->vhostUri('/api/exchanges/%s/%s', $exchange)
        );
    }

    /**
     * Sends a PUT for /api/queues/<vhost>/<queue> with the given options
     * JSON-encoded as the request body.
     *
     * @param string $queue      Queue name, used as the last URI segment.
     * @param bool   $passive    Sent as "passive".
     * @param bool   $durable    Sent as "durable".
     * @param bool   $exclusive  Sent as "exclusive".
     * @param bool   $autoDelete Sent as "auto_delete".
     * @param bool   $noWait     Sent as "nowait".
     * @param array  $arguments  Sent as "arguments".
     */
    protected function declareQueue(
        string $queue,
        bool $passive = false,
        bool $durable = false,
        bool $exclusive = false,
        bool $autoDelete = true,
        bool $noWait = false,
        array $arguments = []
    ): void {
        $uri = $this->vhostUri('/api/queues/%s/%s', $queue);
        $options = $this->jsonBody([
            'passive' => $passive,
            'durable' => $durable,
            'exclusive' => $exclusive,
            'auto_delete' => $autoDelete,
            'nowait' => $noWait,
            'arguments' => $arguments
        ]);

        $this->connector->getConnection()->put($uri, $options);
    }

    /**
     * Sends a POST for /api/bindings/<vhost>/e/<exchange>/q/<queue>, i.e. an
     * exchange-to-queue binding request.
     *
     * Note the argument order: the queue comes first in the signature but last
     * in the URI.
     *
     * @param string $queue      Queue to bind.
     * @param string $exchange   Exchange the queue is bound to.
     * @param string $routingKey Sent as "routing_key".
     * @param bool   $noWait     Sent as "nowait".
     * @param array  $arguments  Sent as "arguments".
     */
    protected function bindQueue(
        string $queue,
        string $exchange,
        string $routingKey = '',
        bool $noWait = false,
        array $arguments = []
    ): void {
        $uri = $this->vhostUri('/api/bindings/%s/e/%s/q/%s', $exchange, $queue);
        $options = $this->jsonBody([
            'routing_key' => $routingKey,
            'nowait' => $noWait,
            'arguments' => $arguments
        ]);

        $this->connector->getConnection()->post($uri, $options);
    }

    /**
     * Sends a DELETE for /api/queues/<vhost>/<queue>.
     *
     * @param string $queue Name of the queue to delete.
     */
    protected function deleteQueue(string $queue): void
    {
        $this->connector->getConnection()->delete(
            $this->vhostUri('/api/queues/%s/%s', $queue)
        );
    }

    /**
     * Sends a PUT for /api/parameters/shovel/<vhost>/<source>.shovel describing
     * a shovel from $queue to the $dest exchange.
     *
     * @param string $source Used only to name the shovel ("<source>.shovel").
     * @param string $dest   Sent as "dest-exchange".
     * @param string $queue  Sent as "src-queue".
     */
    protected function createShovel(string $source, string $dest, string $queue): void
    {
        $uri = $this->vhostUri('/api/parameters/shovel/%s/%s.shovel', $source);
        $definition = [
            'src-uri' => 'amqp://',
            'src-queue' => $queue,
            'dest-uri' => 'amqp://',
            'dest-exchange' => $dest,
            'add-forward-headers' => false,
            'ack-mode' => 'on-confirm',
            'delete-after' => 'never'
        ];

        $this->connector->getConnection()->put($uri, $this->jsonBody(['value' => $definition]));
    }

    /**
     * Sends a DELETE for /api/parameters/shovel/<vhost>/<exchange>.shovel.
     *
     * @param string $exchange Name the shovel was created under (the $source
     *                         passed to createShovel()).
     */
    protected function deleteShovel(string $exchange): void
    {
        $this->connector->getConnection()->delete(
            $this->vhostUri('/api/parameters/shovel/%s/%s.shovel', $exchange)
        );
    }

    /**
     * Returns the 'vhost' entry of the connector settings.
     */
    protected function getVhost(): string
    {
        return $this->connector->getSettings()['vhost'];
    }

    /**
     * Derives the companion names used by the message pipeline.
     *
     * @return string[] The waiting, unrouted and repub names, in that order.
     */
    private function pipelineNames(string $exchange): array
    {
        return [
            $exchange.'.waiting',
            $exchange.'.unrouted',
            $exchange.'.repub',
        ];
    }

    /**
     * Fills a URI format whose first placeholder is the vhost and whose
     * remaining placeholders are the given segments.
     */
    private function vhostUri(string $format, string ...$segments): string
    {
        return sprintf($format, $this->getVhost(), ...$segments);
    }

    /**
     * Wraps a payload as request options with a JSON-encoded 'body'.
     */
    private function jsonBody(array $payload): array
    {
        return ['body' => json_encode($payload)];
    }
}
192