Passed
Push — master ( 40871a...dd5f4c )
by Mr
07:11
created

RabbitMq3Migration::createMigrationList()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
eloc 1
nc 1
nop 1
dl 0
loc 3
ccs 0
cts 3
cp 0
crap 2
rs 10
c 0
b 0
f 0
1
<?php declare(strict_types=1);
2
/**
3
 * This file is part of the daikon-cqrs/rabbitmq3-adapter project.
4
 *
5
 * For the full copyright and license information, please view the LICENSE
6
 * file that was distributed with this source code.
7
 */
8
9
namespace Daikon\RabbitMq3\Migration;
10
11
use Daikon\Dbal\Migration\Migration;
12
13
abstract class RabbitMq3Migration extends Migration
14
{
15
    protected function createMigrationList(string $exchange): void
16
    {
17
        $this->declareExchange($exchange, 'topic', false, true, false, true);
18
    }
19
20
    protected function createMessagePipeline(string $exchange, int $repubInterval = 30000): void
21
    {
22
        $waitExchange = $exchange.'.waiting';
23
        $waitQueue = $waitExchange;
24
        $unroutedExchange = $exchange.'.unrouted';
25
        $unroutedQueue = $unroutedExchange;
26
        $repubExchange = $exchange.'.repub';
27
        $repubQueue = $repubExchange;
28
29
        // Setup the default exchange and queue pipelines
30
        $this->declareExchange($unroutedExchange, 'fanout', false, true, false, true); //internal
31
        $this->declareExchange($repubExchange, 'fanout', false, true, false, true); //internal
32
        $this->declareExchange($waitExchange, 'fanout', false, true, false);
33
        $this->declareExchange($exchange, 'topic', false, true, false, false, false, [
34
            'alternate-exchange' => $unroutedExchange
35
        ]);
36
        $this->declareQueue($waitQueue, false, true, false, false, false, [
37
            'x-dead-letter-exchange' => $exchange
38
        ]);
39
        $this->bindQueue($waitQueue, $waitExchange);
40
        $this->declareQueue($unroutedQueue, false, true, false, false, false, [
41
            'x-dead-letter-exchange' => $repubExchange,
42
            'x-message-ttl' => $repubInterval
43
        ]);
44
        $this->bindQueue($unroutedQueue, $unroutedExchange);
45
        $this->declareQueue($repubQueue, false, true, false, false);
46
        $this->bindQueue($repubQueue, $repubExchange);
47
48
        $this->createShovel($repubExchange, $exchange, $repubQueue);
49
    }
50
51
    protected function deleteMessagePipeline(string $exchange): void
52
    {
53
        $waitExchange = $exchange.'.waiting';
54
        $waitQueue = $waitExchange;
55
        $unroutedExchange = $exchange.'.unrouted';
56
        $unroutedQueue = $unroutedExchange;
57
        $repubExchange = $exchange.'.repub';
58
        $repubQueue = $repubExchange;
59
60
        $this->deleteShovel($repubExchange);
61
        $this->deleteExchange($waitExchange);
62
        $this->deleteExchange($unroutedExchange);
63
        $this->deleteExchange($repubExchange);
64
        $this->deleteExchange($exchange);
65
        $this->deleteQueue($waitQueue);
66
        $this->deleteQueue($unroutedQueue);
67
        $this->deleteQueue($repubQueue);
68
    }
69
70
    protected function declareExchange(
71
        string $exchange,
72
        string $type,
73
        bool $passive = false,
74
        bool $durable = false,
75
        bool $autoDelete = true,
76
        bool $internal = false,
77
        bool $noWait = false,
78
        array $arguments = []
79
    ): void {
80
        $uri = sprintf('/api/exchanges/%s/%s', $this->getVhost(), $exchange);
81
        $this->connector->getConnection()->put($uri, [
82
            'body' => json_encode([
83
                'type' => $type,
84
                'passive' => $passive,
85
                'durable' => $durable,
86
                'auto_delete' => $autoDelete,
87
                'internal' => $internal,
88
                'nowait' => $noWait,
89
                'arguments' => $arguments
90
            ])
91
        ]);
92
    }
93
94
    protected function bindExchange(
95
        string $source,
96
        string $dest,
97
        string $routingKey = '',
98
        bool $noWait = false,
99
        array $arguments = []
100
    ): void {
101
        $uri = sprintf('/api/bindings/%s/e/%s/e/%s', $this->getVhost(), $source, $dest);
102
        $this->connector->getConnection()->post($uri, [
103
            'body' => json_encode([
104
                'routing_key' => $routingKey,
105
                'nowait' => $noWait,
106
                'arguments' => $arguments
107
            ])
108
        ]);
109
    }
110
111
    protected function deleteExchange(string $exchange): void
112
    {
113
        $uri = sprintf('/api/exchanges/%s/%s', $this->getVhost(), $exchange);
114
        $this->connector->getConnection()->delete($uri);
115
    }
116
117
    protected function declareQueue(
118
        string $queue,
119
        bool $passive = false,
120
        bool $durable = false,
121
        bool $exclusive = false,
122
        bool $autoDelete = true,
123
        bool $noWait = false,
124
        array $arguments = []
125
    ): void {
126
        $uri = sprintf('/api/queues/%s/%s', $this->getVhost(), $queue);
127
        $this->connector->getConnection()->put($uri, [
128
            'body' => json_encode([
129
                'passive' => $passive,
130
                'durable' => $durable,
131
                'exclusive' => $exclusive,
132
                'auto_delete' => $autoDelete,
133
                'nowait' => $noWait,
134
                'arguments' => $arguments
135
            ])
136
        ]);
137
    }
138
139
    protected function bindQueue(
140
        string $queue,
141
        string $exchange,
142
        string $routingKey = '',
143
        bool $noWait = false,
144
        array $arguments = []
145
    ): void {
146
        $uri = sprintf('/api/bindings/%s/e/%s/q/%s', $this->getVhost(), $exchange, $queue);
147
        $this->connector->getConnection()->post($uri, [
148
            'body' => json_encode([
149
                'routing_key' => $routingKey,
150
                'nowait' => $noWait,
151
                'arguments' => $arguments
152
            ])
153
        ]);
154
    }
155
156
    protected function deleteQueue(string $queue): void
157
    {
158
        $uri = sprintf('/api/queues/%s/%s', $this->getVhost(), $queue);
159
        $this->connector->getConnection()->delete($uri);
160
    }
161
162
    protected function createShovel(string $source, string $dest, string $queue): void
163
    {
164
        $uri = sprintf('/api/parameters/shovel/%s/%s.shovel', $this->getVhost(), $source);
165
        $this->connector->getConnection()->put($uri, [
166
            'body' => json_encode([
167
                'value' => [
168
                    'src-uri' => 'amqp://',
169
                    'src-queue' => $queue,
170
                    'dest-uri' => 'amqp://',
171
                    'dest-exchange' => $dest,
172
                    'add-forward-headers' => false,
173
                    'ack-mode' => 'on-confirm',
174
                    'delete-after' => 'never'
175
                ]
176
            ])
177
        ]);
178
    }
179
180
    protected function deleteShovel(string $exchange): void
181
    {
182
        $uri = sprintf('/api/parameters/shovel/%s/%s.shovel', $this->getVhost(), $exchange);
183
        $this->connector->getConnection()->delete($uri);
184
    }
185
186
    protected function getVhost(): string
187
    {
188
        $connectorSettings = $this->connector->getSettings();
189
        return $connectorSettings['vhost'];
190
    }
191
}
192