ContainerBuilder::createConsumers()   B
last analyzed

Complexity

Conditions 9
Paths 10

Size

Total Lines 64
Code Lines 36

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 36
c 1
b 0
f 0
dl 0
loc 64
rs 8.0555
cc 9
nc 10
nop 2

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

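Commonly applied refactorings include Extract Method: move a cohesive group of statements out of the long method into its own, well-named method.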

1
<?php
2
3
declare(strict_types=1);
4
5
namespace BinaryCube\CarrotMQ\Builder;
6
7
use Psr\Log\LoggerInterface;
8
use BinaryCube\CarrotMQ\Config;
9
use BinaryCube\CarrotMQ\Consumer;
10
use BinaryCube\CarrotMQ\Publisher;
11
use BinaryCube\CarrotMQ\Component;
12
use BinaryCube\CarrotMQ\Container;
13
use BinaryCube\CarrotMQ\Connection;
14
use BinaryCube\CarrotMQ\Entity\Queue;
15
use BinaryCube\CarrotMQ\Entity\Topic;
16
use BinaryCube\CarrotMQ\Support\Collection;
17
use BinaryCube\CarrotMQ\Processor\Processor;
18
use BinaryCube\CarrotMQ\Processor\CallbackProcessor;
19
20
use function vsprintf;
21
use function is_string;
22
use function get_class;
23
use function is_callable;
24
use function class_exists;
25
26
/**
 * Class ContainerBuilder
 *
 * Turns a Config into a populated Container. Entries are created in
 * dependency order: connections first, then topics and queues (which look up
 * a connection), then publishers (which look up a topic) and finally
 * consumers (which look up a queue).
 */
class ContainerBuilder extends Component
{

    /**
     * @const array Top-level configuration sections and their (empty) defaults.
     */
    public const DEFAULTS = [
        'connections' => [],
        'topics'      => [],
        'queues'      => [],
        'publishers'  => [],
        'consumers'   => [],
    ];

    /**
     * Convenience factory: instantiate a builder and build the container in one call.
     *
     * @param Config               $config
     * @param LoggerInterface|null $logger
     *
     * @return Container
     */
    public static function create(Config $config, ?LoggerInterface $logger = null)
    {
        $builder = new static($logger);

        return $builder->build($config);
    }

    /**
     * Constructor.
     *
     * @param LoggerInterface|null $logger Forwarded to the parent Component.
     */
    public function __construct(?LoggerInterface $logger = null)
    {
        parent::__construct(null, $logger);
    }

    /**
     * Build a new Container from the given configuration.
     *
     * The supplied configuration is merged onto self::DEFAULTS so every section
     * key read by the create*() steps is present.
     *
     * @param Config $config
     *
     * @return Container
     *
     * @throws \RuntimeException When an entry references an undefined connection, topic or queue.
     * @throws \LogicException   When a consumer processor cannot be resolved to a Processor.
     */
    public function build(Config $config): Container
    {
        $config = Collection::make(static::DEFAULTS)->merge($config->all())->all();

        $container = new Container();

        // Order matters: each step looks up entries registered by an earlier one.
        $this
            ->createConnections($container, $config)
            ->createTopics($container, $config)
            ->createQueues($container, $config)
            ->createPublishers($container, $config)
            ->createConsumers($container, $config);

        return $container;
    }

    /**
     * Register one Connection per entry of $config['connections'].
     *
     * @param Container $container
     * @param array     $config
     *
     * @return $this
     */
    protected function createConnections(Container $container, array $config)
    {
        $connections = $config['connections'];

        foreach ($connections as $id => $connection) {
            $entry = new Connection($id, $connection, $this->logger);

            $container->connections()->put($entry->id(), $entry);

            $this->logger->debug(vsprintf('Connection with ID: "%s" has been created', [$entry->id()]));
        }

        return $this;
    }

    /**
     * Register one Topic per entry of $config['topics'].
     *
     * Each entry needs a non-empty "name" and a "connection" that refers to an
     * already registered connection.
     *
     * @param Container $container
     * @param array     $config
     *
     * @return $this
     *
     * @throws \RuntimeException When the name is empty or the connection is not defined.
     */
    protected function createTopics(Container $container, array $config)
    {
        $topics = $config['topics'];

        $default = [
            'connection' => '',
            'name'       => '',
            'config'     => [],
        ];

        foreach ($topics as $id => $topic) {
            $topic = Collection::make($default)->merge($topic)->all();

            if (empty($topic['name'])) {
                throw new \RuntimeException('Topic name is empty!');
            }

            if (
                empty($topic['connection']) ||
                ! $container->connections()->has($topic['connection'])
            ) {
                throw new \RuntimeException(
                    vsprintf(
                        'Could not create topic "%s": connection name "%s" is not defined!',
                        [
                            $topic['name'],
                            $topic['connection'],
                        ]
                    )
                );
            }

            $name       = $topic['name'];
            $connection = $container->connections()->get($topic['connection']);
            $params     = $topic['config'];

            $entry = new Topic($id, $name, $connection, $params, $this->logger);

            $container->topics()->put($entry->id(), $entry);

            $this->logger->debug(vsprintf('Topic with ID: "%s" has been created', [$entry->id()]));
        }

        return $this;
    }

    /**
     * Register one Queue per entry of $config['queues'].
     *
     * Each entry needs a non-empty "name" and a "connection" that refers to an
     * already registered connection.
     *
     * @param Container $container
     * @param array     $config
     *
     * @return $this
     *
     * @throws \RuntimeException When the name is empty or the connection is not defined.
     */
    protected function createQueues(Container $container, array $config)
    {
        $queues = $config['queues'];

        $default = [
            'connection' => '',
            'name'       => '',
            'config'     => [],
        ];

        foreach ($queues as $id => $queue) {
            $queue = Collection::make($default)->merge($queue)->all();

            if (empty($queue['name'])) {
                throw new \RuntimeException('Queue name is empty!');
            }

            if (
                empty($queue['connection']) ||
                ! $container->connections()->has($queue['connection'])
            ) {
                throw new \RuntimeException(
                    vsprintf(
                        'Could not create queue "%s": connection name "%s" is not defined!',
                        [
                            $queue['name'],
                            $queue['connection'],
                        ]
                    )
                );
            }

            $name       = $queue['name'];
            $connection = $container->connections()->get($queue['connection']);
            $params     = $queue['config'];

            $entry = new Queue($id, $name, $connection, $params, $this->logger);

            $container->queues()->put($entry->id(), $entry);

            $this->logger->debug(vsprintf('Queue with ID: "%s" has been created', [$entry->id()]));
        }

        return $this;
    }

    /**
     * Register one Publisher per entry of $config['publishers'].
     *
     * Each entry's "topic" must be the ID of an already registered topic.
     *
     * @param Container $container
     * @param array     $config
     *
     * @return $this
     *
     * @throws \RuntimeException When the topic ID is empty or not defined.
     */
    protected function createPublishers(Container $container, array $config)
    {
        $publishers = $config['publishers'];

        $default = [
            'topic'  => '',
            'config' => [],
        ];

        foreach ($publishers as $id => $publisher) {
            $publisher = Collection::make($default)->merge($publisher)->all();

            if (
                empty($publisher['topic']) ||
                ! $container->topics()->has($publisher['topic'])
            ) {
                throw new \RuntimeException(
                    vsprintf(
                        'Could not create publisher "%s": topic id "%s" is not defined!',
                        [
                            $id,
                            $publisher['topic'],
                        ]
                    )
                );
            }

            $topic  = $container->topics()->get($publisher['topic']);
            $params = $publisher['config'];

            $entry = new Publisher($id, $topic, $container, $params, $this->logger);

            $container->publishers()->put($entry->id(), $entry);

            $this->logger->debug(vsprintf('Publisher with ID: "%s" has been created', [$entry->id()]));
        }

        return $this;
    }

    /**
     * Register one Consumer per entry of $config['consumers'].
     *
     * Each entry's "queue" must be the ID of an already registered queue. The
     * optional "processor" is resolved by resolveProcessor().
     *
     * @param Container $container
     * @param array     $config
     *
     * @return $this
     *
     * @throws \RuntimeException When the queue ID is empty or not defined.
     * @throws \LogicException   When the processor cannot be resolved to a Processor.
     */
    protected function createConsumers(Container $container, array $config)
    {
        $consumers = $config['consumers'];

        $default = [
            'queue'      => '',
            'processor'  => null,
            'config'     => [],
        ];

        foreach ($consumers as $id => $consumer) {
            $consumer = Collection::make($default)->merge($consumer)->all();

            if (
                empty($consumer['queue']) ||
                ! $container->queues()->has($consumer['queue'])
            ) {
                throw new \RuntimeException(
                    vsprintf(
                        'Could not create consumer "%s": queue id "%s" is not defined!',
                        [
                            $id,
                            $consumer['queue'],
                        ]
                    )
                );
            }

            $queue     = $container->queues()->get($consumer['queue']);
            $processor = $this->resolveProcessor($consumer['processor']);
            $params    = $consumer['config'];

            $entry = new Consumer($id, $queue, $processor, $container, $params, $this->logger);

            $container->consumers()->put($entry->id(), $entry);

            $this->logger->debug(vsprintf('Consumer with ID: "%s" has been created', [$entry->id()]));
        }

        return $this;
    }

    /**
     * Turn a consumer's "processor" setting into a Processor instance.
     *
     * Resolution rules, applied in this order:
     *  - empty value: a CallbackProcessor whose callback always returns false;
     *  - callable: wrapped in a CallbackProcessor;
     *  - name of an existing class: instantiated without constructor arguments;
     *  - anything else is kept as given and must already be a Processor.
     *
     * @param mixed $processor Raw "processor" value from the consumer configuration.
     *
     * @return Processor
     *
     * @throws \LogicException When the resolved value is not a Processor.
     */
    private function resolveProcessor($processor): Processor
    {
        if (empty($processor)) {
            $processor = new CallbackProcessor(
                function () {
                    return false;
                }
            );
        } elseif (is_callable($processor)) {
            $processor = new CallbackProcessor($processor);
        } elseif (is_string($processor) && class_exists($processor)) {
            $processor = new $processor();
        }

        if (! ($processor instanceof Processor)) {
            throw new \LogicException(
                vsprintf(
                    "Can't create processor, '%s' must extend from %s or its child class.",
                    [
                        $this->describeValue($processor),
                        Processor::class,
                    ]
                )
            );
        }

        return $processor;
    }

    /**
     * Describe a value for use in an error message without assuming it is an object.
     *
     * get_class() only accepts objects, so strings (e.g. a misspelled class
     * name) are reported verbatim and other values by their type name.
     *
     * @param mixed $value
     *
     * @return string
     */
    private function describeValue($value): string
    {
        if (\is_object($value)) {
            return get_class($value);
        }

        if (is_string($value)) {
            return $value;
        }

        return \gettype($value);
    }

}
331