RabbitMq3Migration   A
last analyzed

Complexity

Total Complexity 10

Size/Duplication

Total Lines 127
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 52
c 1
b 0
f 0
dl 0
loc 127
ccs 0
cts 29
cp 0
rs 10
wmc 10

10 Methods

Rating   Name   Duplication   Size   Complexity  
A createMigrationList() 0 3 1
A deleteExchange() 0 4 1
A createShovel() 0 13 1
A declareQueue() 0 18 1
A declareExchange() 0 20 1
A bindQueue() 0 13 1
A getVhost() 0 4 1
A bindExchange() 0 13 1
A deleteQueue() 0 4 1
A deleteShovel() 0 4 1
1
<?php declare(strict_types=1);
2
/**
3
 * This file is part of the daikon-cqrs/rabbitmq3-adapter project.
4
 *
5
 * For the full copyright and license information, please view the LICENSE
6
 * file that was distributed with this source code.
7
 */
8
9
namespace Daikon\RabbitMq3\Migration;
10
11
use Daikon\Dbal\Migration\Migration;
12
use PhpAmqpLib\Exchange\AMQPExchangeType;
13
14
abstract class RabbitMq3Migration extends Migration
15
{
16
    protected function createMigrationList(string $exchange): void
17
    {
18
        $this->declareExchange($exchange, AMQPExchangeType::TOPIC, false, true, false, true);
19
    }
20
21
    protected function declareExchange(
22
        string $exchange,
23
        string $type,
24
        bool $passive = false,
25
        bool $durable = false,
26
        bool $autoDelete = true,
27
        bool $internal = false,
28
        bool $noWait = false,
29
        array $arguments = []
30
    ): void {
31
        $uri = sprintf('/api/exchanges/%s/%s', $this->getVhost(), $exchange);
32
        $this->connector->getConnection()->put($uri, [
33
            'body' => json_encode([
34
                'type' => $type,
35
                'passive' => $passive,
36
                'durable' => $durable,
37
                'auto_delete' => $autoDelete,
38
                'internal' => $internal,
39
                'nowait' => $noWait,
40
                'arguments' => $arguments
41
            ])
42
        ]);
43
    }
44
45
    protected function bindExchange(
46
        string $source,
47
        string $dest,
48
        string $routingKey = '',
49
        bool $noWait = false,
50
        array $arguments = []
51
    ): void {
52
        $uri = sprintf('/api/bindings/%s/e/%s/e/%s', $this->getVhost(), $source, $dest);
53
        $this->connector->getConnection()->post($uri, [
54
            'body' => json_encode([
55
                'routing_key' => $routingKey,
56
                'nowait' => $noWait,
57
                'arguments' => $arguments
58
            ])
59
        ]);
60
    }
61
62
    protected function deleteExchange(string $exchange): void
63
    {
64
        $uri = sprintf('/api/exchanges/%s/%s', $this->getVhost(), $exchange);
65
        $this->connector->getConnection()->delete($uri);
66
    }
67
68
    protected function declareQueue(
69
        string $queue,
70
        bool $passive = false,
71
        bool $durable = false,
72
        bool $exclusive = false,
73
        bool $autoDelete = true,
74
        bool $noWait = false,
75
        array $arguments = []
76
    ): void {
77
        $uri = sprintf('/api/queues/%s/%s', $this->getVhost(), $queue);
78
        $this->connector->getConnection()->put($uri, [
79
            'body' => json_encode([
80
                'passive' => $passive,
81
                'durable' => $durable,
82
                'exclusive' => $exclusive,
83
                'auto_delete' => $autoDelete,
84
                'nowait' => $noWait,
85
                'arguments' => $arguments
86
            ])
87
        ]);
88
    }
89
90
    protected function bindQueue(
91
        string $queue,
92
        string $exchange,
93
        string $routingKey = '',
94
        bool $noWait = false,
95
        array $arguments = []
96
    ): void {
97
        $uri = sprintf('/api/bindings/%s/e/%s/q/%s', $this->getVhost(), $exchange, $queue);
98
        $this->connector->getConnection()->post($uri, [
99
            'body' => json_encode([
100
                'routing_key' => $routingKey,
101
                'nowait' => $noWait,
102
                'arguments' => $arguments
103
            ])
104
        ]);
105
    }
106
107
    protected function deleteQueue(string $queue): void
108
    {
109
        $uri = sprintf('/api/queues/%s/%s', $this->getVhost(), $queue);
110
        $this->connector->getConnection()->delete($uri);
111
    }
112
113
    protected function createShovel(string $source, string $dest, string $queue): void
114
    {
115
        $uri = sprintf('/api/parameters/shovel/%s/%s.shovel', $this->getVhost(), $source);
116
        $this->connector->getConnection()->put($uri, [
117
            'body' => json_encode([
118
                'value' => [
119
                    'src-uri' => 'amqp://',
120
                    'src-queue' => $queue,
121
                    'dest-uri' => 'amqp://',
122
                    'dest-exchange' => $dest,
123
                    'add-forward-headers' => false,
124
                    'ack-mode' => 'on-confirm',
125
                    'delete-after' => 'never'
126
                ]
127
            ])
128
        ]);
129
    }
130
131
    protected function deleteShovel(string $exchange): void
132
    {
133
        $uri = sprintf('/api/parameters/shovel/%s/%s.shovel', $this->getVhost(), $exchange);
134
        $this->connector->getConnection()->delete($uri);
135
    }
136
137
    protected function getVhost(): string
138
    {
139
        $connectorSettings = $this->connector->getSettings();
140
        return $connectorSettings['vhost'];
141
    }
142
}
143