Completed
Push — master ( 9186fa...c4ad81 )
by Tilita
16s queued 14s
created

ContainerBuilder::createQueues()   A

Complexity

Conditions 4
Paths 3

Size

Total Lines 26
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 13
CRAP Score 4

Importance

Changes 0
Metric Value
cc 4
eloc 16
nc 3
nop 2
dl 0
loc 26
ccs 13
cts 13
cp 1
crap 4
rs 9.7333
c 0
b 0
f 0
1
<?php
2
namespace NeedleProject\LaravelRabbitMq\Builder;
3
4
use Illuminate\Support\Collection;
5
use NeedleProject\LaravelRabbitMq\AMQPConnection;
6
use NeedleProject\LaravelRabbitMq\Container;
7
use NeedleProject\LaravelRabbitMq\Entity\ExchangeEntity;
8
use NeedleProject\LaravelRabbitMq\Entity\QueueEntity;
9
10
/**
 * Class ContainerBuilder
 *
 * Turns a configuration array into a Container: it creates the configured
 * connections, exchanges and queues, then registers the configured
 * publishers and consumers on a new Container instance.
 *
 * @package NeedleProject\LaravelRabbitMq\Builder
 * @author  Adrian Tilita <[email protected]>
 * @todo    Add config validator
 */
class ContainerBuilder
{
    /**
     * Create RabbitMQ Container
     *
     * Reads the 'connections', 'exchanges', 'queues', 'publishers' and
     * 'consumers' keys of the given configuration. Each publisher is bound to
     * the exchange or queue whose alias it references (exchanges are looked up
     * first); each consumer is bound to the queue named by its 'queue' key.
     *
     * @param array $config
     * @return Container
     * @throws \RuntimeException When a publisher references an alias that is neither an exchange
     *                           nor a queue, when a consumer references an undefined queue, or
     *                           when an exchange/queue references an undefined connection.
     */
    public function createContainer(array $config)
    {
        $connections = $this->createConnections($config['connections']);
        $exchanges = $this->createExchanges($config['exchanges'], $connections);
        $queues = $this->createQueues($config['queues'], $connections);

        $container = new Container();

        // create publishers
        foreach ($config['publishers'] as $publisherAliasName => $publisherEntityBind) {
            // An exchange alias takes precedence over a queue alias with the same name.
            if ($exchanges->has($publisherEntityBind)) {
                $entity = $exchanges->get($publisherEntityBind);
            } elseif ($queues->has($publisherEntityBind)) {
                $entity = $queues->get($publisherEntityBind);
            } else {
                throw new \RuntimeException(
                    sprintf(
                        "Cannot create publisher %s: no exchange or queue named %s defined!",
                        (string)$publisherAliasName,
                        (string)$publisherEntityBind
                    )
                );
            }

            $container->addPublisher($publisherAliasName, $entity);
        }

        // create consumers
        foreach ($config['consumers'] as $consumerAliasName => $consumerDetails) {
            $prefetchCount    = $consumerDetails['prefetch_count'];
            // 'global_prefetch' is optional and defaults to true; an explicit null is kept as-is.
            $globalPrefetch   = array_key_exists('global_prefetch', $consumerDetails)
                ? $consumerDetails['global_prefetch']
                : true;
            $messageProcessor = $consumerDetails['message_processor'];

            if (false === $queues->has($consumerDetails['queue'])) {
                throw new \RuntimeException(
                    sprintf(
                        "Cannot create consumer %s: no queue named %s defined!",
                        (string)$consumerAliasName,
                        (string)$consumerDetails['queue']
                    )
                );
            }

            // The consumer settings are applied on the queue entity stored in the
            // collection, i.e. the same instance a publisher bound to this queue receives.
            /** @var QueueEntity $entity */
            $entity = $queues->get($consumerDetails['queue']);
            $entity->setPrefetchCount($prefetchCount);
            $entity->setGlobalPrefetch($globalPrefetch);
            $entity->setMessageProcessor($messageProcessor);
            $container->addConsumer($consumerAliasName, $entity);
        }

        return $container;
    }

