ContainerBuilder::createQueues()   A
last analyzed

Complexity

Conditions 4
Paths 3

Size

Total Lines 26
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 4
eloc 16
nc 3
nop 2
dl 0
loc 26
rs 9.7333
c 0
b 0
f 0
1
<?php
2
namespace NeedleProject\LaravelRabbitMq\Builder;
3
4
use Illuminate\Support\Collection;
5
use NeedleProject\LaravelRabbitMq\AMQPConnection;
6
use NeedleProject\LaravelRabbitMq\Container;
7
use NeedleProject\LaravelRabbitMq\Entity\ExchangeEntity;
8
use NeedleProject\LaravelRabbitMq\Entity\QueueEntity;
9
10
/**
11
 * Class ContainerBuilder
12
 *
13
 * @package NeedleProject\LaravelRabbitMq\Builder
14
 * @author  Adrian Tilita <[email protected]>
15
 * @todo    Add config validator
16
 */
17
class ContainerBuilder
18
{
19
    /**
20
     * Create RabbitMQ Container
21
     *
22
     * @param array $config
23
     * @return Container
24
     */
25
    public function createContainer(array $config)
26
    {
27
        $connections = $this->createConnections($config['connections']);
28
        $exchanges = $this->createExchanges($config['exchanges'], $connections);
29
        $queues = $this->createQueues($config['queues'], $connections);
30
31
        $container = new Container();
32
        // create publishers
33
        foreach ($config['publishers'] as $publisherAliasName => $publisherEntityBind) {
34
            if ($exchanges->has($publisherEntityBind)) {
35
                $entity = $exchanges->get($publisherEntityBind);
36
            } elseif ($queues->has($publisherEntityBind)) {
37
                $entity = $queues->get($publisherEntityBind);
38
            } else {
39
                throw new \RuntimeException(
40
                    sprintf(
41
                        "Cannot create publisher %s: no exchange or queue named %s defined!",
42
                        (string)$publisherAliasName,
43
                        (string)$publisherEntityBind
44
                    )
45
                );
46
            }
47
48
            $container->addPublisher(
49
                $publisherAliasName,
50
                $entity
51
            );
52
        }
53
54
        foreach ($config['consumers'] as $consumerAliasName => $consumerDetails) {
55
            $prefetchCount    = $consumerDetails['prefetch_count'];
56
            $globalPrefetch   = array_key_exists('global_prefetch', $consumerDetails)
57
                ? $consumerDetails['global_prefetch']
58
                : true;
59
            $messageProcessor = $consumerDetails['message_processor'];
60
61
            if ($queues->has($consumerDetails['queue'])) {
62
                /** @var QueueEntity $entity */
63
                $entity = $queues->get($consumerDetails['queue']);
64
            } else {
65
                throw new \RuntimeException(
66
                    sprintf(
67
                        "Cannot create consumer %s: no queue named %s defined!",
68
                        (string)$consumerAliasName,
69
                        (string)$consumerDetails['queue']
70
                    )
71
                );
72
            }
73
74
            $entity->setPrefetchCount($prefetchCount);
75
            $entity->setGlobalPrefetch($globalPrefetch);
76
            $entity->setMessageProcessor($messageProcessor);
77
            $container->addConsumer($consumerAliasName, $entity);
78
        }
79
80
        return $container;
81
    }
82
83
    /**
84
     * Create connections
85
     *
86
     * @param array $connectionConfig
87
     * @return Collection
88
     */
89
    private function createConnections(array $connectionConfig): Collection
90
    {
91
        $connections = new Collection();
92
        foreach ($connectionConfig as $connectionAliasName => $connectionCredentials) {
93
            $connections->put(
94
                $connectionAliasName,
95
                AMQPConnection::createConnection($connectionAliasName, $connectionCredentials)
96
            );
97
        }
98
        return $connections;
99
    }
100
101
    /**
102
     * @param array $exchangeConfigList
103
     * @param Collection $connections
104
     * @return Collection
105
     */
106
    private function createExchanges(array $exchangeConfigList, Collection $connections): Collection
107
    {
108
        $exchanges = new Collection();
109
        foreach ($exchangeConfigList as $exchangeAliasName => $exchangeDetails) {
110
            // verify if the connection exists
111
            if (array_key_exists('connection', $exchangeDetails) &&
112
                false === $connections->has($exchangeDetails['connection'])) {
113
                throw new \RuntimeException(
114
                    sprintf(
115
                        "Could not create exchange %s: connection name %s is not defined!",
116
                        (string)$exchangeAliasName,
117
                        (string)$exchangeDetails['connection']
118
                    )
119
                );
120
            }
121
122
            $exchanges->put(
123
                $exchangeAliasName,
124
                ExchangeEntity::createExchange(
125
                    $connections->get($exchangeDetails['connection']),
126
                    $exchangeAliasName,
127
                    array_merge($exchangeDetails['attributes'], ['name' => $exchangeDetails['name']])
128
                )
129
            );
130
        }
131
        return $exchanges;
132
    }
133
134
    /**
135
     * @param array $queueConfigList
136
     * @param Collection $connections
137
     * @return Collection
138
     */
139
    private function createQueues(array $queueConfigList, Collection $connections): Collection
140
    {
141
        $queue = new Collection();
142
        foreach ($queueConfigList as $queueAliasName => $queueDetails) {
143
            // verify if the connection exists
144
            if (array_key_exists('connection', $queueDetails) &&
145
                false === $connections->has($queueDetails['connection'])) {
146
                throw new \RuntimeException(
147
                    sprintf(
148
                        "Could not create exchange %s: connection name %s is not defined!",
149
                        (string)$queueAliasName,
150
                        (string)$queueDetails['connection']
151
                    )
152
                );
153
            }
154
155
            $queue->put(
156
                $queueAliasName,
157
                QueueEntity::createQueue(
158
                    $connections->get($queueDetails['connection']),
159
                    $queueAliasName,
160
                    array_merge($queueDetails['attributes'], ['name' => $queueDetails['name']])
161
                )
162
            );
163
        }
164
        return $queue;
165
    }
166
}
167