RabbitMq3MigrationAdapter   A
last analyzed

Complexity

Total Complexity 11

Size/Duplication

Total Lines 80
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 2
Bugs 0 Features 0
Metric Value
eloc 33
c 2
b 0
f 0
dl 0
loc 80
ccs 0
cts 32
cp 0
rs 10
wmc 11

7 Methods

Rating   Name   Duplication   Size   Complexity  
A getConnector() 0 3 1
A createMigrationList() 0 9 2
A write() 0 20 4
A read() 0 9 1
A getVhost() 0 4 1
A loadMigrations() 0 5 1
A __construct() 0 4 1
1
<?php declare(strict_types=1);
2
/**
3
 * This file is part of the daikon-cqrs/rabbitmq3-adapter project.
4
 *
5
 * For the full copyright and license information, please view the LICENSE
6
 * file that was distributed with this source code.
7
 */
8
9
namespace Daikon\RabbitMq3\Migration;
10
11
use Daikon\Dbal\Connector\ConnectorInterface;
12
use Daikon\Dbal\Migration\MigrationAdapterInterface;
13
use Daikon\Dbal\Migration\MigrationList;
14
use Daikon\RabbitMq3\Connector\RabbitMq3AdminConnector;
15
use DateTimeImmutable;
16
17
/**
 * Migration adapter that keeps track of executed migrations inside RabbitMQ,
 * talking to the broker through the HTTP client exposed by the admin connector.
 *
 * Every executed migration is stored as a binding of the configured exchange to
 * itself: the binding's routing key holds the migration target identifier and the
 * binding's arguments hold the migration's native representation.
 */
final class RabbitMq3MigrationAdapter implements MigrationAdapterInterface
{
    private RabbitMq3AdminConnector $connector;

    /** Adapter settings; the 'exchange' key names the exchange used to store migration state. */
    private array $settings;

    /**
     * @param RabbitMq3AdminConnector $connector Connector providing the HTTP client and the 'vhost' setting.
     * @param array $settings Adapter settings; 'exchange' is read when loading and writing migrations.
     */
    public function __construct(RabbitMq3AdminConnector $connector, array $settings = [])
    {
        $this->connector = $connector;
        $this->settings = $settings;
    }

    /**
     * Reads the migrations recorded for the given identifier.
     *
     * @param string $identifier Routing key under which the migrations were written.
     * @return MigrationList The matching migrations, passed through sortByVersion().
     */
    public function read(string $identifier): MigrationList
    {
        $recorded = array_filter(
            $this->loadMigrations(),
            static fn(array $migration): bool => $migration['routing_key'] === $identifier
        );

        return $this->createMigrationList($recorded);
    }

    /**
     * We do not have a way of storing the migration list as a data structure in RabbitMQ so instead
     * we make use of internal exchange bindings with metadata as a way of tracking the migration state
     * of the messaging infrastructure.
     *
     * Nothing is written (and nothing is deleted) when the given list is empty.
     *
     * @param string $identifier Routing key under which the migrations are stored.
     * @param MigrationList $migrationList Migrations to persist for that identifier.
     * @throws \JsonException When a migration cannot be encoded or the broker response cannot be decoded.
     */
    public function write(string $identifier, MigrationList $migrationList): void
    {
        if ($migrationList->isEmpty()) {
            return;
        }

        $exchange = $this->settings['exchange'];
        $client = $this->connector->getConnection();
        $uri = sprintf('/api/bindings/%1$s/e/%2$s/e/%2$s', $this->getVhost(), $exchange);

        // Delete the existing entries of this identifier before rewriting them. Bindings that
        // carry a different routing key belong to another identifier (see read()) and must
        // survive, otherwise writing one migration list would erase the state of all others.
        foreach ($this->loadMigrations() as $migration) {
            if ($migration['routing_key'] !== $identifier) {
                continue;
            }
            $client->delete($uri.'/'.$migration['properties_key']);
        }

        foreach ($migrationList as $migration) {
            $client->post($uri, [
                // Fail loudly instead of posting `false` as the body when encoding fails.
                'body' => json_encode([
                    'routing_key' => $identifier,
                    'arguments' => $migration->toNative()
                ], JSON_THROW_ON_ERROR)
            ]);
        }
    }

    /**
     * @return ConnectorInterface The admin connector this adapter was constructed with.
     */
    public function getConnector(): ConnectorInterface
    {
        return $this->connector;
    }

    /**
     * Fetches the bindings listed under the configured exchange's "bindings/source" resource.
     *
     * @return array Decoded JSON response body; the callers read each entry's 'routing_key',
     *               'properties_key' and 'arguments' keys.
     * @throws \JsonException When the response body is not valid JSON.
     */
    private function loadMigrations(): array
    {
        $uri = sprintf('/api/exchanges/%s/%s/bindings/source', $this->getVhost(), $this->settings['exchange']);
        $response = $this->connector->getConnection()->get($uri);

        // JSON_THROW_ON_ERROR turns a malformed body into a JsonException rather than a
        // null result that would only show up as a TypeError on the return type.
        return json_decode((string)$response->getBody(), true, 512, JSON_THROW_ON_ERROR);
    }

    /**
     * Rebuilds migration objects from stored binding data.
     *
     * @param array $migrationData Binding entries whose 'arguments' hold '@type' and 'executedAt'.
     * @return MigrationList The instantiated migrations, passed through sortByVersion().
     */
    private function createMigrationList(array $migrationData): MigrationList
    {
        $migrations = [];
        foreach ($migrationData as $migration) {
            // NOTE(review): the class name comes straight from broker data and is instantiated
            // without validation - confirm the exchange can only be written by trusted parties.
            $migrationClass = $migration['arguments']['@type'];
            $migrations[] = new $migrationClass(new DateTimeImmutable($migration['arguments']['executedAt']));
        }

        return (new MigrationList($migrations))->sortByVersion();
    }

    /**
     * @return string The 'vhost' entry of the connector settings, used verbatim in request URIs.
     */
    private function getVhost(): string
    {
        $connectorSettings = $this->connector->getSettings();
        return $connectorSettings['vhost'];
    }
}
99