    /**
     * Create connections
     *
     * Builds one AMQPConnection per configured entry, keyed by its alias name.
     *
     * @param array $connectionConfig Connection credentials keyed by connection alias
     * @return Collection Connections keyed by alias
     */
    private function createConnections(array $connectionConfig): Collection
    {
        $connections = new Collection();
        foreach ($connectionConfig as $connectionAliasName => $connectionCredentials) {
            $connections->put(
                $connectionAliasName,
                AMQPConnection::createConnection($connectionAliasName, $connectionCredentials)
            );
        }

        return $connections;
    }

    /**
     * Create exchanges
     *
     * Builds one ExchangeEntity per configured entry, keyed by its alias name.
     * Each entry is read for its 'connection', 'attributes' and 'name' keys;
     * 'name' is merged into the attributes handed to the entity.
     *
     * @param array $exchangeConfigList Exchange definitions keyed by exchange alias
     * @param Collection $connections   Connections keyed by alias
     * @return Collection Exchanges keyed by alias
     * @throws \RuntimeException When an exchange references a connection alias that is not defined
     */
    private function createExchanges(array $exchangeConfigList, Collection $connections): Collection
    {
        $exchanges = new Collection();
        foreach ($exchangeConfigList as $exchangeAliasName => $exchangeDetails) {
            // verify if the connection exists
            // NOTE(review): when the 'connection' key is absent this check is skipped and the
            // lookup below still reads $exchangeDetails['connection'] - confirm this is intended.
            if (array_key_exists('connection', $exchangeDetails) &&
                false === $connections->has($exchangeDetails['connection'])) {
                throw new \RuntimeException(
                    sprintf(
                        "Could not create exchange %s: connection name %s is not defined!",
                        (string)$exchangeAliasName,
                        (string)$exchangeDetails['connection']
                    )
                );
            }

            $exchanges->put(
                $exchangeAliasName,
                ExchangeEntity::createExchange(
                    $connections->get($exchangeDetails['connection']),
                    $exchangeAliasName,
                    array_merge($exchangeDetails['attributes'], ['name' => $exchangeDetails['name']])
                )
            );
        }

        return $exchanges;
    }

    /**
     * Create queues
     *
     * Builds one QueueEntity per configured entry, keyed by its alias name.
     * Each entry is read for its 'connection', 'attributes' and 'name' keys;
     * 'name' is merged into the attributes handed to the entity.
     *
     * @param array $queueConfigList  Queue definitions keyed by queue alias
     * @param Collection $connections Connections keyed by alias
     * @return Collection Queues keyed by alias
     * @throws \RuntimeException When a queue references a connection alias that is not defined
     */
    private function createQueues(array $queueConfigList, Collection $connections): Collection
    {
        $queues = new Collection();
        foreach ($queueConfigList as $queueAliasName => $queueDetails) {
            // verify if the connection exists
            // NOTE(review): when the 'connection' key is absent this check is skipped and the
            // lookup below still reads $queueDetails['connection'] - confirm this is intended.
            if (array_key_exists('connection', $queueDetails) &&
                false === $connections->has($queueDetails['connection'])) {
                // The failing entity here is a queue, so the message must say so.
                throw new \RuntimeException(
                    sprintf(
                        "Could not create queue %s: connection name %s is not defined!",
                        (string)$queueAliasName,
                        (string)$queueDetails['connection']
                    )
                );
            }

            $queues->put(
                $queueAliasName,
                QueueEntity::createQueue(
                    $connections->get($queueDetails['connection']),
                    $queueAliasName,
                    array_merge($queueDetails['attributes'], ['name' => $queueDetails['name']])
                )
            );
        }

        return $queues;
    }
}
165