Completed
Pull Request — master (#61)
by Samuel
02:18
created

ToleranceExtension::createMetricPublisher()   B

Complexity

Conditions 4
Paths 4

Size

Total Lines 30
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 30
rs 8.5806
c 0
b 0
f 0
cc 4
eloc 17
nc 4
nop 3
1
<?php
2
3
/*
4
 * This file is part of the Tolerance package.
5
 *
6
 * (c) Samuel ROZE <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace Tolerance\Bridge\Symfony\Bundle\ToleranceBundle\DependencyInjection;
13
14
use GuzzleHttp\Client;
15
use Symfony\Component\Config\FileLocator;
16
use Symfony\Component\Config\Loader\LoaderInterface;
17
use Symfony\Component\DependencyInjection\ContainerBuilder;
18
use Symfony\Component\DependencyInjection\Definition;
19
use Symfony\Component\DependencyInjection\Extension\PrependExtensionInterface;
20
use Symfony\Component\DependencyInjection\Parameter;
21
use Symfony\Component\DependencyInjection\Reference;
22
use Symfony\Component\HttpKernel\DependencyInjection\Extension;
23
use Symfony\Component\DependencyInjection\Loader;
24
use Tolerance\Bridge\RabbitMqBundle\MessageProfile\StoreMessageProfileConsumer;
25
use Tolerance\Bridge\RabbitMqBundle\MessageProfile\StoreMessageProfileProducer;
26
use Tolerance\Bridge\Symfony\Metrics\EventListener\RequestEnded\SendRequestTimeToCollectors;
27
use Tolerance\Bridge\Symfony\Metrics\EventListener\RequestEnded\SendRequestTimeToPublisher;
28
use Tolerance\Bridge\Symfony\Metrics\Request\StaticRequestMetricNamespaceResolver;
29
use Tolerance\MessageProfile\Storage\ElasticaStorage;
30
use Tolerance\MessageProfile\Storage\Neo4jStorage;
31
use Tolerance\Metrics\Collector\NamespacedCollector;
32
use Tolerance\Metrics\Collector\RabbitMq\RabbitMqCollector;
33
use Tolerance\Metrics\Collector\RabbitMq\RabbitMqHttpClient;
34
use Tolerance\Metrics\Publisher\BeberleiMetricsAdapterPublisher;
35
use Tolerance\Metrics\Publisher\HostedGraphitePublisher;
36
use Tolerance\Metrics\Publisher\LoggerPublisher;
37
use Tolerance\Operation\Runner\CallbackOperationRunner;
38
use Tolerance\Operation\Runner\RetryOperationRunner;
39
use Tolerance\Waiter\ExponentialBackOff;
40
use Tolerance\Waiter\CountLimited;
41
use Tolerance\Waiter\NullWaiter;
42
use Tolerance\Waiter\SleepWaiter;
43
44
/**
 * Dependency-injection extension of the Tolerance bundle.
 *
 * From the processed bundle configuration it loads the XML service files and
 * registers the operation runners, waiters, message-profile storage and
 * integrations, metric collectors and metric publishers.
 */
class ToleranceExtension extends Extension implements PrependExtensionInterface
{
    /**
     * {@inheritdoc}
     *
     * Prepends a metadata directory to the `jms_serializer` configuration when
     * the message profile and its JMS serializer integration are both enabled
     * and the JMSSerializerBundle is listed in `kernel.bundles`.
     */
    public function prepend(ContainerBuilder $container)
    {
        $configs = $container->getExtensionConfig($this->getAlias());
        $config = $this->processConfiguration(new Configuration(), $configs);
        $messageProfile = $config['message_profile'];

        if (!$messageProfile['enabled'] || !$messageProfile['integrations']['jms_serializer']) {
            return;
        }

        $bundles = $container->getParameter('kernel.bundles');
        if (!array_key_exists('JMSSerializerBundle', $bundles)) {
            return;
        }

        $container->prependExtensionConfig('jms_serializer', [
            'metadata' => [
                'directories' => [
                    'ToleranceMessageProfile' => [
                        'namespace_prefix' => 'Tolerance\\MessageProfile\\',
                        'path' => '%kernel.root_dir%/../vendor/tolerance/tolerance/src/Tolerance/Bridge/JMSSerializer/MessageProfile/Resources/config',
                    ],
                ],
            ],
        ]);
    }

    /**
     * {@inheritdoc}
     *
     * @throws \RuntimeException         when AOP is enabled without the JMSAopBundle, or when a
     *                                   runner, waiter, storage or publisher cannot be resolved
     * @throws \InvalidArgumentException when a metric collector type is not supported
     */
    public function load(array $configs, ContainerBuilder $container)
    {
        $config = $this->processConfiguration(new Configuration(), $configs);

        $container->setParameter('tolerance.aop.enabled', $config['aop']);

        $loader = new Loader\XmlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config'));
        $loader->load('waiter.xml');

        if ($config['aop']) {
            $this->loadAop($container, $loader);
        }

        if ($config['operation_runner_listener']) {
            $loader->load('operations/listeners.xml');
        }

        if ($config['message_profile']['enabled']) {
            $this->loadMessageProfile($container, $loader, $config['message_profile']);
        }

        foreach ($config['operation_runners'] as $name => $operationRunner) {
            $this->createOperationRunnerDefinition(
                $container,
                sprintf('tolerance.operation_runners.%s', $name),
                $operationRunner
            );
        }

        // metrics.xml must be loaded before the command definition below can be fetched.
        $loader->load('metrics.xml');

        $metrics = $config['metrics'];
        $this->createMetricCollectors($container, $metrics['collectors']);
        $this->createMetricPublishers($container, $metrics['publishers']);

        // Configure the metrics command
        $container
            ->getDefinition('tolerance.metrics.command.collect_and_publish')
            ->replaceArgument(0, new Reference($metrics['command']['collector']))
            ->replaceArgument(1, new Reference($metrics['command']['publisher']))
        ;

        // Configure the request listeners
        if (array_key_exists('request', $metrics)) {
            $this->registerRequestListener($container, $metrics['request']);
        }
    }

    /**
     * Registers the subscriber that sends the request time to a publisher,
     * together with the static namespace resolver it depends on.
     *
     * @param ContainerBuilder $container
     * @param array            $config    the `metrics.request` configuration (`namespace`, `publisher`)
     */
    private function registerRequestListener(ContainerBuilder $container, array $config)
    {
        $listenerName = 'tolerance.metrics.listener.request_ended.send_time_to_publishers';
        $resolverName = $listenerName.'.request_metric_namespace_resolver';

        $container->setDefinition($resolverName, new Definition(StaticRequestMetricNamespaceResolver::class, [
            $config['namespace'],
        ]));

        $listener = new Definition(SendRequestTimeToPublisher::class, [
            new Reference($config['publisher']),
            new Reference($resolverName),
            new Reference('logger'),
        ]);
        $listener->addTag('kernel.event_subscriber');

        $container->setDefinition($listenerName, $listener);
    }

    /**
     * Loads the message-profile services, then configures storage and integrations.
     *
     * @param ContainerBuilder $container
     * @param LoaderInterface  $loader
     * @param array            $config    the `message_profile` configuration
     */
    private function loadMessageProfile(ContainerBuilder $container, LoaderInterface $loader, array $config)
    {
        $container->setParameter('tolerance.message_profile.header', $config['header']);
        $container->setParameter('tolerance.message_profile.current_peer', $config['current_peer']);

        foreach (['listener', 'storage', 'guzzle'] as $resource) {
            $loader->load('message-profile/'.$resource.'.xml');
        }

        $this->configureMessageProfileStorage($container, $loader, $config['storage']);
        $this->loadMessageProfileIntegrations($container, $loader, $config['integrations']);
    }

    /**
     * Enables the optional monolog and RabbitMQ message-profile integrations.
     *
     * @param ContainerBuilder $container
     * @param LoaderInterface  $loader
     * @param array            $config    the `message_profile.integrations` configuration
     */
    private function loadMessageProfileIntegrations(ContainerBuilder $container, LoaderInterface $loader, array $config)
    {
        if ($config['monolog']) {
            $loader->load('message-profile/monolog.xml');
        }

        if ($config['rabbitmq']) {
            $this->decorateRabbitMqConsumersAndProducers($container);
        }
    }

    /**
     * Decorates every service tagged as an OldSound RabbitMQ producer or
     * consumer with the matching message-profile storing decorator.
     *
     * NOTE(review): the tagged services are looked up in the container handed
     * to load(); services registered by other bundles may not be visible there
     * yet. Confirm whether this should live in a compiler pass instead.
     *
     * @param ContainerBuilder $container
     */
    private function decorateRabbitMqConsumersAndProducers(ContainerBuilder $container)
    {
        $this->decorateTaggedServices($container, 'old_sound_rabbit_mq.producer', StoreMessageProfileProducer::class);
        $this->decorateTaggedServices($container, 'old_sound_rabbit_mq.consumer', StoreMessageProfileConsumer::class);
    }

    /**
     * Registers a `<id>.tolerance_decorator` service decorating each service
     * carrying the given tag.
     *
     * @param ContainerBuilder $container
     * @param string           $tag            tag identifying the services to decorate
     * @param string           $decoratorClass class of the decorator to register
     */
    private function decorateTaggedServices(ContainerBuilder $container, $tag, $decoratorClass)
    {
        foreach (array_keys($container->findTaggedServiceIds($tag)) as $id) {
            $decoratorId = $id.'.tolerance_decorator';

            $decorator = new Definition($decoratorClass, [
                new Reference($decoratorId.'.inner'),
                new Reference('tolerance.message_profile.storage'),
                new Reference('tolerance.message_profile.identifier.generator.uuid'),
                new Reference('tolerance.message_profile.peer.resolver.current'),
                new Parameter('tolerance.message_profile.header'),
            ]);
            $decorator->setDecoratedService($id);

            $container->setDefinition($decoratorId, $decorator);
        }
    }

    /**
     * Aliases `tolerance.message_profile.storage` to the configured backend.
     *
     * The backends are tried in the order elastica, neo4j, in-memory; the
     * first one present in the configuration is used.
     *
     * @param ContainerBuilder $container
     * @param LoaderInterface  $loader
     * @param array            $config    the `message_profile.storage` configuration
     *
     * @throws \RuntimeException when no backend is configured
     */
    private function configureMessageProfileStorage(ContainerBuilder $container, LoaderInterface $loader, array $config)
    {
        if (array_key_exists('elastica', $config)) {
            $loader->load('message-profile/jms_serializer.xml');

            $storage = 'tolerance.message_profile.storage.elastica';
            $container->setDefinition($storage, new Definition(ElasticaStorage::class, [
                new Reference('tolerance.message_profile.storage.normalizer.jms_serializer'),
                new Reference($config['elastica']),
            ]));
        } elseif (array_key_exists('neo4j', $config)) {
            $storage = 'tolerance.message_profile.storage.neo4j';
            $container->setDefinition($storage, new Definition(Neo4jStorage::class, [
                new Reference($config['neo4j']),
                new Reference('tolerance.message_profile.storage.profile_normalizer.simple'),
            ]));
        } elseif (false !== $config['in_memory']) {
            // No definition is created here: only the alias below points at this id.
            $storage = 'tolerance.message_profile.storage.in_memory';
        } else {
            throw new \RuntimeException('Unable to configure Request Identifier storage');
        }

        $container->setAlias('tolerance.message_profile.storage', $storage);

        if ($config['buffered']) {
            $loader->load('message-profile/storage/buffered.xml');
        }
    }

    /**
     * Loads the AOP services, requiring the JMSAopBundle to be registered.
     *
     * @param ContainerBuilder $container
     * @param LoaderInterface  $loader
     *
     * @throws \RuntimeException when the JMSAopBundle is not in `kernel.bundles`
     */
    private function loadAop(ContainerBuilder $container, LoaderInterface $loader)
    {
        $bundles = $container->getParameter('kernel.bundles');
        if (!array_key_exists('JMSAopBundle', $bundles)) {
            throw new \RuntimeException('You need to add the JMSAopBundle if you want to use the AOP feature');
        }

        $loader->load('operations/aop.xml');
    }

    /**
     * Registers the operation runner described by the configuration.
     *
     * @param ContainerBuilder $container
     * @param string           $name      service id to register the runner under
     * @param array            $config    runner configuration, keyed by runner type
     *
     * @return string the registered service id
     *
     * @throws \RuntimeException when the configuration contains no known runner type
     */
    private function createOperationRunnerDefinition(ContainerBuilder $container, $name, array $config)
    {
        if (array_key_exists('retry', $config)) {
            return $this->createRetryOperationRunnerDefinition($container, $name, $config['retry']);
        }

        if (array_key_exists('callback', $config)) {
            return $this->createCallbackOperationRunnerDefinition($container, $name);
        }

        throw new \RuntimeException(sprintf(
            'No valid operation runner found in %s',
            implode(', ', array_keys($config))
        ));
    }

    /**
     * Registers a retry runner wrapping a nested runner (`<name>.runner`) and
     * a waiter (`<name>.waiter`).
     *
     * @param ContainerBuilder $container
     * @param string           $name
     * @param array            $config    the `retry` configuration (`runner`, `waiter`)
     *
     * @return string the registered service id
     */
    private function createRetryOperationRunnerDefinition(ContainerBuilder $container, $name, array $config)
    {
        $innerRunnerName = $this->createOperationRunnerDefinition($container, $name.'.runner', $config['runner']);
        $waiterName = $this->createWaiterDefinition($container, $name.'.waiter', $config['waiter']);

        $container->setDefinition($name, $this->createDefinition(RetryOperationRunner::class, [
            new Reference($innerRunnerName),
            new Reference($waiterName),
        ]));

        return $name;
    }

    /**
     * Registers a callback operation runner.
     *
     * @param ContainerBuilder $container
     * @param string           $name
     *
     * @return string the registered service id
     */
    private function createCallbackOperationRunnerDefinition(ContainerBuilder $container, $name)
    {
        $container->setDefinition($name, $this->createDefinition(CallbackOperationRunner::class));

        return $name;
    }

    /**
     * Registers the waiter described by the configuration.
     *
     * @param ContainerBuilder $container
     * @param string           $name      service id to register the waiter under
     * @param array            $config    waiter configuration, keyed by waiter type
     *
     * @return string the registered service id
     *
     * @throws \RuntimeException when the configuration contains no known waiter type
     */
    private function createWaiterDefinition(ContainerBuilder $container, $name, array $config)
    {
        if (array_key_exists('count_limited', $config)) {
            return $this->createDecoratingWaiterDefinition(
                $container,
                $name,
                CountLimited::class,
                $config['count_limited'],
                'count'
            );
        }

        if (array_key_exists('exponential_back_off', $config)) {
            return $this->createDecoratingWaiterDefinition(
                $container,
                $name,
                ExponentialBackOff::class,
                $config['exponential_back_off'],
                'exponent'
            );
        }

        if (array_key_exists('sleep', $config)) {
            return $this->createSimpleWaiterDefinition($container, $name, SleepWaiter::class);
        }

        if (array_key_exists('null', $config)) {
            return $this->createSimpleWaiterDefinition($container, $name, NullWaiter::class);
        }

        throw new \RuntimeException(sprintf(
            'No valid wait strategy found in %s',
            implode(', ', array_keys($config))
        ));
    }

    /**
     * Registers a waiter that wraps a nested waiter (`<name>.waiter`) and
     * takes one extra scalar option as its second constructor argument.
     *
     * @param ContainerBuilder $container
     * @param string           $name
     * @param string           $className waiter class to register
     * @param array            $config    waiter configuration (`waiter` plus the option)
     * @param string           $optionKey configuration key of the second constructor argument
     *
     * @return string the registered service id
     */
    private function createDecoratingWaiterDefinition(ContainerBuilder $container, $name, $className, array $config, $optionKey)
    {
        $innerWaiterName = $this->createWaiterDefinition($container, $name.'.waiter', $config['waiter']);

        $container->setDefinition($name, new Definition($className, [
            new Reference($innerWaiterName),
            $config[$optionKey],
        ]));

        return $name;
    }

    /**
     * Registers a waiter whose constructor takes no arguments.
     *
     * @param ContainerBuilder $container
     * @param string           $name
     * @param string           $className waiter class to register
     *
     * @return string the registered service id
     */
    private function createSimpleWaiterDefinition(ContainerBuilder $container, $name, $className)
    {
        $container->setDefinition($name, new Definition($className));

        return $name;
    }

    /**
     * Builds a definition tagged as `tolerance.operation_runner`.
     *
     * @param string $className
     * @param array  $arguments constructor arguments
     *
     * @return Definition
     */
    private function createDefinition($className, array $arguments = [])
    {
        $definition = new Definition($className, $arguments);
        $definition->addTag('tolerance.operation_runner');

        return $definition;
    }

    /**
     * Registers every configured metric collector and tags it as
     * `tolerance.metrics.collector`.
     *
     * @param ContainerBuilder $container
     * @param array            $collectors collector configurations, keyed by name
     */
    private function createMetricCollectors(ContainerBuilder $container, array $collectors)
    {
        foreach ($collectors as $name => $collector) {
            $this->createMetricCollector($container, $name, $collector)->addTag('tolerance.metrics.collector');
        }
    }

    /**
     * Registers one metric collector under `tolerance.metrics.collector.<name>`.
     *
     * When a namespace is configured, the collector is moved to `<id>.inner`
     * and a namespaced collector wrapping it takes over the public id.
     *
     * @param ContainerBuilder $container
     * @param string           $name
     * @param array            $collector collector configuration (`type`, `options`, `namespace`)
     *
     * @return Definition the definition registered under the public service id
     *
     * @throws \InvalidArgumentException when the collector type is not supported
     */
    private function createMetricCollector(ContainerBuilder $container, $name, array $collector)
    {
        // Strict comparison: a non-string type must never select the rabbitmq branch.
        if ('rabbitmq' !== $collector['type']) {
            throw new \InvalidArgumentException(sprintf('Type "%s" not supported', $collector['type']));
        }

        $serviceName = 'tolerance.metrics.collector.'.$name;
        $options = $collector['options'];

        $httpClientServiceName = $serviceName.'.http_client';
        $container->setDefinition($httpClientServiceName, new Definition(Client::class));

        $clientServiceName = $serviceName.'.client';
        $container->setDefinition($clientServiceName, new Definition(RabbitMqHttpClient::class, [
            new Reference($httpClientServiceName),
            $options['host'],
            $options['port'],
            $options['username'],
            $options['password'],
        ]));

        $definition = $container->setDefinition($serviceName, new Definition(RabbitMqCollector::class, [
            new Reference($clientServiceName),
            $options['vhost'],
            $options['queue'],
        ]));

        if ($collector['namespace']) {
            $container->setDefinition($serviceName.'.inner', $definition);
            $definition = $container->setDefinition($serviceName, new Definition(NamespacedCollector::class, [
                new Reference($serviceName.'.inner'),
                $collector['namespace'],
            ]));
        }

        return $definition;
    }

    /**
     * Registers every configured metric publisher and tags it as
     * `tolerance.metrics.publisher`.
     *
     * @param ContainerBuilder $container
     * @param array            $publishers publisher configurations, keyed by name
     */
    private function createMetricPublishers(ContainerBuilder $container, array $publishers)
    {
        foreach ($publishers as $name => $publisher) {
            $this->createMetricPublisher($container, $name, $publisher)->addTag('tolerance.metrics.publisher');
        }
    }

    /**
     * Registers one metric publisher under `tolerance.metrics.publisher.<name>`.
     *
     * @param ContainerBuilder $container
     * @param string           $name
     * @param array            $publisher publisher configuration (`type` and, depending on it, `options`)
     *
     * @return Definition the registered definition
     *
     * @throws \RuntimeException when the publisher type is not supported
     */
    private function createMetricPublisher(ContainerBuilder $container, $name, array $publisher)
    {
        $serviceName = 'tolerance.metrics.publisher.'.$name;
        $type = $publisher['type'];

        // Strict comparisons: a non-string type must never match a publisher name.
        if ('logger' === $type) {
            return $container->setDefinition($serviceName, new Definition(LoggerPublisher::class, [
                new Reference('logger'),
            ]));
        }

        if ('hosted_graphite' === $type) {
            $options = $publisher['options'];

            return $container->setDefinition($serviceName, new Definition(HostedGraphitePublisher::class, [
                $options['server'],
                $options['port'],
                $options['api_key'],
            ]));
        }

        if ('beberlei' === $type) {
            $options = $publisher['options'];

            return $container->setDefinition($serviceName, new Definition(BeberleiMetricsAdapterPublisher::class, [
                new Reference($options['service']),
                $options['auto_flush'],
            ]));
        }

        throw new \RuntimeException(sprintf(
            'Publisher "%s" not supported',
            $type
        ));
    }
}
416