ContainerBuilder   A
last analyzed

Complexity

Total Complexity 17

Size/Duplication

Total Lines 148
Duplicated Lines 0 %

Test Coverage

Coverage 100%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 76
c 1
b 0
f 0
dl 0
loc 148
ccs 62
cts 62
cp 1
rs 10
wmc 17

4 Methods

Rating   Name   Duplication   Size   Complexity  
A createExchanges() 0 26 4
A createConnections() 0 10 2
B createContainer() 0 56 7
A createQueues() 0 26 4
1
<?php
2
namespace NeedleProject\LaravelRabbitMq\Builder;
3
4
use Illuminate\Support\Collection;
5
use NeedleProject\LaravelRabbitMq\AMQPConnection;
6
use NeedleProject\LaravelRabbitMq\Container;
7
use NeedleProject\LaravelRabbitMq\Entity\ExchangeEntity;
8
use NeedleProject\LaravelRabbitMq\Entity\QueueEntity;
9
10
/**
11
 * Class ContainerBuilder
12
 *
13
 * @package NeedleProject\LaravelRabbitMq\Builder
14
 * @author  Adrian Tilita <[email protected]>
15
 * @todo    Add config validator
16
 */
17
class ContainerBuilder
18
{
19
    /**
20
     * Create RabbitMQ Container
21
     *
22
     * @param array $config
23
     * @return Container
24
     */
25 7
    public function createContainer(array $config)
26
    {
27 7
        $connections = $this->createConnections($config['connections']);
28 7
        $exchanges = $this->createExchanges($config['exchanges'], $connections);
29 6
        $queues = $this->createQueues($config['queues'], $connections);
30
31 5
        $container = new Container();
32
        // create publishers
33 5
        foreach ($config['publishers'] as $publisherAliasName => $publisherEntityBind) {
34 3
            if ($exchanges->has($publisherEntityBind)) {
35 1
                $entity = $exchanges->get($publisherEntityBind);
36 2
            } elseif ($queues->has($publisherEntityBind)) {
37 1
                $entity = $queues->get($publisherEntityBind);
38
            } else {
39 1
                throw new \RuntimeException(
40
                    sprintf(
41 1
                        "Cannot create publisher %s: no exchange or queue named %s defined!",
42 1
                        (string)$publisherAliasName,
43 1
                        (string)$publisherEntityBind
44
                    )
45
                );
46
            }
47
48 2
            $container->addPublisher(
49
                $publisherAliasName,
50
                $entity
51
            );
52
        }
53
54 4
        foreach ($config['consumers'] as $consumerAliasName => $consumerDetails) {
55 2
            $prefetchCount    = $consumerDetails['prefetch_count'];
56 2
            $globalPrefetch   = array_key_exists('global_prefetch', $consumerDetails)
57
                ? $consumerDetails['global_prefetch']
58 2
                : true;
59
            $messageProcessor = $consumerDetails['message_processor'];
60 1
61
            if ($queues->has($consumerDetails['queue'])) {
62 1
                /** @var QueueEntity $entity */
63
                $entity = $queues->get($consumerDetails['queue']);
64 1
            } else {
65 1
                throw new \RuntimeException(
66 1
                    sprintf(
67
                        "Cannot create consumer %s: no queue named %s defined!",
68
                        (string)$consumerAliasName,
69
                        (string)$consumerDetails['queue']
70
                    )
71 1
                );
72 1
            }
73 1
74
            $entity->setPrefetchCount($prefetchCount);
75
            $entity->setGlobalPrefetch($globalPrefetch);
76 3
            $entity->setMessageProcessor($messageProcessor);
77
            $container->addConsumer($consumerAliasName, $entity);
78
        }
79
80
        return $container;
81
    }
82
83
    /**
84
     * Create connections
85 7
     *
86
     * @param array $connectionConfig
87 7
     * @return Collection
88 7
     */
89 7
    private function createConnections(array $connectionConfig): Collection
90
    {
91 7
        $connections = new Collection();
92
        foreach ($connectionConfig as $connectionAliasName => $connectionCredentials) {
93
            $connections->put(
94 7
                $connectionAliasName,
95
                AMQPConnection::createConnection($connectionAliasName, $connectionCredentials)
96
            );
97
        }
98
        return $connections;
99
    }
100
101
    /**
102 7
     * @param array $exchangeConfigList
103
     * @param Collection $connections
104 7
     * @return Collection
105 7
     */
106
    private function createExchanges(array $exchangeConfigList, Collection $connections): Collection
107 3
    {
108 3
        $exchanges = new Collection();
109 1
        foreach ($exchangeConfigList as $exchangeAliasName => $exchangeDetails) {
110
            // verify if the connection exists
111 1
            if (array_key_exists('connection', $exchangeDetails) &&
112 1
                false === $connections->has($exchangeDetails['connection'])) {
113 1
                throw new \RuntimeException(
114
                    sprintf(
115
                        "Could not create exchange %s: connection name %s is not defined!",
116
                        (string)$exchangeAliasName,
117
                        (string)$exchangeDetails['connection']
118 2
                    )
119
                );
120 2
            }
121 2
122
            $exchanges->put(
123 2
                $exchangeAliasName,
124
                ExchangeEntity::createExchange(
125
                    $connections->get($exchangeDetails['connection']),
126
                    $exchangeAliasName,
127 6
                    array_merge($exchangeDetails['attributes'], ['name' => $exchangeDetails['name']])
128
                )
129
            );
130
        }
131
        return $exchanges;
132
    }
133
134
    /**
135 6
     * @param array $queueConfigList
136
     * @param Collection $connections
137 6
     * @return Collection
138 6
     */
139
    private function createQueues(array $queueConfigList, Collection $connections): Collection
140 4
    {
141 4
        $queue = new Collection();
142 1
        foreach ($queueConfigList as $queueAliasName => $queueDetails) {
143
            // verify if the connection exists
144 1
            if (array_key_exists('connection', $queueDetails) &&
145 1
                false === $connections->has($queueDetails['connection'])) {
146 1
                throw new \RuntimeException(
147
                    sprintf(
148
                        "Could not create exchange %s: connection name %s is not defined!",
149
                        (string)$queueAliasName,
150
                        (string)$queueDetails['connection']
151 3
                    )
152
                );
153 3
            }
154 3
155
            $queue->put(
156 3
                $queueAliasName,
157
                QueueEntity::createQueue(
158
                    $connections->get($queueDetails['connection']),
159
                    $queueAliasName,
160 5
                    array_merge($queueDetails['attributes'], ['name' => $queueDetails['name']])
161
                )
162
            );
163
        }
164
        return $queue;
165
    }
166
}
167