Completed
Push — master ( 109933...83326a )
by Matthew
02:47
created

WorkerCompilerPass::setRabbitMqOptionsPt1()   B

Complexity

Conditions 5
Paths 16

Size

Total Lines 23
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 23
rs 8.5906
c 0
b 0
f 0
cc 5
eloc 17
nc 16
nop 2
1
<?php
2
3
namespace Dtc\QueueBundle\DependencyInjection\Compiler;
4
5
use Dtc\QueueBundle\Model\Job;
6
use Dtc\QueueBundle\Model\Run;
7
use Pheanstalk\Pheanstalk;
8
use Symfony\Component\DependencyInjection\Alias;
9
use Symfony\Component\DependencyInjection\ContainerBuilder;
10
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
11
use Symfony\Component\DependencyInjection\Definition;
12
use Symfony\Component\DependencyInjection\Reference;
13
14
class WorkerCompilerPass implements CompilerPassInterface
15
{
16
    /**
     * Wires the queue services together.
     *
     * Does nothing when 'dtc_queue.worker_manager' is not defined. Otherwise it
     * aliases 'dtc_queue.job_manager' to the manager selected by the
     * 'dtc_queue.default_manager' parameter, sets up the optional beanstalkd and
     * RabbitMQ connections, stores the resolved job/run class names as container
     * parameters, registers every service tagged 'dtc_queue.worker' with the
     * worker manager, attaches every service tagged 'dtc_queue.event_subscriber'
     * to the event dispatcher and finally aliases the Doctrine managers.
     *
     * @param ContainerBuilder $container
     *
     * @throws \Exception                when no job manager is defined for the default manager type
     * @throws \InvalidArgumentException when a tagged worker does not extend Dtc\QueueBundle\Model\Worker
     */
    public function process(ContainerBuilder $container)
    {
        if (false === $container->hasDefinition('dtc_queue.worker_manager')) {
            return;
        }

        $defaultManagerType = $container->getParameter('dtc_queue.default_manager');
        if (!$container->hasDefinition('dtc_queue.job_manager.'.$defaultManagerType)) {
            throw new \Exception("No job manager found for dtc_queue.job_manager.$defaultManagerType");
        }

        $alias = new Alias('dtc_queue.job_manager.'.$defaultManagerType);
        $container->setAlias('dtc_queue.job_manager', $alias);

        // Optional transports: each call returns without changes when its configuration is absent
        $this->setupBeanstalkd($container);
        $this->setupRabbitMQ($container);

        $definition = $container->getDefinition('dtc_queue.worker_manager');
        $jobManagerRef = array(new Reference('dtc_queue.job_manager'));

        $jobClass = $this->getJobClass($container);
        $jobClassArchive = $this->getJobClassArchive($container);
        $container->setParameter('dtc_queue.class_job', $jobClass);
        $container->setParameter('dtc_queue.class_job_archive', $jobClassArchive);
        $container->setParameter('dtc_queue.class_run', $this->getRunClass($container));
        $container->setParameter('dtc_queue.class_run_archive', $this->getRunArchiveClass($container));

        // Add each worker to workerManager, make sure each worker has instance to work
        $workerClass = 'Dtc\QueueBundle\Model\Worker';
        foreach ($container->findTaggedServiceIds('dtc_queue.worker') as $id => $attributes) {
            $worker = $container->getDefinition($id);

            // The class may be given as a "%parameter%" placeholder; resolve it
            // before reflecting, otherwise ReflectionClass cannot find the class.
            $class = $container->getParameterBag()->resolveValue($worker->getClass());

            $refClass = new \ReflectionClass($class);
            if (!$refClass->isSubclassOf($workerClass)) {
                throw new \InvalidArgumentException(sprintf('Service "%s" must extend class "%s".', $id, $workerClass));
            }

            // Give each worker access to job manager
            $worker->addMethodCall('setJobManager', $jobManagerRef);
            $worker->addMethodCall('setJobClass', array($jobClass));

            $definition->addMethodCall('addWorker', array(new Reference($id)));
        }

        $eventDispatcher = $container->getDefinition('dtc_queue.event_dispatcher');
        foreach ($container->findTaggedServiceIds('dtc_queue.event_subscriber') as $id => $attributes) {
            // Pass a Reference (not the Definition itself) so the dispatcher receives
            // the shared subscriber service instead of a separately built inline copy.
            $eventDispatcher->addMethodCall('addSubscriber', array(new Reference($id)));
        }
        $this->setupDoctrineManagers($container);
    }
68
69
    /**
     * Registers the beanstalkd client service and injects it into the
     * beanstalkd job manager.
     *
     * Nothing is changed unless the 'dtc_queue.beanstalkd.host' parameter exists.
     * When 'dtc_queue.beanstalkd.tube' is also present, the tube is passed to the
     * job manager through a 'setTube' method call.
     *
     * @param ContainerBuilder $container
     */
    public function setupBeanstalkd(ContainerBuilder $container)
    {
        if (!$container->hasParameter('dtc_queue.beanstalkd.host')) {
            return;
        }

        $host = $container->getParameter('dtc_queue.beanstalkd.host');
        $client = new Definition('Pheanstalk\\Pheanstalk', [$host]);
        $container->setDefinition('dtc_queue.beanstalkd', $client);

        $jobManager = $container->getDefinition('dtc_queue.job_manager.beanstalkd');
        $jobManager->addMethodCall('setBeanstalkd', [new Reference('dtc_queue.beanstalkd')]);

        if (!$container->hasParameter('dtc_queue.beanstalkd.tube')) {
            return;
        }

        $tube = $container->getParameter('dtc_queue.beanstalkd.tube');
        $jobManager->addMethodCall('setTube', [$tube]);
    }
86
87
    /**
     * Aliases the queue's document/entity manager ids to the Doctrine managers
     * named by the 'dtc_queue.document_manager' and 'dtc_queue.entity_manager'
     * parameters.
     *
     * An alias is only created when the corresponding Doctrine service is known
     * to the container.
     *
     * @param ContainerBuilder $container
     */
    public function setupDoctrineManagers(ContainerBuilder $container)
    {
        // parameter name (also used as the alias id) => pattern of the Doctrine service id
        $managers = [
            'dtc_queue.document_manager' => 'doctrine_mongodb.odm.%s_document_manager',
            'dtc_queue.entity_manager' => 'doctrine.orm.%s_entity_manager',
        ];

        foreach ($managers as $aliasId => $servicePattern) {
            $serviceId = sprintf($servicePattern, $container->getParameter($aliasId));
            if ($container->has($serviceId)) {
                $container->setAlias($aliasId, $serviceId);
            }
        }
    }
103
104
    /**
     * Registers the RabbitMQ connection service and injects it into the
     * RabbitMQ job manager.
     *
     * Nothing is changed unless the 'dtc_queue.rabbit_mq' parameter exists. A
     * truthy 'dtc_queue.rabbit_mq.ssl' parameter selects AMQPSSLConnection,
     * otherwise AMQPStreamConnection is used; the constructor arguments are
     * assembled accordingly from the '...ssl_options' / '...options' parameters.
     *
     * @param ContainerBuilder $container
     */
    public function setupRabbitMQ(ContainerBuilder $container)
    {
        if (!$container->hasParameter('dtc_queue.rabbit_mq')) {
            return;
        }

        $config = $container->getParameter('dtc_queue.rabbit_mq');
        $connectionArgs = [
            $config['host'],
            $config['port'],
            $config['user'],
            $config['password'],
            $config['vhost'],
        ];

        $useSsl = $container->hasParameter('dtc_queue.rabbit_mq.ssl') && $container->getParameter('dtc_queue.rabbit_mq.ssl');
        if ($useSsl) {
            $connectionClass = 'PhpAmqpLib\\Connection\\AMQPSSLConnection';

            // SSL options always occupy the next position, empty when not configured
            $connectionArgs[] = $container->hasParameter('dtc_queue.rabbit_mq.ssl_options')
                ? $container->getParameter('dtc_queue.rabbit_mq.ssl_options')
                : [];

            // The SSL connection takes the remaining options as one array argument
            if ($container->hasParameter('dtc_queue.rabbit_mq.options')) {
                $connectionArgs[] = $container->getParameter('dtc_queue.rabbit_mq.options');
            }
        } else {
            $connectionClass = 'PhpAmqpLib\\Connection\\AMQPStreamConnection';

            // The stream connection takes each option as its own positional argument
            if ($container->hasParameter('dtc_queue.rabbit_mq.options')) {
                $streamOptions = $container->getParameter('dtc_queue.rabbit_mq.options');
                $this->setRabbitMqOptionsPt1($connectionArgs, $streamOptions);
                $this->setRabbitMqOptionsPt2($connectionArgs, $streamOptions);
            }
        }

        $container->setDefinition('dtc_queue.rabbit_mq', new Definition($connectionClass, $connectionArgs));

        $jobManager = $container->getDefinition('dtc_queue.job_manager.rabbit_mq');
        $jobManager->addMethodCall('setAMQPConnection', [new Reference('dtc_queue.rabbit_mq')]);
        $jobManager->addMethodCall('setQueueArgs', array_values($config['queue_args']));
        $jobManager->addMethodCall('setExchangeArgs', array_values($config['exchange_args']));
    }
148
149
    /**
     * Appends the first four optional connection arguments to $arguments, in
     * order: insist, login_method, login_response, locale.
     *
     * For each one the value from $options is used when it is set and not null;
     * otherwise the listed default is appended instead.
     *
     * @param array $arguments positional argument list, extended in place
     * @param array $options   option values keyed by option name
     */
    public function setRabbitMqOptionsPt1(array &$arguments, array $options)
    {
        // Order matters: the values are appended positionally
        $defaults = [
            'insist' => false,
            'login_method' => 'AMQPLAIN',
            'login_response' => null,
            'locale' => 'en_US',
        ];

        foreach ($defaults as $name => $fallback) {
            $arguments[] = isset($options[$name]) ? $options[$name] : $fallback;
        }
    }
172
173
    /**
     * Appends the remaining five optional connection arguments to $arguments,
     * in order: connection_timeout, read_write_timeout, context, keepalive,
     * heartbeat.
     *
     * For each one the value from $options is used when it is set and not null;
     * otherwise the listed default is appended instead.
     *
     * @param array $arguments positional argument list, extended in place
     * @param array $options   option values keyed by option name
     */
    public function setRabbitMqOptionsPt2(array &$arguments, array $options)
    {
        // Order matters: the values are appended positionally
        $defaults = [
            'connection_timeout' => 3.0,
            'read_write_timeout' => 3.0,
            'context' => null,
            'keepalive' => false,
            'heartbeat' => 0,
        ];

        foreach ($defaults as $name => $fallback) {
            $arguments[] = isset($options[$name]) ? $options[$name] : $fallback;
        }
    }
201
202
    /**
     * Determines the Job class to use.
     *
     * A non-empty 'dtc_queue.class_job' parameter takes precedence; otherwise the
     * class is chosen from the 'dtc_queue.default_manager' parameter.
     *
     * @param ContainerBuilder $container
     *
     * @return string the Job class name
     *
     * @throws \Exception when the manager type is unknown and no class is configured,
     *                    when the class cannot be found, or when it is neither
     *                    Dtc\QueueBundle\Model\Job nor derived from it
     */
    public function getJobClass(ContainerBuilder $container)
    {
        // Guard the lookup so a missing parameter falls through to the defaults
        // (same approach as getRunClass) instead of raising a container error.
        $jobClass = $container->hasParameter('dtc_queue.class_job') ? $container->getParameter('dtc_queue.class_job') : null;
        if (!$jobClass) {
            $defaultType = $container->getParameter('dtc_queue.default_manager');
            switch ($defaultType) {
                case 'mongodb':
                    $jobClass = 'Dtc\\QueueBundle\\Document\\Job';
                    break;
                case 'beanstalkd':
                    $jobClass = 'Dtc\\QueueBundle\\Beanstalkd\\Job';
                    break;
                case 'rabbit_mq':
                    $jobClass = 'Dtc\\QueueBundle\\RabbitMQ\\Job';
                    break;
                case 'orm':
                    $jobClass = 'Dtc\\QueueBundle\\Entity\\Job';
                    break;
                default:
                    throw new \Exception("Unknown default_manager type $defaultType - please specify a Job class in the 'class' configuration parameter");
            }
        }

        if (!class_exists($jobClass)) {
            throw new \Exception("Can't find Job class $jobClass");
        }

        // Check the hierarchy by class name; instantiating the class here would
        // run its constructor while the container is being compiled.
        if (!is_a($jobClass, 'Dtc\\QueueBundle\\Model\\Job', true)) {
            throw new \Exception("$jobClass must be instance of (or derived from) Dtc\\QueueBundle\\Model\\Job");
        }

        return $jobClass;
    }
244
245 View Code Duplication
    /**
     * Determines the Run class to use.
     *
     * A non-empty 'dtc_queue.class_run' parameter takes precedence; otherwise the
     * class is chosen from the 'dtc_queue.default_manager' parameter, falling
     * back to Dtc\QueueBundle\Model\Run for manager types other than 'mongodb'
     * and 'orm'.
     *
     * @param ContainerBuilder $container
     *
     * @return string the Run class name
     *
     * @throws \Exception when the class cannot be found, or when it is neither
     *                    Dtc\QueueBundle\Model\Run nor derived from it
     */
    public function getRunClass(ContainerBuilder $container)
    {
        $runClass = $container->hasParameter('dtc_queue.class_run') ? $container->getParameter('dtc_queue.class_run') : null;
        if (!$runClass) {
            switch ($container->getParameter('dtc_queue.default_manager')) {
                case 'mongodb':
                    $runClass = 'Dtc\\QueueBundle\\Document\\Run';
                    break;
                case 'orm':
                    $runClass = 'Dtc\\QueueBundle\\Entity\\Run';
                    break;
                default:
                    $runClass = 'Dtc\\QueueBundle\\Model\\Run';
            }
        }

        // $runClass is always set at this point, so no isset() guard is needed
        if (!class_exists($runClass)) {
            throw new \Exception("Can't find Run class $runClass");
        }

        // Check the hierarchy by class name; instantiating the class here would
        // run its constructor while the container is being compiled.
        if (!is_a($runClass, 'Dtc\\QueueBundle\\Model\\Run', true)) {
            throw new \Exception("$runClass must be instance of (or derived from) Dtc\\QueueBundle\\Model\\Run");
        }

        return $runClass;
    }
274
275 View Code Duplication
    /**
     * Determines the RunArchive class to use.
     *
     * A non-empty 'dtc_queue.class_run_archive' parameter takes precedence;
     * otherwise the class is chosen from the 'dtc_queue.default_manager'
     * parameter.
     *
     * @param ContainerBuilder $container
     *
     * @return string the RunArchive class name
     *
     * @throws \Exception when the class cannot be found, or when it is neither
     *                    Dtc\QueueBundle\Model\Run nor derived from it
     */
    public function getRunArchiveClass(ContainerBuilder $container)
    {
        $runArchiveClass = $container->hasParameter('dtc_queue.class_run_archive') ? $container->getParameter('dtc_queue.class_run_archive') : null;
        if (!$runArchiveClass) {
            switch ($container->getParameter('dtc_queue.default_manager')) {
                case 'mongodb':
                    $runArchiveClass = 'Dtc\\QueueBundle\\Document\\RunArchive';
                    break;
                case 'orm':
                    $runArchiveClass = 'Dtc\\QueueBundle\\Entity\\RunArchive';
                    break;
                default:
                    // Other manager types get the plain Run model as their archive class
                    $runArchiveClass = 'Dtc\\QueueBundle\\Model\\Run';
            }
        }

        // $runArchiveClass is always set at this point, so no isset() guard is needed
        if (!class_exists($runArchiveClass)) {
            throw new \Exception("Can't find RunArchive class $runArchiveClass");
        }

        // Check the hierarchy by class name; instantiating the class here would
        // run its constructor while the container is being compiled.
        if (!is_a($runArchiveClass, 'Dtc\\QueueBundle\\Model\\Run', true)) {
            throw new \Exception("$runArchiveClass must be instance of (or derived from) Dtc\\QueueBundle\\Model\\Run");
        }

        return $runArchiveClass;
    }
304
305
    /**
     * Determines the JobArchive class to use, if any.
     *
     * A non-empty 'dtc_queue.class_job_archive' parameter takes precedence;
     * otherwise a class is chosen for the 'mongodb' and 'orm' manager types.
     * For any other manager type no archive class is selected and the empty
     * value is returned as is.
     *
     * @param ContainerBuilder $container
     *
     * @return string|null the JobArchive class name, or an empty value when none applies
     *
     * @throws \Exception when a selected class cannot be found, or when it is neither
     *                    Dtc\QueueBundle\Model\Job nor derived from it
     */
    public function getJobClassArchive(ContainerBuilder $container)
    {
        // Guard the lookup so a missing parameter falls through to the defaults
        // (same approach as getRunArchiveClass) instead of raising a container error.
        $jobArchiveClass = $container->hasParameter('dtc_queue.class_job_archive') ? $container->getParameter('dtc_queue.class_job_archive') : null;
        if (!$jobArchiveClass) {
            switch ($container->getParameter('dtc_queue.default_manager')) {
                case 'mongodb':
                    $jobArchiveClass = 'Dtc\\QueueBundle\\Document\\JobArchive';
                    break;
                case 'orm':
                    $jobArchiveClass = 'Dtc\\QueueBundle\\Entity\\JobArchive';
                    break;
            }
        }

        // Nothing to validate when no archive class applies
        if (!$jobArchiveClass) {
            return $jobArchiveClass;
        }

        if (!class_exists($jobArchiveClass)) {
            throw new \Exception("Can't find JobArchive class $jobArchiveClass");
        }

        // Check the hierarchy by class name; instantiating the class here would
        // run its constructor while the container is being compiled.
        if (!is_a($jobArchiveClass, 'Dtc\\QueueBundle\\Model\\Job', true)) {
            throw new \Exception("$jobArchiveClass must be instance of (or derived from) Dtc\\QueueBundle\\Model\\Job");
        }

        return $jobArchiveClass;
    }
341
}
342