Completed
Push — master ( 745336...aadd5d )
by Matthew
06:17
created

WorkerCompilerPass::setupRabbitMQOptions()   B

Complexity

Conditions 6
Paths 6

Size

Total Lines 20
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 42

Importance

Changes 0
Metric Value
dl 0
loc 20
ccs 0
cts 20
cp 0
rs 8.8571
c 0
b 0
f 0
cc 6
eloc 14
nc 6
nop 3
crap 42
1
<?php
2
3
namespace Dtc\QueueBundle\DependencyInjection\Compiler;
4
5
use Dtc\QueueBundle\Model\Job;
6
use Dtc\QueueBundle\Model\Run;
7
use Pheanstalk\Pheanstalk;
8
use Symfony\Component\DependencyInjection\Alias;
9
use Symfony\Component\DependencyInjection\ContainerBuilder;
10
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
11
use Symfony\Component\DependencyInjection\Definition;
12
use Symfony\Component\DependencyInjection\Reference;
13
14
class WorkerCompilerPass implements CompilerPassInterface
15
{
16
    public function process(ContainerBuilder $container)
17
    {
18
        if (false === $container->hasDefinition('dtc_queue.worker_manager')) {
19
            return;
20
        }
21
22
        $this->setupAliases($container);
23
24
        // Setup beanstalkd if configuration is present
25
        $this->setupBeanstalkd($container);
26
        $this->setupRabbitMQ($container);
27
28
        $definition = $container->getDefinition('dtc_queue.worker_manager');
29
        $jobManagerRef = array(new Reference('dtc_queue.job_manager'));
30
31
        $jobClass = $this->getJobClass($container);
32
        $jobArchiveClass = $this->getJobClassArchive($container);
33
        $container->setParameter('dtc_queue.class_job', $jobClass);
34
        $container->setParameter('dtc_queue.class_job_archive', $jobArchiveClass);
35
        $container->setParameter('dtc_queue.class_run', $this->getRunClass($container, 'run', 'Run'));
36
        $container->setParameter('dtc_queue.class_run_archive', $this->getRunClass($container, 'run_archive', 'RunArchive'));
37
38
        $this->setupTaggedServices($container, $definition, $jobManagerRef, $jobClass);
39
        $eventDispatcher = $container->getDefinition('dtc_queue.event_dispatcher');
40
        foreach ($container->findTaggedServiceIds('dtc_queue.event_subscriber') as $id => $attributes) {
41
            $eventSubscriber = $container->getDefinition($id);
42
            $eventDispatcher->addMethodCall('addSubscriber', [$eventSubscriber]);
43
        }
44
        $this->setupDoctrineManagers($container);
45
    }
46
47
    protected function setupAliases(ContainerBuilder $container)
48
    {
49
        $defaultManagerType = $container->getParameter('dtc_queue.default_manager');
50
        if (!$container->hasDefinition('dtc_queue.job_manager.'.$defaultManagerType)) {
51
            throw new \Exception("No job manager found for dtc_queue.job_manager.$defaultManagerType");
52
        }
53
54
        $defaultRunManagerType = $container->getParameter('dtc_queue.run_manager');
55
        if (!$container->hasDefinition('dtc_queue.run_manager.'.$defaultRunManagerType)) {
56
            throw new \Exception("No run manager found for dtc_queue.run_manager.$defaultRunManagerType");
57
        }
58
59
        $alias = new Alias('dtc_queue.job_manager.'.$defaultManagerType);
60
        $container->setAlias('dtc_queue.job_manager', $alias);
61
62
        $alias = new Alias('dtc_queue.run_manager.'.$defaultRunManagerType);
63
        $container->setAlias('dtc_queue._manager', $alias);
64
    }
65
66
    /**
67
     * @param ContainerBuilder $container
68
     * @param Reference[]            $jobManagerRef
69
     * @param string           $jobClass
70
     */
71
    protected function setupTaggedServices(ContainerBuilder $container, Definition $definition, array $jobManagerRef, $jobClass)
72
    {
73
        // Add each worker to workerManager, make sure each worker has instance to work
74
        foreach ($container->findTaggedServiceIds('dtc_queue.worker') as $id => $attributes) {
75
            $worker = $container->getDefinition($id);
76
            $class = $container->getDefinition($id)->getClass();
77
78
            $refClass = new \ReflectionClass($class);
79
            $workerClass = 'Dtc\QueueBundle\Model\Worker';
80
            if (!$refClass->isSubclassOf($workerClass)) {
81
                throw new \InvalidArgumentException(sprintf('Service "%s" must extend class "%s".', $id, $workerClass));
82
            }
83
84
            // Give each worker access to job manager
85
            $worker->addMethodCall('setJobManager', $jobManagerRef);
86
            $worker->addMethodCall('setJobClass', array($jobClass));
87
88
            $definition->addMethodCall('addWorker', array(new Reference($id)));
89
        }
90
    }
91
92
    /**
93
     * Sets up beanstalkd instance if appropriate.
94
     *
95
     * @param ContainerBuilder $container
96
     */
97
    protected function setupBeanstalkd(ContainerBuilder $container)
98
    {
99
        if ($container->hasParameter('dtc_queue.beanstalkd.host')) {
100
            $definition = new Definition('Pheanstalk\\Pheanstalk', [$container->getParameter('dtc_queue.beanstalkd.host')]);
101
            $container->setDefinition('dtc_queue.beanstalkd', $definition);
102
            $definition = $container->getDefinition('dtc_queue.job_manager.beanstalkd');
103
            $definition->addMethodCall('setBeanstalkd', [new Reference('dtc_queue.beanstalkd')]);
104
            if ($container->hasParameter('dtc_queue.beanstalkd.tube')) {
105
                $definition->addMethodCall('setTube', [$container->getParameter('dtc_queue.beanstalkd.tube')]);
106
            }
107
        }
108
    }
109
110
    protected function setupDoctrineManagers(ContainerBuilder $container)
111
    {
112
        $documentManager = $container->getParameter('dtc_queue.document_manager');
113
114
        $odmManager = "doctrine_mongodb.odm.{$documentManager}_document_manager";
115
        if ($container->has($odmManager)) {
116
            $container->setAlias('dtc_queue.document_manager', $odmManager);
117
        }
118
119
        $entityManager = $container->getParameter('dtc_queue.entity_manager');
120
121
        $ormManager = "doctrine.orm.{$entityManager}_entity_manager";
122
        if ($container->has($ormManager)) {
123
            $container->setAlias('dtc_queue.entity_manager', $ormManager);
124
        }
125
    }
126
127
    /**
128
     * Sets up RabbitMQ instance if appropriate.
129
     *
130
     * @param ContainerBuilder $container
131
     */
132
    protected function setupRabbitMQ(ContainerBuilder $container)
133
    {
134
        if ($container->hasParameter('dtc_queue.rabbit_mq')) {
135
            $class = 'PhpAmqpLib\\Connection\\AMQPStreamConnection';
136
            $rabbitMqConfig = $container->getParameter('dtc_queue.rabbit_mq');
137
            $arguments = [
138
                $rabbitMqConfig['host'],
139
                $rabbitMqConfig['port'],
140
                $rabbitMqConfig['user'],
141
                $rabbitMqConfig['password'],
142
                $rabbitMqConfig['vhost'],
143
            ];
144
145
            $this->setupRabbitMQOptions($container, $arguments, $class);
146
            $definition = new Definition($class, $arguments);
147
            $container->setDefinition('dtc_queue.rabbit_mq', $definition);
148
            $definition = $container->getDefinition('dtc_queue.job_manager.rabbit_mq');
149
            $definition->addMethodCall('setAMQPConnection', [new Reference('dtc_queue.rabbit_mq')]);
150
            $definition->addMethodCall('setQueueArgs', array_values($rabbitMqConfig['queue_args']));
151
            $definition->addMethodCall('setExchangeArgs', array_values($rabbitMqConfig['exchange_args']));
152
        }
153
    }
154
155
    /**
156
     * @param ContainerBuilder $container
157
     * @param array            $arguments
158
     * @param string $class
159
     */
160
    protected function setupRabbitMQOptions(ContainerBuilder $container, array &$arguments, &$class)
161
    {
162
        if ($container->hasParameter('dtc_queue.rabbit_mq.ssl') && $container->getParameter('dtc_queue.rabbit_mq.ssl')) {
163
            $class = 'PhpAmqpLib\\Connection\\AMQPSSLConnection';
164
            if ($container->hasParameter('dtc_queue.rabbit_mq.ssl_options')) {
165
                $arguments[] = $container->getParameter('dtc_queue.rabbit_mq.ssl_options');
166
            } else {
167
                $arguments[] = [];
168
            }
169
            if ($container->hasParameter('dtc_queue.rabbit_mq.options')) {
170
                $arguments[] = $container->getParameter('dtc_queue.rabbit_mq.options');
171
            }
172
        } else {
173
            if ($container->hasParameter('dtc_queue.rabbit_mq.options')) {
174
                $options = $container->getParameter('dtc_queue.rabbit_mq.options');
175
                $this->setRabbitMqOptionsPt1($arguments, $options);
176
                $this->setRabbitMqOptionsPt2($arguments, $options);
177
            }
178
        }
179
    }
180
181
    protected function setRabbitMqOptionsPt1(array &$arguments, array $options)
182
    {
183
        if (isset($options['insist'])) {
184
            $arguments[] = $options['insist'];
185
        } else {
186
            $arguments[] = false;
187
        }
188
        if (isset($options['login_method'])) {
189
            $arguments[] = $options['login_method'];
190
        } else {
191
            $arguments[] = 'AMQPLAIN';
192
        }
193
        if (isset($options['login_response'])) {
194
            $arguments[] = $options['login_response'];
195
        } else {
196
            $arguments[] = null;
197
        }
198
        if (isset($options['locale'])) {
199
            $arguments[] = $options['locale'];
200
        } else {
201
            $arguments[] = 'en_US';
202
        }
203
    }
204
205
    protected function setRabbitMqOptionsPt2(array &$arguments, array $options)
206
    {
207
        if (isset($options['connection_timeout'])) {
208
            $arguments[] = $options['connection_timeout'];
209
        } else {
210
            $arguments[] = 3.0;
211
        }
212
        if (isset($options['read_write_timeout'])) {
213
            $arguments[] = $options['read_write_timeout'];
214
        } else {
215
            $arguments[] = 3.0;
216
        }
217
        if (isset($options['context'])) {
218
            $arguments[] = $options['context'];
219
        } else {
220
            $arguments[] = null;
221
        }
222
        if (isset($options['keepalive'])) {
223
            $arguments[] = $options['keepalive'];
224
        } else {
225
            $arguments[] = false;
226
        }
227
        if (isset($options['heartbeat'])) {
228
            $arguments[] = $options['heartbeat'];
229
        } else {
230
            $arguments[] = 0;
231
        }
232
    }
233
234
    /**
235
     * Determines the job class based on the queue manager type.
236
     *
237
     * @param ContainerBuilder $container
238
     *
239
     * @return mixed|string
240
     *
241
     * @throws \Exception
242
     */
243
    protected function getJobClass(ContainerBuilder $container)
244
    {
245
        $jobClass = $container->getParameter('dtc_queue.class_job');
246
        if (!$jobClass) {
247
            switch ($defaultType = $container->getParameter('dtc_queue.default_manager')) {
248
                case 'mongodb': // deprecated remove in 4.0
249
                case 'odm':
250
                    $jobClass = 'Dtc\\QueueBundle\\Document\\Job';
251
                    break;
252
                case 'beanstalkd':
253
                    $jobClass = 'Dtc\\QueueBundle\\Beanstalkd\\Job';
254
                    break;
255
                case 'rabbit_mq':
256
                    $jobClass = 'Dtc\\QueueBundle\\RabbitMQ\\Job';
257
                    break;
258
                case 'orm':
259
                    $jobClass = 'Dtc\\QueueBundle\\Entity\\Job';
260
                    break;
261
                default:
262
                    throw new \Exception('Unknown default_manager type '.$defaultType.' - please specify a Job class in the \'class\' configuration parameter');
263
            }
264
        }
265
266
        $this->testJobClass($jobClass);
267
268
        return $jobClass;
269
    }
270
271
    protected function getRunClass(ContainerBuilder $container, $type, $className)
272
    {
273
        $runArchiveClass = $container->hasParameter('dtc_queue.class_'.$type) ? $container->getParameter('dtc_queue.class_'.$type) : null;
274
        if (!$runArchiveClass) {
275
            switch ($container->getParameter('dtc_queue.default_manager')) {
276
                case 'mongodb': // deprecated remove in 4.0
277
                case 'odm':
278
                    $runArchiveClass = 'Dtc\\QueueBundle\\Document\\'.$className;
279
                    break;
280
                case 'orm':
281
                    $runArchiveClass = 'Dtc\\QueueBundle\\Entity\\'.$className;
282
                    break;
283
                default:
284
                    $runArchiveClass = 'Dtc\\QueueBundle\\Model\\Run';
285
            }
286
        }
287
288
        $this->testRunClass($runArchiveClass);
289
290
        return $runArchiveClass;
291
    }
292
293
    /**
294
     * @param string $runClass
295
     *
296
     * @throws \Exception
297
     */
298
    protected function testRunClass($runClass)
299
    {
300
        if (!class_exists($runClass)) {
301
            throw new \Exception("Can't find class $runClass");
302
        }
303
304
        $test = new $runClass();
305
        if (!$test instanceof Run) {
306
            throw new \Exception("$runClass must be instance of (or derived from) Dtc\\QueueBundle\\Model\\Run");
307
        }
308
    }
309
310
    /**
311
     *
312
     * @throws \Exception
313
     */
314
    protected function testJobClass($jobClass)
315
    {
316
        if ($jobClass) {
317
            if (!class_exists($jobClass)) {
318
                throw new \Exception("Can't find class $jobClass");
319
            }
320
321
            $test = new $jobClass();
322
            if (!$test instanceof Job) {
323
                throw new \Exception("$jobClass must be instance of (or derived from) Dtc\\QueueBundle\\Model\\Job");
324
            }
325
        }
326
    }
327
328
    /**
329
     * Determines the job class based on the queue manager type.
330
     *
331
     * @param ContainerBuilder $container
332
     *
333
     * @return mixed|string
334
     *
335
     * @throws \Exception
336
     */
337
    protected function getJobClassArchive(ContainerBuilder $container)
338
    {
339
        $jobArchiveClass = $container->getParameter('dtc_queue.class_job_archive');
340
        if (!$jobArchiveClass) {
341
            switch ($container->getParameter('dtc_queue.default_manager')) {
342
                case 'mongodb': // deprecated remove in 4.0
343
                case 'odm':
344
                    $jobArchiveClass = 'Dtc\\QueueBundle\\Document\\JobArchive';
345
                    break;
346
                case 'orm':
347
                    $jobArchiveClass = 'Dtc\\QueueBundle\\Entity\\JobArchive';
348
                    break;
349
            }
350
        }
351
        $this->testJobClass($jobArchiveClass);
352
353
        return $jobArchiveClass;
354
    }
355
}
356