Completed
Push — master ( 239355...d39289 )
by Matthew
05:26
created

WorkerCompilerPass::setupDoctrineManagers()   A

Complexity

Conditions 3
Paths 4

Size

Total Lines 16
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 12

Importance

Changes 0
Metric Value
dl 0
loc 16
ccs 0
cts 13
cp 0
rs 9.4285
c 0
b 0
f 0
cc 3
eloc 9
nc 4
nop 1
crap 12
1
<?php
2
3
namespace Dtc\QueueBundle\DependencyInjection\Compiler;
4
5
use Dtc\QueueBundle\Model\Job;
6
use Dtc\QueueBundle\Model\Run;
7
use Pheanstalk\Pheanstalk;
8
use Symfony\Component\DependencyInjection\Alias;
9
use Symfony\Component\DependencyInjection\ContainerBuilder;
10
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
11
use Symfony\Component\DependencyInjection\Definition;
12
use Symfony\Component\DependencyInjection\Reference;
13
14
class WorkerCompilerPass implements CompilerPassInterface
15
{
16
    public function process(ContainerBuilder $container)
17
    {
18
        if (false === $container->hasDefinition('dtc_queue.worker_manager')) {
19
            return;
20
        }
21
22
        $defaultManagerType = $container->getParameter('dtc_queue.default_manager');
23
        if (!$container->hasDefinition('dtc_queue.job_manager.'.$defaultManagerType)) {
24
            throw new \Exception("No job manager found for dtc_queue.job_manager.$defaultManagerType");
25
        }
26
27
        $alias = new Alias('dtc_queue.job_manager.'.$defaultManagerType);
28
        $container->setAlias('dtc_queue.job_manager', $alias);
29
30
        // Setup beanstalkd if configuration is present
31
        $this->setupBeanstalkd($container);
32
        $this->setupRabbitMQ($container);
33
34
        $definition = $container->getDefinition('dtc_queue.worker_manager');
35
        $jobManagerRef = array(new Reference('dtc_queue.job_manager'));
36
37
        $jobClass = $this->getJobClass($container);
38
        $jobArchiveClass = $this->getJobClassArchive($container);
39
        $container->setParameter('dtc_queue.class_job', $jobClass);
40
        $container->setParameter('dtc_queue.class_job_archive', $jobArchiveClass);
41
        $container->setParameter('dtc_queue.class_run', $this->getRunArchiveClass($container, 'run', 'Run'));
42
        $container->setParameter('dtc_queue.class_run_archive', $this->getRunArchiveClass($container, 'run_archive', 'RunArchive'));
43
        // Add each worker to workerManager, make sure each worker has instance to work
44
        foreach ($container->findTaggedServiceIds('dtc_queue.worker') as $id => $attributes) {
45
            $worker = $container->getDefinition($id);
46
            $class = $container->getDefinition($id)->getClass();
47
48
            $refClass = new \ReflectionClass($class);
49
            $workerClass = 'Dtc\QueueBundle\Model\Worker';
50
            if (!$refClass->isSubclassOf($workerClass)) {
51
                throw new \InvalidArgumentException(sprintf('Service "%s" must extend class "%s".', $id, $workerClass));
52
            }
53
54
            // Give each worker access to job manager
55
            $worker->addMethodCall('setJobManager', $jobManagerRef);
56
            $worker->addMethodCall('setJobClass', array($jobClass));
57
58
            $definition->addMethodCall('addWorker', array(new Reference($id)));
59
        }
60
61
        $eventDispatcher = $container->getDefinition('dtc_queue.event_dispatcher');
62
        foreach ($container->findTaggedServiceIds('dtc_queue.event_subscriber') as $id => $attributes) {
63
            $eventSubscriber = $container->getDefinition($id);
64
            $eventDispatcher->addMethodCall('addSubscriber', [$eventSubscriber]);
65
        }
66
        $this->setupDoctrineManagers($container);
67
    }
68
69
    /**
70
     * Sets up beanstalkd instance if appropriate.
71
     *
72
     * @param ContainerBuilder $container
73
     */
74
    public function setupBeanstalkd(ContainerBuilder $container)
75
    {
76
        if ($container->hasParameter('dtc_queue.beanstalkd.host')) {
77
            $definition = new Definition('Pheanstalk\\Pheanstalk', [$container->getParameter('dtc_queue.beanstalkd.host')]);
78
            $container->setDefinition('dtc_queue.beanstalkd', $definition);
79
            $definition = $container->getDefinition('dtc_queue.job_manager.beanstalkd');
80
            $definition->addMethodCall('setBeanstalkd', [new Reference('dtc_queue.beanstalkd')]);
81
            if ($container->hasParameter('dtc_queue.beanstalkd.tube')) {
82
                $definition->addMethodCall('setTube', [$container->getParameter('dtc_queue.beanstalkd.tube')]);
83
            }
84
        }
85
    }
86
87
    public function setupDoctrineManagers(ContainerBuilder $container)
88
    {
89
        $documentManager = $container->getParameter('dtc_queue.document_manager');
90
91
        $odmManager = "doctrine_mongodb.odm.{$documentManager}_document_manager";
92
        if ($container->has($odmManager)) {
93
            $container->setAlias('dtc_queue.document_manager', $odmManager);
94
        }
95
96
        $entityManager = $container->getParameter('dtc_queue.entity_manager');
97
98
        $ormManager = "doctrine.orm.{$entityManager}_entity_manager";
99
        if ($container->has($ormManager)) {
100
            $container->setAlias('dtc_queue.entity_manager', $ormManager);
101
        }
102
    }
103
104
    /**
105
     * Sets up RabbitMQ instance if appropriate.
106
     *
107
     * @param ContainerBuilder $container
108
     */
109
    public function setupRabbitMQ(ContainerBuilder $container)
110
    {
111
        if ($container->hasParameter('dtc_queue.rabbit_mq')) {
112
            $class = 'PhpAmqpLib\\Connection\\AMQPStreamConnection';
113
            $rabbitMqConfig = $container->getParameter('dtc_queue.rabbit_mq');
114
            $arguments = [
115
                $rabbitMqConfig['host'],
116
                $rabbitMqConfig['port'],
117
                $rabbitMqConfig['user'],
118
                $rabbitMqConfig['password'],
119
                $rabbitMqConfig['vhost'],
120
            ];
121
122
            if ($container->hasParameter('dtc_queue.rabbit_mq.ssl') && $container->getParameter('dtc_queue.rabbit_mq.ssl')) {
123
                $class = 'PhpAmqpLib\\Connection\\AMQPSSLConnection';
124
                if ($container->hasParameter('dtc_queue.rabbit_mq.ssl_options')) {
125
                    $arguments[] = $container->getParameter('dtc_queue.rabbit_mq.ssl_options');
126
                } else {
127
                    $arguments[] = [];
128
                }
129
                if ($container->hasParameter('dtc_queue.rabbit_mq.options')) {
130
                    $arguments[] = $container->getParameter('dtc_queue.rabbit_mq.options');
131
                }
132
            } else {
133
                if ($container->hasParameter('dtc_queue.rabbit_mq.options')) {
134
                    $options = $container->getParameter('dtc_queue.rabbit_mq.options');
135
                    $this->setRabbitMqOptionsPt1($arguments, $options);
136
                    $this->setRabbitMqOptionsPt2($arguments, $options);
137
                }
138
            }
139
140
            $definition = new Definition($class, $arguments);
141
            $container->setDefinition('dtc_queue.rabbit_mq', $definition);
142
            $definition = $container->getDefinition('dtc_queue.job_manager.rabbit_mq');
143
            $definition->addMethodCall('setAMQPConnection', [new Reference('dtc_queue.rabbit_mq')]);
144
            $definition->addMethodCall('setQueueArgs', array_values($rabbitMqConfig['queue_args']));
145
            $definition->addMethodCall('setExchangeArgs', array_values($rabbitMqConfig['exchange_args']));
146
        }
147
    }
148
149
    public function setRabbitMqOptionsPt1(array &$arguments, array $options)
150
    {
151
        if (isset($options['insist'])) {
152
            $arguments[] = $options['insist'];
153
        } else {
154
            $arguments[] = false;
155
        }
156
        if (isset($options['login_method'])) {
157
            $arguments[] = $options['login_method'];
158
        } else {
159
            $arguments[] = 'AMQPLAIN';
160
        }
161
        if (isset($options['login_response'])) {
162
            $arguments[] = $options['login_response'];
163
        } else {
164
            $arguments[] = null;
165
        }
166
        if (isset($options['locale'])) {
167
            $arguments[] = $options['locale'];
168
        } else {
169
            $arguments[] = 'en_US';
170
        }
171
    }
172
173
    public function setRabbitMqOptionsPt2(array &$arguments, array $options)
174
    {
175
        if (isset($options['connection_timeout'])) {
176
            $arguments[] = $options['connection_timeout'];
177
        } else {
178
            $arguments[] = 3.0;
179
        }
180
        if (isset($options['read_write_timeout'])) {
181
            $arguments[] = $options['read_write_timeout'];
182
        } else {
183
            $arguments[] = 3.0;
184
        }
185
        if (isset($options['context'])) {
186
            $arguments[] = $options['context'];
187
        } else {
188
            $arguments[] = null;
189
        }
190
        if (isset($options['keepalive'])) {
191
            $arguments[] = $options['keepalive'];
192
        } else {
193
            $arguments[] = false;
194
        }
195
        if (isset($options['heartbeat'])) {
196
            $arguments[] = $options['heartbeat'];
197
        } else {
198
            $arguments[] = 0;
199
        }
200
    }
201
202
    /**
203
     * Determines the job class based on the queue manager type.
204
     *
205
     * @param ContainerBuilder $container
206
     *
207
     * @return mixed|string
208
     *
209
     * @throws \Exception
210
     */
211
    public function getJobClass(ContainerBuilder $container)
212
    {
213
        $jobClass = $container->getParameter('dtc_queue.class_job');
214
        if (!$jobClass) {
215
            switch ($defaultType = $container->getParameter('dtc_queue.default_manager')) {
216
                case 'mongodb':
217
                    $jobClass = 'Dtc\\QueueBundle\\Document\\Job';
218
                    break;
219
                case 'beanstalkd':
220
                    $jobClass = 'Dtc\\QueueBundle\\Beanstalkd\\Job';
221
                    break;
222
                case 'rabbit_mq':
223
                    $jobClass = 'Dtc\\QueueBundle\\RabbitMQ\\Job';
224
                    break;
225
                case 'orm':
226
                    $jobClass = 'Dtc\\QueueBundle\\Entity\\Job';
227
                    break;
228
                default:
229
                    throw new \Exception('Unknown default_manager type '.$defaultType.' - please specify a Job class in the \'class\' configuration parameter');
230
            }
231
        }
232
233
        $this->testJobClass($jobClass);
234
235
        return $jobClass;
236
    }
237
238
    public function getRunArchiveClass(ContainerBuilder $container, $type, $className)
239
    {
240
        $runArchiveClass = $container->hasParameter('dtc_queue.class_'.$type) ? $container->getParameter('dtc_queue.class_'.$type) : null;
241
        if (!$runArchiveClass) {
242
            switch ($container->getParameter('dtc_queue.default_manager')) {
243
                case 'mongodb':
244
                    $runArchiveClass = 'Dtc\\QueueBundle\\Document\\'.$className;
245
                    break;
246
                case 'orm':
247
                    $runArchiveClass = 'Dtc\\QueueBundle\\Entity\\'.$className;
248
                    break;
249
                default:
250
                    $runArchiveClass = 'Dtc\\QueueBundle\\Model\\Run';
251
            }
252
        }
253
254
        $this->testRunClass($runArchiveClass);
255
256
        return $runArchiveClass;
257
    }
258
259
    /**
260
     * @param string $runClass
261
     *
262
     * @throws \Exception
263
     */
264
    protected function testRunClass($runClass)
265
    {
266
        if (!class_exists($runClass)) {
267
            throw new \Exception("Can't find class $runClass");
268
        }
269
270
        $test = new $runClass();
271
        if (!$test instanceof Run) {
272
            throw new \Exception("$runClass must be instance of (or derived from) Dtc\\QueueBundle\\Model\\Run");
273
        }
274
    }
275
276
    /**
277
     * @param string|null $jobArchiveClass
0 ignored issues
show
Bug introduced by
There is no parameter named $jobArchiveClass. Was it maybe removed?

This check looks for PHPDoc comments describing methods or function parameters that do not exist on the corresponding method or function.

Consider the following example. The parameter $italy is not defined by the method finale(...).

/**
 * @param array $germany
 * @param array $island
 * @param array $italy
 */
function finale($germany, $island) {
    return "2:1";
}

The most likely cause is that the parameter was removed, but the annotation was not.

Loading history...
278
     *
279
     * @throws \Exception
280
     */
281
    protected function testJobClass($jobClass)
282
    {
283
        if ($jobClass) {
284
            if (!class_exists($jobClass)) {
285
                throw new \Exception("Can't find class $jobClass");
286
            }
287
288
            $test = new $jobClass();
289
            if (!$test instanceof Job) {
290
                throw new \Exception("$jobClass must be instance of (or derived from) Dtc\\QueueBundle\\Model\\Job");
291
            }
292
        }
293
    }
294
295
    /**
296
     * Determines the job class based on the queue manager type.
297
     *
298
     * @param ContainerBuilder $container
299
     *
300
     * @return mixed|string
301
     *
302
     * @throws \Exception
303
     */
304
    public function getJobClassArchive(ContainerBuilder $container)
305
    {
306
        $jobArchiveClass = $container->getParameter('dtc_queue.class_job_archive');
307
        if (!$jobArchiveClass) {
308
            switch ($container->getParameter('dtc_queue.default_manager')) {
309
                case 'mongodb':
310
                    $jobArchiveClass = 'Dtc\\QueueBundle\\Document\\JobArchive';
311
                    break;
312
                case 'orm':
313
                    $jobArchiveClass = 'Dtc\\QueueBundle\\Entity\\JobArchive';
314
                    break;
315
            }
316
        }
317
        $this->testJobClass($jobArchiveClass);
318
319
        return $jobArchiveClass;
320
    }
321
}
322