Test Setup Failed
Push — master ( 9363fc...a800a8 )
by Matthew
02:46
created

WorkerCompilerPass::getJobClassArchive()   C

Complexity

Conditions 8
Paths 16

Size

Total Lines 27
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 27
rs 5.3846
c 0
b 0
f 0
cc 8
eloc 17
nc 16
nop 1
1
<?php
2
3
namespace Dtc\QueueBundle\DependencyInjection\Compiler;
4
5
use Dtc\QueueBundle\Model\Job;
6
use Dtc\QueueBundle\Model\Run;
7
use Pheanstalk\Pheanstalk;
8
use Symfony\Component\DependencyInjection\Alias;
9
use Symfony\Component\DependencyInjection\ContainerBuilder;
10
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
11
use Symfony\Component\DependencyInjection\Definition;
12
use Symfony\Component\DependencyInjection\Reference;
13
14
class WorkerCompilerPass implements CompilerPassInterface
15
{
16
    /**
     * Wires the queue services together.
     *
     * Does nothing when the `dtc_queue.worker_manager` definition is absent.
     * Otherwise it aliases `dtc_queue.job_manager` to the job manager selected by
     * the `dtc_queue.default_manager` parameter, sets up the optional beanstalkd
     * and RabbitMQ connections, publishes the resolved job / run class names as
     * container parameters, registers every service tagged `dtc_queue.worker`
     * with the worker manager, registers every service tagged
     * `dtc_queue.event_subscriber` with the event dispatcher and finally aliases
     * the Doctrine managers.
     *
     * @param ContainerBuilder $container
     *
     * @throws \Exception                if no job manager is defined for the default manager type
     * @throws \InvalidArgumentException if a tagged worker does not extend Dtc\QueueBundle\Model\Worker
     */
    public function process(ContainerBuilder $container)
    {
        if (false === $container->hasDefinition('dtc_queue.worker_manager')) {
            return;
        }

        $defaultManagerType = $container->getParameter('dtc_queue.default_manager');
        if (!$container->hasDefinition('dtc_queue.job_manager.'.$defaultManagerType)) {
            throw new \Exception("No job manager found for dtc_queue.job_manager.$defaultManagerType");
        }

        $alias = new Alias('dtc_queue.job_manager.'.$defaultManagerType);
        $container->setAlias('dtc_queue.job_manager', $alias);

        // Setup beanstalkd / RabbitMQ if configuration is present
        $this->setupBeanstalkd($container);
        $this->setupRabbitMQ($container);

        $definition = $container->getDefinition('dtc_queue.worker_manager');
        $jobManagerRef = array(new Reference('dtc_queue.job_manager'));

        $jobClass = $this->getJobClass($container);
        $jobArchiveClass = $this->getJobClassArchive($container);
        $container->setParameter('dtc_queue.class_job', $jobClass);
        $container->setParameter('dtc_queue.class_job_archive', $jobArchiveClass);
        $container->setParameter('dtc_queue.class_run', $this->getRunArchiveClass($container, 'run', 'Run'));
        $container->setParameter('dtc_queue.class_run_archive', $this->getRunArchiveClass($container, 'run_archive', 'RunArchive'));

        // Add each worker to workerManager, make sure each worker has instance to work
        $workerClass = 'Dtc\QueueBundle\Model\Worker';
        foreach (array_keys($container->findTaggedServiceIds('dtc_queue.worker')) as $id) {
            $worker = $container->getDefinition($id);

            // The service class may be declared as a "%parameter%" placeholder,
            // so resolve it before handing it to reflection.
            $class = $container->getParameterBag()->resolveValue($worker->getClass());

            $refClass = new \ReflectionClass($class);
            if (!$refClass->isSubclassOf($workerClass)) {
                throw new \InvalidArgumentException(sprintf('Service "%s" must extend class "%s".', $id, $workerClass));
            }

            // Give each worker access to job manager
            $worker->addMethodCall('setJobManager', $jobManagerRef);
            $worker->addMethodCall('setJobClass', array($jobClass));

            $definition->addMethodCall('addWorker', array(new Reference($id)));
        }

        $eventDispatcher = $container->getDefinition('dtc_queue.event_dispatcher');
        foreach (array_keys($container->findTaggedServiceIds('dtc_queue.event_subscriber')) as $id) {
            // Pass a Reference rather than the Definition itself: a Definition
            // argument is inlined as a separate private instance, so the
            // dispatcher would not receive the shared subscriber service.
            $eventDispatcher->addMethodCall('addSubscriber', [new Reference($id)]);
        }
        $this->setupDoctrineManagers($container);
    }
68
69
    /**
     * Registers the beanstalkd connection when a host has been configured.
     *
     * When the `dtc_queue.beanstalkd.host` parameter exists, a
     * `dtc_queue.beanstalkd` service is defined with that host as its only
     * constructor argument and handed to the beanstalkd job manager through
     * `setBeanstalkd`. If `dtc_queue.beanstalkd.tube` exists as well, its value
     * is passed to the job manager through `setTube`.
     *
     * @param ContainerBuilder $container
     */
    public function setupBeanstalkd(ContainerBuilder $container)
    {
        if (!$container->hasParameter('dtc_queue.beanstalkd.host')) {
            return;
        }

        $host = $container->getParameter('dtc_queue.beanstalkd.host');
        $container->setDefinition('dtc_queue.beanstalkd', new Definition('Pheanstalk\\Pheanstalk', [$host]));

        $jobManager = $container->getDefinition('dtc_queue.job_manager.beanstalkd');
        $jobManager->addMethodCall('setBeanstalkd', [new Reference('dtc_queue.beanstalkd')]);

        if (!$container->hasParameter('dtc_queue.beanstalkd.tube')) {
            return;
        }

        $tube = $container->getParameter('dtc_queue.beanstalkd.tube');
        $jobManager->addMethodCall('setTube', [$tube]);
    }
86
87
    /**
     * Aliases the configured Doctrine ODM / ORM managers for the bundle.
     *
     * For each of `dtc_queue.document_manager` and `dtc_queue.entity_manager`
     * the parameter of that name is read, the matching Doctrine manager service
     * id is built from it, and - only if the container has that service - an
     * alias with the same name as the parameter is pointed at it.
     *
     * @param ContainerBuilder $container
     */
    public function setupDoctrineManagers(ContainerBuilder $container)
    {
        // alias / parameter name => pattern of the Doctrine manager service id
        $managers = [
            'dtc_queue.document_manager' => 'doctrine_mongodb.odm.%s_document_manager',
            'dtc_queue.entity_manager' => 'doctrine.orm.%s_entity_manager',
        ];

        foreach ($managers as $name => $servicePattern) {
            $serviceId = sprintf($servicePattern, $container->getParameter($name));
            if ($container->has($serviceId)) {
                $container->setAlias($name, $serviceId);
            }
        }
    }
103
104
    /**
     * Registers the RabbitMQ connection when `dtc_queue.rabbit_mq` is configured.
     *
     * The connection is always given host, port, user, password and vhost from
     * the configuration. With `dtc_queue.rabbit_mq.ssl` set to a truthy value an
     * AMQPSSLConnection is defined and receives the SSL options (an empty array
     * when none are configured) followed by the raw options array, if any.
     * Otherwise an AMQPStreamConnection is defined and, when options are
     * configured, they are expanded into positional arguments by
     * setRabbitMqOptionsPt1() and setRabbitMqOptionsPt2().
     *
     * The RabbitMQ job manager then receives the connection plus the configured
     * queue and exchange arguments.
     *
     * @param ContainerBuilder $container
     */
    public function setupRabbitMQ(ContainerBuilder $container)
    {
        if (!$container->hasParameter('dtc_queue.rabbit_mq')) {
            return;
        }

        $config = $container->getParameter('dtc_queue.rabbit_mq');
        $arguments = [
            $config['host'],
            $config['port'],
            $config['user'],
            $config['password'],
            $config['vhost'],
        ];

        $useSsl = $container->hasParameter('dtc_queue.rabbit_mq.ssl') && $container->getParameter('dtc_queue.rabbit_mq.ssl');
        $hasOptions = $container->hasParameter('dtc_queue.rabbit_mq.options');

        if ($useSsl) {
            $class = 'PhpAmqpLib\\Connection\\AMQPSSLConnection';
            $arguments[] = $container->hasParameter('dtc_queue.rabbit_mq.ssl_options')
                ? $container->getParameter('dtc_queue.rabbit_mq.ssl_options')
                : [];
            if ($hasOptions) {
                $arguments[] = $container->getParameter('dtc_queue.rabbit_mq.options');
            }
        } else {
            $class = 'PhpAmqpLib\\Connection\\AMQPStreamConnection';
            if ($hasOptions) {
                $options = $container->getParameter('dtc_queue.rabbit_mq.options');
                $this->setRabbitMqOptionsPt1($arguments, $options);
                $this->setRabbitMqOptionsPt2($arguments, $options);
            }
        }

        $container->setDefinition('dtc_queue.rabbit_mq', new Definition($class, $arguments));

        $jobManager = $container->getDefinition('dtc_queue.job_manager.rabbit_mq');
        $jobManager->addMethodCall('setAMQPConnection', [new Reference('dtc_queue.rabbit_mq')]);
        $jobManager->addMethodCall('setQueueArgs', array_values($config['queue_args']));
        $jobManager->addMethodCall('setExchangeArgs', array_values($config['exchange_args']));
    }
148
149
    /**
     * Appends the first group of connection arguments to $arguments.
     *
     * In order: insist, login_method, login_response, locale. Each value is
     * taken from $options when set (and not null) there, otherwise the default
     * listed below is appended instead.
     *
     * @param array $arguments Argument list, extended in place
     * @param array $options   Configured connection options
     */
    public function setRabbitMqOptionsPt1(array &$arguments, array $options)
    {
        // option name => value appended when the option is not set
        $defaults = [
            'insist' => false,
            'login_method' => 'AMQPLAIN',
            'login_response' => null,
            'locale' => 'en_US',
        ];

        foreach ($defaults as $key => $fallback) {
            $arguments[] = isset($options[$key]) ? $options[$key] : $fallback;
        }
    }
172
173
    /**
     * Appends the second group of connection arguments to $arguments.
     *
     * In order: connection_timeout, read_write_timeout, context, keepalive,
     * heartbeat. Each value is taken from $options when set (and not null)
     * there, otherwise the default listed below is appended instead.
     *
     * @param array $arguments Argument list, extended in place
     * @param array $options   Configured connection options
     */
    public function setRabbitMqOptionsPt2(array &$arguments, array $options)
    {
        // option name => value appended when the option is not set
        $defaults = [
            'connection_timeout' => 3.0,
            'read_write_timeout' => 3.0,
            'context' => null,
            'keepalive' => false,
            'heartbeat' => 0,
        ];

        foreach ($defaults as $key => $fallback) {
            $arguments[] = isset($options[$key]) ? $options[$key] : $fallback;
        }
    }
201
202
    /**
     * Determines the job class based on the queue manager type.
     *
     * The `dtc_queue.class_job` parameter wins when it is set to a non-empty
     * value; otherwise the class is chosen from the `dtc_queue.default_manager`
     * parameter. The class is validated by name only - it is never instantiated,
     * so its constructor does not run while the container is being compiled.
     *
     * @param ContainerBuilder $container
     *
     * @return string fully qualified job class name
     *
     * @throws \Exception if the manager type is unknown and no class is configured,
     *                    if the class cannot be found, or if it is not a
     *                    Dtc\QueueBundle\Model\Job
     */
    public function getJobClass(ContainerBuilder $container)
    {
        // Guarded read, consistent with getRunArchiveClass(): a missing parameter
        // simply falls through to the per-manager default.
        $jobClass = $container->hasParameter('dtc_queue.class_job') ? $container->getParameter('dtc_queue.class_job') : null;
        if (!$jobClass) {
            $defaultType = $container->getParameter('dtc_queue.default_manager');
            switch ($defaultType) {
                case 'mongodb':
                    $jobClass = 'Dtc\\QueueBundle\\Document\\Job';
                    break;
                case 'beanstalkd':
                    $jobClass = 'Dtc\\QueueBundle\\Beanstalkd\\Job';
                    break;
                case 'rabbit_mq':
                    $jobClass = 'Dtc\\QueueBundle\\RabbitMQ\\Job';
                    break;
                case 'orm':
                    $jobClass = 'Dtc\\QueueBundle\\Entity\\Job';
                    break;
                default:
                    throw new \Exception('Unknown default_manager type '.$defaultType.' - please specify a Job class in the \'class\' configuration parameter');
            }
        }

        if (!class_exists($jobClass)) {
            throw new \Exception("Can't find Job class $jobClass");
        }

        // Check the class hierarchy by name (third argument allows a class-name
        // string) instead of constructing a throw-away object.
        if (!is_a($jobClass, 'Dtc\\QueueBundle\\Model\\Job', true)) {
            throw new \Exception("$jobClass must be instance of (or derived from) Dtc\\QueueBundle\\Model\\Job");
        }

        return $jobClass;
    }
244
245
    /**
     * Determines the run (or run archive) class based on the queue manager type.
     *
     * The `dtc_queue.class_<type>` parameter wins when it exists and is
     * non-empty. Otherwise the `mongodb` and `orm` managers use their own
     * Document / Entity class named $className, and every other manager falls
     * back to Dtc\QueueBundle\Model\Run regardless of $className. The class is
     * validated by name only - it is never instantiated.
     *
     * @param ContainerBuilder $container
     * @param string           $type      parameter suffix, e.g. 'run' or 'run_archive'
     * @param string           $className short class name, e.g. 'Run' or 'RunArchive'
     *
     * @return string fully qualified class name
     *
     * @throws \Exception if the class cannot be found or is not a Dtc\QueueBundle\Model\Run
     */
    public function getRunArchiveClass(ContainerBuilder $container, $type, $className)
    {
        $runArchiveClass = $container->hasParameter('dtc_queue.class_'.$type) ? $container->getParameter('dtc_queue.class_'.$type) : null;
        if (!$runArchiveClass) {
            switch ($container->getParameter('dtc_queue.default_manager')) {
                case 'mongodb':
                    $runArchiveClass = 'Dtc\\QueueBundle\\Document\\'.$className;
                    break;
                case 'orm':
                    $runArchiveClass = 'Dtc\\QueueBundle\\Entity\\'.$className;
                    break;
                default:
                    $runArchiveClass = 'Dtc\\QueueBundle\\Model\\Run';
            }
        }

        // $runArchiveClass is always set by now, so no isset() guard is needed.
        if (!class_exists($runArchiveClass)) {
            throw new \Exception("Can't find $className class $runArchiveClass");
        }

        // Check the class hierarchy by name (third argument allows a class-name
        // string) instead of constructing a throw-away object.
        if (!is_a($runArchiveClass, 'Dtc\\QueueBundle\\Model\\Run', true)) {
            throw new \Exception("$runArchiveClass must be instance of (or derived from) Dtc\\QueueBundle\\Model\\Run");
        }

        return $runArchiveClass;
    }
274
275
    /**
     * Determines the job archive class based on the queue manager type.
     *
     * The `dtc_queue.class_job_archive` parameter wins when it is set to a
     * non-empty value. Otherwise the `mongodb` and `orm` managers use their own
     * JobArchive class; for any other manager no archive class is chosen and the
     * empty configured value is returned unchanged. A chosen class is validated
     * by name only - it is never instantiated.
     *
     * @param ContainerBuilder $container
     *
     * @return string|null fully qualified archive class name, or an empty value
     *                     when the manager has no archive class
     *
     * @throws \Exception if the class cannot be found or is not a Dtc\QueueBundle\Model\Job
     */
    public function getJobClassArchive(ContainerBuilder $container)
    {
        // Guarded read, consistent with getRunArchiveClass(): a missing parameter
        // simply falls through to the per-manager default.
        $jobArchiveClass = $container->hasParameter('dtc_queue.class_job_archive') ? $container->getParameter('dtc_queue.class_job_archive') : null;
        if (!$jobArchiveClass) {
            switch ($container->getParameter('dtc_queue.default_manager')) {
                case 'mongodb':
                    $jobArchiveClass = 'Dtc\\QueueBundle\\Document\\JobArchive';
                    break;
                case 'orm':
                    $jobArchiveClass = 'Dtc\\QueueBundle\\Entity\\JobArchive';
                    break;
            }
        }

        // Nothing configured and no default for this manager: nothing to validate.
        if (!$jobArchiveClass) {
            return $jobArchiveClass;
        }

        if (!class_exists($jobArchiveClass)) {
            throw new \Exception("Can't find JobArchive class $jobArchiveClass");
        }

        // Check the class hierarchy by name (third argument allows a class-name
        // string) instead of constructing a throw-away object.
        if (!is_a($jobArchiveClass, 'Dtc\\QueueBundle\\Model\\Job', true)) {
            throw new \Exception("$jobArchiveClass must be instance of (or derived from) Dtc\\QueueBundle\\Model\\Job");
        }

        return $jobArchiveClass;
    }
311
}
312