Completed
Push — develop ( 53c9e6...feab1b )
by Baptiste
07:33
created

AutoDeclare::declareThrough()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 16
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 16
rs 9.4285
c 0
b 0
f 0
cc 2
eloc 9
nc 2
nop 1
1
<?php
2
declare(strict_types = 1);
3
4
namespace Innmind\AMQPBundle\Client;
5
6
use Innmind\AMQP\{
7
    Client,
8
    Client\Channel,
9
    Model\Exchange\Declaration as Exchange,
10
    Model\Exchange\Type,
11
    Model\Queue\Declaration as Queue,
12
    Model\Queue\Binding
13
};
14
use Innmind\Immutable\Set;
15
16
final class AutoDeclare implements Client
17
{
18
    private $client;
19
    private $exchanges;
20
    private $queues;
21
    private $bindings;
22
    private $declared = false;
23
24
    public function __construct(Client $client)
25
    {
26
        $this->client = $client;
27
        $this->exchanges = new Set(Exchange::class);
28
        $this->queues = new Set(Queue::class);
29
        $this->bindings = new Set(Binding::class);
30
    }
31
32
    public function declareExchange(string $name, string $type, bool $durable, array $arguments): void
33
    {
34
        $constructor = $durable ? 'durable' : 'temporary';
35
        $exchange = Exchange::$constructor($name, Type::$type());
36
37
        foreach ($arguments as $key => $value) {
38
            $exchange = $exchange->withArgument($key, $value);
39
        }
40
41
        $this->exchanges = $this->exchanges->add($exchange);
42
    }
43
44
    public function declareQueue(string $name, bool $durable, bool $exclusive, array $arguments): void
45
    {
46
        $constructor = $durable ? 'durable' : 'temporary';
47
        $queue = Queue::$constructor()->withName($name);
48
49
        if ($exclusive) {
50
            $queue = $queue->exclusive();
51
        }
52
53
        foreach ($arguments as $key => $value) {
54
            $queue = $queue->withArgument($key, $value);
55
        }
56
57
        $this->queues = $this->queues->add($queue);
58
    }
59
60
    public function declareBinding(string $exchange, string $queue, string $routingKey, array $arguments): void
61
    {
62
        $binding = new Binding($exchange, $queue, $routingKey);
63
64
        foreach ($arguments as $key => $value) {
65
            $binding = $binding->withArgument($key, $value);
66
        }
67
68
        $this->bindings = $this->bindings->add($binding);
69
    }
70
71
    public function channel(): Channel
72
    {
73
        $channel = $this->client->channel();
74
        $this->declareThrough($channel);
75
76
        return $channel;
77
    }
78
79
    public function closed(): bool
80
    {
81
        return $this->client->closed();
82
    }
83
84
    public function close(): void
85
    {
86
        $this->client->close();
87
        $this->declared = true;
88
    }
89
90
    private function declareThrough(Channel $channel): void
91
    {
92
        if ($this->declared) {
93
            return;
94
        }
95
96
        $this->exchanges->foreach(static function(Exchange $command) use ($channel): void {
97
            $channel->exchange()->declare($command);
98
        });
99
        $this->queues->foreach(static function(Queue $command) use ($channel): void {
100
            $channel->queue()->declare($command);
101
        });
102
        $this->bindings->foreach(static function(Binding $command) use ($channel): void {
103
            $channel->queue()->bind($command);
104
        });
105
    }
106
}
107