Configuration   A
last analyzed

Complexity

Total Complexity 18

Size/Duplication

Total Lines 441
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 10

Test Coverage

Coverage 100%

Importance

Changes 0
Metric Value
wmc 18
lcom 1
cbo 10
dl 0
loc 441
ccs 219
cts 219
cp 1
rs 10
c 0
b 0
f 0

6 Methods

Rating   Name   Duplication   Size   Complexity  
A getConfigTreeBuilder() 0 51 1
B addConnectionsSection() 0 97 4
B addPublishersSection() 0 36 2
B addConsumersSection() 0 81 3
A addExchangeSection() 0 67 3
B addQueueSection() 0 81 5
1
<?php
2
3
namespace TreeHouse\QueueBundle\DependencyInjection;
4
5
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException;
6
use TreeHouse\Queue\Amqp\ExchangeInterface as Exchg;
7
use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition;
8
use Symfony\Component\Config\Definition\Builder\TreeBuilder;
9
use Symfony\Component\Config\Definition\ConfigurationInterface;
10
11
class Configuration implements ConfigurationInterface
12
{
13
    /**
14
     * @inheritdoc
15
     */
16 33
    public function getConfigTreeBuilder()
17
    {
18 33
        $treeBuilder = new TreeBuilder();
19 33
        $rootNode = $treeBuilder->root('tree_house_queue');
20 33
        $children = $rootNode->children();
21
22
        $children
23 33
            ->enumNode('driver')
24 33
            ->values(['amqp', 'php-amqplib'])
25 33
            ->defaultValue('amqp')
26 33
            ->validate()
27
            ->ifTrue(function ($value) {
28 1
                return $value === 'php-amqplib';
29 33
            })
30 33
            ->thenInvalid('Driver for php-amqplib is not yet implemented')
31
        ;
32
33
        $children
34 33
            ->booleanNode('auto_flush')
35 33
            ->defaultTrue()
36 33
            ->info('Whether to automatically flush the Doctrine object manager when processing messages')
37
        ;
38
39 33
        $this->addConnectionsSection($rootNode);
40 33
        $this->addPublishersSection($rootNode);
41 33
        $this->addConsumersSection($rootNode);
42
43
        /** @var ArrayNodeDefinition $exchanges */
44
        $exchanges = $rootNode
45 33
            ->fixXmlConfig('exchange')
46 33
            ->children()
47 33
            ->arrayNode('exchanges')
48 33
            ->useAttributeAsKey('name')
49 33
            ->prototype('array')
50
        ;
51
52 33
        $this->addExchangeSection($exchanges);
53
54
        /** @var ArrayNodeDefinition $queues */
55
        $queues = $rootNode
56 33
            ->fixXmlConfig('queue')
57 33
            ->children()
58 33
            ->arrayNode('queues')
59 33
            ->useAttributeAsKey('name')
60 33
            ->prototype('array')
61
        ;
62
63 33
        $this->addQueueSection($queues);
64
65 33
        return $treeBuilder;
66
    }
67
68
    /**
69
     * @param ArrayNodeDefinition $rootNode
70
     */
71 33
    private function addConnectionsSection(ArrayNodeDefinition $rootNode)
72
    {
73 33
        $rootNode->fixXmlConfig('connection');
74 33
        $children = $rootNode->children();
75
76
        $children
77 33
            ->scalarNode('default_connection')
78 33
            ->defaultNull()
79
        ;
80
81
        /** @var ArrayNodeDefinition $connections */
82
        $connections = $children
83 33
            ->arrayNode('connections')
84 33
            ->isRequired()
85 33
            ->requiresAtLeastOneElement()
86 33
            ->cannotBeEmpty()
87 33
            ->useAttributeAsKey('name')
88 33
            ->info('List of connections. The key becomes the connection name.')
89 33
            ->example(<<<EOF
90
# long
91
connections:
92
  conn1:
93
    host: localhost
94
  conn2:
95
    host: otherhost
96
97
# short
98
connection:
99
  host: rabbitmqhost
100
  port: 1234
101
102
# shorter
103
connection: rabbitmqhost
104
105
# mixing it:
106
connections:
107
  conn1: localhost
108
  conn2:
109
    host: otherhost
110
    port: 1234
111
112
# numeric name index
113
connections:
114
  -
115
    host: localhost
116
117
# named index
118
connections:
119
  -
120
    name: conn1
121 33
    host: localhost
122
EOF
123
            )
124
        ;
125
126
        $connections
127 33
            ->beforeNormalization()
128 33
            ->ifArray()
129
            ->then(function ($value) {
130 33
                $conns = [];
131 33
                foreach ($value as $key => $conn) {
132
                    // string becomes the host
133 33
                    if (is_string($conn)) {
134
                        $conn = [
135 2
                            'host' => $conn,
136
                        ];
137
                    }
138
139 33
                    if (!isset($conn['name'])) {
140 33
                        $conn['name'] = $key;
141
                    }
142
143 33
                    $conns[$key] = $conn;
144
                }
145
146 33
                return $conns;
147 33
            })
148
        ;
149
150
        /** @var ArrayNodeDefinition $prototype */
151 33
        $prototype = $connections->prototype('array');
152 33
        $prototype->addDefaultsIfNotSet();
153
154 33
        $connection = $prototype->children();
155 33
        $connection->scalarNode('name');
156 33
        $connection->scalarNode('host');
157 33
        $connection->integerNode('port')->defaultValue(5672);
158 33
        $connection->scalarNode('user')->defaultValue('guest');
159 33
        $connection->scalarNode('pass')->defaultValue('guest');
160 33
        $connection->scalarNode('vhost')->defaultValue('/');
161
162 33
        $params = $connection->arrayNode('params');
163 33
        $params->prototype('scalar');
164 33
        $params->defaultValue([
165
            'heartbeat' => 60
166 33
        ]);
167 33
    }
168
169
    /**
170
     * @param ArrayNodeDefinition $rootNode
171
     */
172 33
    private function addPublishersSection(ArrayNodeDefinition $rootNode)
173
    {
174 33
        $rootNode->fixXmlConfig('publisher');
175 33
        $children = $rootNode->children();
176
177
        /** @var ArrayNodeDefinition $publishers */
178 33
        $publishers = $children->arrayNode('publishers')->useAttributeAsKey('name')->prototype('array');
179 33
        $publishers->addDefaultsIfNotSet();
180
181 33
        $publisher = $publishers->children();
182 33
        $publisher->scalarNode('name');
183
        $publisher
184 33
            ->scalarNode('serializer')
185 33
            ->defaultValue('@tree_house.queue.serializer.php')
186 33
            ->beforeNormalization()
187 33
                ->ifInArray(['php', 'json', 'doctrine'])
188
                ->then(function ($value) {
189 1
                    return sprintf('@tree_house.queue.serializer.%s', $value);
190 33
                })
191 33
            ->end()
192 33
            ->validate()
193
                ->ifTrue(function ($value) {
194 3
                    substr($value, 0, 1) !== '@' && !class_exists($value);
195 33
                })
196 33
                ->thenInvalid('Serializer class "%s" does not exist')
197 33
            ->end()
198
        ;
199
200
        $publisher
201 33
            ->scalarNode('composer')
202 33
            ->defaultValue('%tree_house.queue.composer.default.class%')
203
        ;
204
205 33
        $exchange = $publisher->arrayNode('exchange');
206 33
        $this->addExchangeSection($exchange);
207 33
    }
208
209
    /**
210
     * @param ArrayNodeDefinition $rootNode
211
     */
212 33
    private function addConsumersSection(ArrayNodeDefinition $rootNode)
213
    {
214 33
        $rootNode->fixXmlConfig('consumer');
215 33
        $children = $rootNode->children();
216
217
        /** @var ArrayNodeDefinition $consumers */
218 33
        $consumers = $children->arrayNode('consumers')->useAttributeAsKey('name')->prototype('array');
219 33
        $consumers->addDefaultsIfNotSet();
220
221 33
        $consumer = $consumers->children();
222 33
        $consumer->scalarNode('name');
223
224
        // processor
225
        $consumer
226 33
            ->scalarNode('processor')
227 33
            ->validate()
228
            ->ifTrue(function ($value) {
229 10
                substr($value, 0, 1) !== '@' && !class_exists($value);
230 33
            })
231 33
            ->thenInvalid('Processor class "%s" does not exist')
232
        ;
233
234
        // queue
235 33
        $queue = $consumer->arrayNode('queue');
236 33
        $this->addQueueSection($queue);
237
238
        // retry
239 33
        $retry = $consumer->arrayNode('retry');
240
        $retry
241 33
            ->addDefaultsIfNotSet()
242 33
            ->beforeNormalization()
243
            ->ifTrue(function ($value) {
244 6
                return is_scalar($value);
245 33
            })
246
            ->then(function ($attempts) {
247 2
                if (!is_numeric($attempts)) {
248
                    throw new InvalidConfigurationException('When using a scalar for "retry", it must be numeric, eg: "retry: 3"');
249
                }
250
251
                return [
252 2
                    'attempts' => (int) $attempts,
253
                ];
254 33
            })
255
        ;
256
257 33
        $retryConfig = $retry->children();
258
        $retryConfig
259 33
            ->integerNode('attempts')
260 33
            ->defaultValue(1)
261 33
            ->validate()
262
            ->ifTrue(function ($value) {
263 6
                return !($value > 0);
264 33
            })
265 33
            ->thenInvalid('Expecting a positive number, got "%s"')
266
        ;
267
        $retryConfig
268 33
            ->scalarNode('publisher')
269 33
            ->defaultNull()
270 33
            ->info('Name of the publisher to use for retrying messages. Defaults to the same name as the consumer')
271
        ;
272
273
        // retry strategy
274 33
        $strategy = $retryConfig->arrayNode('strategy');
275
        $strategy
276 33
            ->addDefaultsIfNotSet()
277 33
            ->beforeNormalization()
278 33
            ->ifString()
279
            ->then(function ($type) {
280
                return [
281 2
                    'type' => $type,
282
                ];
283 33
            })
284
        ;
285
286 33
        $strategyConfig = $strategy->children();
287
        $strategyConfig
288 33
            ->enumNode('type')
289 33
            ->values(['backoff', 'deprioritize'])
290 33
            ->defaultValue('backoff')
291
        ;
292 33
    }
293
294
    /**
295
     * @param ArrayNodeDefinition $node
296
     * @param bool                $includeDlx
297
     * @param bool                $includeDelay
298
     */
299 33
    private function addExchangeSection(ArrayNodeDefinition $node, $includeDlx = true, $includeDelay = true)
300
    {
301 33
        $node->addDefaultsIfNotSet();
302
        $node
303 33
            ->beforeNormalization()
304 33
            ->ifString()
305
            ->then(function ($value) {
306
                return [
307
                    'type' => $value,
308
                ];
309 33
            })
310
        ;
311
312 33
        $exchange = $node->children();
313
        $exchange
314 33
            ->scalarNode('name')
315 33
            ->defaultNull()
316 33
            ->info('The name to create the exchange with in the AMQP broker')
317
        ;
318
        $exchange
319 33
            ->enumNode('type')
320 33
            ->values([Exchg::TYPE_DIRECT, Exchg::TYPE_FANOUT, Exchg::TYPE_TOPIC, Exchg::TYPE_HEADERS])
321 33
            ->defaultValue(Exchg::TYPE_DIRECT)
322 33
            ->info('The exchange type')
323
        ;
324
325
        $exchange
326 33
            ->booleanNode('auto_declare')
327 33
            ->info('Whether to automatically declare the exchange on cache warmup. Only enable this when you have configure access to the exchange')
328
        ;
329
330 33
        $exchange->scalarNode('connection')->defaultNull();
331 33
        $exchange->booleanNode('durable')->defaultTrue();
332 33
        $exchange->booleanNode('passive')->defaultFalse();
333 33
        $exchange->booleanNode('auto_delete')->defaultFalse();
334 33
        $exchange->booleanNode('internal')->defaultFalse();
335 33
        $exchange->booleanNode('nowait')->defaultFalse();
336
        $exchange
337 33
            ->arrayNode('arguments')
338 33
            ->normalizeKeys(false)
339 33
            ->prototype('scalar')
340 33
            ->defaultValue([])
341
        ;
342
343 33
        if ($includeDelay) {
344
            $exchange
345 33
                ->booleanNode('delay')
346 33
                ->defaultTrue()
347 33
                ->info('Whether to enable delayed messages for this exchange')
348
            ;
349
        }
350
351 33
        if ($includeDlx) {
352 33
            $dlx = $exchange->arrayNode('dlx');
353 33
            $dlx->info('Create a dead letter exchange for this exchange');
354 33
            $dlx->canBeDisabled();
355
356
            // copy the entire exchange configuration here
357 33
            $this->addExchangeSection($dlx, false, false);
358
359
            $queue = $dlx
360 33
                ->children()
361 33
                ->arrayNode('queue')
362
            ;
363 33
            $this->addQueueSection($queue);
364
        }
365 33
    }
366
367
    /**
368
     * @param ArrayNodeDefinition $node
369
     */
370 33
    private function addQueueSection(ArrayNodeDefinition $node)
371
    {
372 33
        $node->addDefaultsIfNotSet();
373 33
        $node->fixXmlConfig('binding');
374
375 33
        $queue = $node->children();
376
377
        $queue
378 33
            ->booleanNode('auto_declare')
379 33
            ->info('Whether to automatically declare the queue on cache warmup. Only enable this when you have configure access to the queue')
380
        ;
381
382 33
        $queue->scalarNode('name')->defaultNull();
383 33
        $queue->scalarNode('connection')->defaultNull();
384 33
        $queue->booleanNode('durable')->defaultTrue();
385 33
        $queue->booleanNode('passive')->defaultFalse();
386 33
        $queue->booleanNode('exclusive')->defaultFalse();
387 33
        $queue->booleanNode('auto_delete')->defaultFalse();
388
        $queue
389 33
            ->scalarNode('dlx')
390 33
            ->defaultNull()
391 33
            ->info('The name of the dead letter exchange that this queue should link to')
392
        ;
393
394
        $queue
395 33
            ->arrayNode('arguments')
396 33
            ->normalizeKeys(false)
397 33
            ->prototype('scalar')
398 33
            ->defaultValue([])
399
        ;
400
401 33
        $bindings = $queue->arrayNode('bindings');
402 33
        $bindings->requiresAtLeastOneElement();
403
404
        /** @var ArrayNodeDefinition $binding */
405 33
        $binding = $bindings->prototype('array');
406
        $binding
407 33
            ->beforeNormalization()
408 33
            ->always()
409 33
            ->then(function ($binding) {
410
                // use string as exchange
411 2
                if (is_string($binding)) {
412
                    $binding = ['exchange' => $binding];
413
                }
414
415
                // if multiple routing keys are given, make a copy for each one
416 2
                if (!isset($binding['routing_keys'])) {
417 2
                    $binding['routing_keys'] = isset($binding['routing_key']) ? $binding['routing_key'] : [];
418 2
                    unset($binding['routing_key']);
419
                }
420
421 2
                if (is_scalar($binding['routing_keys'])) {
422 2
                    $binding['routing_keys'] = [$binding['routing_keys']];
423
                }
424
425 2
                return $binding;
426 33
            })
427
        ;
428
429
        $binding
430 33
            ->addDefaultsIfNotSet()
431 33
            ->fixXmlConfig('routing_key')
432
        ;
433
434 33
        $bindingOptions = $binding->children();
435
        $bindingOptions
436 33
            ->scalarNode('exchange')
437 33
            ->isRequired()
438
        ;
439
        $bindingOptions
440 33
            ->arrayNode('routing_keys')
441 33
            ->prototype('scalar')
442 33
            ->defaultValue([])
443
        ;
444
        $bindingOptions
445 33
            ->arrayNode('arguments')
446 33
            ->normalizeKeys(false)
447 33
            ->prototype('scalar')
448 33
            ->defaultValue([])
449
        ;
450 33
    }
451
}
452