ContainerBuilder   A
last analyzed

Complexity

Total Complexity 16

Size/Duplication

Total Lines 144
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 72
dl 0
loc 144
rs 10
c 0
b 0
f 0
wmc 16

4 Methods

Rating   Name   Duplication   Size   Complexity  
A createConnections() 0 10 2
A createExchanges() 0 26 4
B createContainer() 0 52 6
A createQueues() 0 26 4
1
<?php
2
/**
3
 * Author: Joker
4
 * Date: 2020-05-08 13:57
5
 */
6
7
namespace JokerProject\LaravelAliyunAmqp\Builder;
8
9
use Illuminate\Support\Collection;
10
use JokerProject\LaravelAliyunAmqp\AMQPConnection;
11
use JokerProject\LaravelAliyunAmqp\Container;
12
use JokerProject\LaravelAliyunAmqp\Entity\ExchangeEntity;
13
use JokerProject\LaravelAliyunAmqp\Entity\QueueEntity;
14
15
/**
16
 * Class ContainerBuilder
17
 *
18
 * @package JokerProject\LaravelAliyunAmqp\Builder
19
 * @todo    Add config validator
20
 */
21
class ContainerBuilder
{
    /**
     * Build a Container from the package configuration.
     *
     * Connections are created first, then the exchanges and queues bound to
     * them. Afterwards every configured publisher and consumer alias is
     * registered on a fresh Container.
     *
     * @param array $config Reads the keys "connections", "exchanges", "queues",
     *                      "publishers" and "consumers".
     * @return Container
     * @throws \RuntimeException When a publisher or consumer points at an entity
     *                           that is not defined, or an exchange/queue points
     *                           at a connection that is not defined.
     */
    public function createContainer(array $config)
    {
        $connections = $this->createConnections($config['connections']);
        $exchanges   = $this->createExchanges($config['exchanges'], $connections);
        $queues      = $this->createQueues($config['queues'], $connections);

        $container = new Container();

        // Publishers map an alias to the alias of an exchange or a queue.
        // Exchanges are looked up first, so they win when both share an alias.
        foreach ($config['publishers'] as $publisherAliasName => $publisherEntityBind) {
            if ($exchanges->has($publisherEntityBind)) {
                $entity = $exchanges->get($publisherEntityBind);
            } elseif ($queues->has($publisherEntityBind)) {
                $entity = $queues->get($publisherEntityBind);
            } else {
                throw new \RuntimeException(
                    sprintf(
                        "Cannot create publisher %s: no exchange or queue named %s defined!",
                        (string)$publisherAliasName,
                        (string)$publisherEntityBind
                    )
                );
            }

            $container->addPublisher($publisherAliasName, $entity);
        }

        // Consumers can only be bound to queues.
        foreach ($config['consumers'] as $consumerAliasName => $consumerDetails) {
            $prefetchCount    = $consumerDetails['prefetch_count'];
            $messageProcessor = $consumerDetails['message_processor'];
            $queueAliasName   = $consumerDetails['queue'];

            if (false === $queues->has($queueAliasName)) {
                throw new \RuntimeException(
                    sprintf(
                        "Cannot create consumer %s: no queue named %s defined!",
                        (string)$consumerAliasName,
                        (string)$queueAliasName
                    )
                );
            }

            /** @var QueueEntity $entity */
            $entity = $queues->get($queueAliasName);
            // NOTE(review): the entity stored in $queues is configured in place, so two
            // consumers bound to the same queue alias presumably share these settings - confirm.
            $entity->setPrefetchCount($prefetchCount);
            $entity->setMessageProcessor($messageProcessor);
            $container->addConsumer($consumerAliasName, $entity);
        }

        return $container;
    }

    /**
     * Create one connection per configured alias.
     *
     * @param array $connectionConfig Connection credentials keyed by connection alias.
     * @return Collection Connections keyed by connection alias.
     */
    private function createConnections(array $connectionConfig): Collection
    {
        $connections = new Collection();
        foreach ($connectionConfig as $connectionAliasName => $connectionCredentials) {
            $connections->put(
                $connectionAliasName,
                AMQPConnection::createConnection($connectionAliasName, $connectionCredentials)
            );
        }
        return $connections;
    }

    /**
     * Create one exchange entity per configured alias.
     *
     * @param array      $exchangeConfigList Exchange definitions keyed by alias; each one is
     *                                       read for "connection", "name" and "attributes".
     * @param Collection $connections        Connections keyed by connection alias.
     * @return Collection Exchange entities keyed by exchange alias.
     * @throws \RuntimeException When a definition has no usable connection.
     */
    private function createExchanges(array $exchangeConfigList, Collection $connections): Collection
    {
        $exchanges = new Collection();
        foreach ($exchangeConfigList as $exchangeAliasName => $exchangeDetails) {
            $connection = $this->resolveConnection('exchange', $exchangeAliasName, $exchangeDetails, $connections);

            $exchanges->put(
                $exchangeAliasName,
                ExchangeEntity::createExchange(
                    $connection,
                    $exchangeAliasName,
                    // the top-level "name" overrides any "name" inside the attributes
                    array_merge($exchangeDetails['attributes'], ['name' => $exchangeDetails['name']])
                )
            );
        }
        return $exchanges;
    }

    /**
     * Create one queue entity per configured alias.
     *
     * @param array      $queueConfigList Queue definitions keyed by alias; each one is
     *                                    read for "connection", "name" and "attributes".
     * @param Collection $connections     Connections keyed by connection alias.
     * @return Collection Queue entities keyed by queue alias.
     * @throws \RuntimeException When a definition has no usable connection.
     */
    private function createQueues(array $queueConfigList, Collection $connections): Collection
    {
        $queues = new Collection();
        foreach ($queueConfigList as $queueAliasName => $queueDetails) {
            $connection = $this->resolveConnection('queue', $queueAliasName, $queueDetails, $connections);

            $queues->put(
                $queueAliasName,
                QueueEntity::createQueue(
                    $connection,
                    $queueAliasName,
                    // the top-level "name" overrides any "name" inside the attributes
                    array_merge($queueDetails['attributes'], ['name' => $queueDetails['name']])
                )
            );
        }
        return $queues;
    }

    /**
     * Look up the connection that an exchange or queue definition refers to.
     *
     * A definition without a "connection" key is rejected here instead of being
     * passed on with a null connection.
     *
     * @param string     $entityType  "exchange" or "queue"; only used in the error message.
     * @param int|string $aliasName   Alias of the entity being created.
     * @param array      $details     The entity definition.
     * @param Collection $connections Connections keyed by connection alias.
     * @return mixed The item stored in $connections under the configured alias.
     * @throws \RuntimeException When the connection is missing or not defined.
     */
    private function resolveConnection(string $entityType, $aliasName, array $details, Collection $connections)
    {
        if (false === array_key_exists('connection', $details)) {
            throw new \RuntimeException(
                sprintf(
                    "Could not create %s %s: no connection name is configured!",
                    $entityType,
                    (string)$aliasName
                )
            );
        }

        $connectionName = $details['connection'];
        if (false === $connections->has($connectionName)) {
            throw new \RuntimeException(
                sprintf(
                    "Could not create %s %s: connection name %s is not defined!",
                    $entityType,
                    (string)$aliasName,
                    (string)$connectionName
                )
            );
        }

        return $connections->get($connectionName);
    }
}
167