1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace TreeHouse\QueueBundle\DependencyInjection; |
4
|
|
|
|
5
|
|
|
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException; |
6
|
|
|
use Symfony\Component\Config\FileLocator; |
7
|
|
|
use Symfony\Component\DependencyInjection\ContainerBuilder; |
8
|
|
|
use Symfony\Component\DependencyInjection\ContainerInterface; |
9
|
|
|
use Symfony\Component\DependencyInjection\Definition; |
10
|
|
|
use Symfony\Component\DependencyInjection\DefinitionDecorator; |
11
|
|
|
use Symfony\Component\DependencyInjection\Loader\YamlFileLoader; |
12
|
|
|
use Symfony\Component\DependencyInjection\Reference; |
13
|
|
|
use Symfony\Component\HttpKernel\DependencyInjection\Extension; |
14
|
|
|
use TreeHouse\Queue\Amqp\ExchangeInterface; |
15
|
|
|
use TreeHouse\Queue\Amqp\QueueInterface; |
16
|
|
|
use TreeHouse\Queue\Processor\Retry\BackoffStrategy; |
17
|
|
|
use TreeHouse\Queue\Processor\Retry\DeprioritizeStrategy; |
18
|
|
|
use TreeHouse\Queue\Processor\Retry\RetryProcessor; |
19
|
|
|
|
20
|
|
|
class TreeHouseQueueExtension extends Extension |
21
|
|
|
{ |
22
|
|
|
/** |
23
|
|
|
* @var string[] |
24
|
|
|
*/ |
25
|
|
|
private $connections = []; |
26
|
|
|
|
27
|
|
|
/** |
28
|
|
|
* @var array[] Exchange name => ['id' => service id, 'auto_declare' => bool] |
29
|
|
|
*/ |
30
|
|
|
private $exchanges = []; |
31
|
|
|
|
32
|
|
|
/** |
33
|
|
|
* @var array[] Queue name => ['id' => service id, 'auto_declare' => bool] |
34
|
|
|
*/ |
35
|
|
|
private $queues = []; |
36
|
|
|
|
37
|
|
|
/** |
38
|
|
|
* @var string[] |
39
|
|
|
*/ |
40
|
|
|
private $publishers = []; |
41
|
|
|
|
42
|
|
|
/** |
43
|
|
|
* @var string[] |
44
|
|
|
*/ |
45
|
|
|
private $consumers = []; |
46
|
|
|
|
47
|
|
|
/** |
48
|
|
|
* @var string[] Maps an exchange name to the service id of its dead letter exchange (DLX) |
49
|
|
|
*/ |
50
|
|
|
private $dlxs = []; |
51
|
|
|
|
52
|
|
|
/**
 * Processes the bundle configuration and registers every queue related service.
 *
 * @inheritdoc
 */
public function load(array $configs, ContainerBuilder $container)
{
    $config = $this->processConfiguration(new Configuration(), $configs);

    $resourceDir = __DIR__ . '/../Resources/config';
    $yamlLoader = new YamlFileLoader($container, new FileLocator($resourceDir));
    $yamlLoader->load('services.yml');

    // The order is significant: the driver step sets the class parameters that
    // the connection step reads, the connection step sets the default
    // connection parameter that exchanges and queues fall back to, and
    // consumers with retries require their publisher definition to exist.
    $this->loadDriver($config, $container);
    $this->loadConnections($config, $container);
    $this->loadPublishers($config, $container);
    $this->loadConsumers($config, $container);
    $this->loadExchanges($config, $container);
    $this->loadQueues($config, $container);

    // expose the ids of everything that was created above as parameters
    $this->setCreatedDefinitionsParameters($container);

    if ($config['auto_flush']) {
        return;
    }

    // auto flushing is disabled: drop the listener definition
    $container->removeDefinition('tree_house.queue.event_listener.queue');
}
76
|
|
|
|
77
|
|
|
/**
 * Registers the configured driver: stores its name as a parameter, aliases the
 * generic AMQP factory service to the driver's factory, and copies the driver's
 * class parameters to the generic "tree_house.queue.<type>.class" parameters.
 *
 * @param array            $config
 * @param ContainerBuilder $container
 */
private function loadDriver(array $config, ContainerBuilder $container)
{
    $driver = $config['driver'];

    $container->setParameter('tree_house.queue.driver', $driver);
    $container->setAlias('tree_house.amqp.factory', sprintf('tree_house.queue.driver.%s.factory', $driver));

    foreach (['connection', 'channel', 'exchange', 'queue', 'publisher', 'factory'] as $type) {
        $genericParameter = sprintf('tree_house.queue.%s.class', $type);

        // a generic class parameter that already exists is left untouched
        if ($container->hasParameter($genericParameter)) {
            continue;
        }

        $driverParameter = sprintf('tree_house.queue.driver.%s.%s.class', $driver, $type);
        $container->setParameter($genericParameter, $container->getParameter($driverParameter));
    }
}
98
|
|
|
|
99
|
|
|
/**
 * Registers all configured connections and marks one of them as the default.
 *
 * When no default connection is configured, the first configured connection
 * is used. The default is exposed both as a parameter (the connection name)
 * and as an alias (pointing to the connection service).
 *
 * @param array            $config
 * @param ContainerBuilder $container
 *
 * @throws \LogicException When the default connection has no service definition
 */
private function loadConnections(array $config, ContainerBuilder $container)
{
    $connections = $config['connections'];

    foreach ($connections as $connectionName => $connectionConfig) {
        $this->createConnectionDefinition($connectionName, $connectionConfig, $container);
    }

    $default = $config['default_connection'];
    if (!$default) {
        // fall back to the first configured connection
        reset($connections);
        $default = key($connections);
    }

    $defaultId = sprintf('tree_house.queue.connection.%s', $default);
    if (!$container->hasDefinition($defaultId)) {
        throw new \LogicException(sprintf('Connection "%s" does not exist', $defaultId));
    }

    // the parameter holds the connection name, the alias points to its service
    $container->setParameter('tree_house.queue.default_connection', $default);
    $container->setAlias('tree_house.queue.default_connection', $defaultId);
}
126
|
|
|
|
127
|
|
|
/**
 * Registers a publisher service for every entry under the "publishers" config key.
 *
 * @param array            $config
 * @param ContainerBuilder $container
 */
private function loadPublishers(array $config, ContainerBuilder $container)
{
    $publishers = $config['publishers'];

    foreach ($publishers as $publisherName => $publisherConfig) {
        $this->createPublisherDefinition($publisherName, $publisherConfig, $container);
    }
}
137
|
|
|
|
138
|
|
|
/**
 * Registers a consumer service for every entry under the "consumers" config key.
 *
 * @param array            $config
 * @param ContainerBuilder $container
 */
private function loadConsumers(array $config, ContainerBuilder $container)
{
    $consumers = $config['consumers'];

    foreach ($consumers as $consumerName => $consumerConfig) {
        $this->createConsumerDefinition($consumerName, $consumerConfig, $container);
    }
}
148
|
|
|
|
149
|
|
|
/**
 * Registers an exchange service for every entry under the "exchanges" config key.
 *
 * @param array            $config
 * @param ContainerBuilder $container
 */
private function loadExchanges(array $config, ContainerBuilder $container)
{
    $exchanges = $config['exchanges'];

    foreach ($exchanges as $exchangeName => $exchangeConfig) {
        $this->createExchangeDefinition($exchangeName, $exchangeConfig, $container);
    }
}
159
|
|
|
|
160
|
|
|
/**
 * Registers a queue service for every entry under the "queues" config key.
 *
 * @param array            $config
 * @param ContainerBuilder $container
 */
private function loadQueues(array $config, ContainerBuilder $container)
{
    $queues = $config['queues'];

    foreach ($queues as $queueName => $queueConfig) {
        $this->createQueueDefinition($queueName, $queueConfig, $container);
    }
}
170
|
|
|
|
171
|
|
|
/**
 * Registers a connection service and a channel service for one configured connection.
 *
 * Both services are created through the AMQP factory service. The connection
 * service id is also recorded in $this->connections under the connection name.
 *
 * @param string           $name      Connection name, used as suffix of both service ids
 * @param array            $config    Connection settings: host, port, user, pass, vhost and params
 * @param ContainerBuilder $container
 *
 * @return string The service id of the connection
 */
private function createConnectionDefinition($name, array $config, ContainerBuilder $container)
{
    $amqpFactory = new Reference('tree_house.amqp.factory');

    // create connection
    $connectionId = sprintf('tree_house.queue.connection.%s', $name);
    $connection = new Definition($container->getParameter('tree_house.queue.connection.class'));
    $connection
        ->setFactory([$amqpFactory, 'createConnection'])
        ->addArgument($config['host'])
        ->addArgument((int) $config['port'])
        ->addArgument($config['user'])
        ->addArgument($config['pass'])
        ->addArgument($config['vhost'])
        ->addArgument($config['params'])
    ;
    $container->setDefinition($connectionId, $connection);

    // create channel
    $channelId = sprintf('tree_house.queue.channel.%s', $name);
    $channel = new Definition($container->getParameter('tree_house.queue.channel.class'));
    $channel
        ->setFactory([$amqpFactory, 'createChannel'])
        ->addArgument(new Reference($connectionId))
    ;

    // TODO set qos and prefetch stuff

    $container->setDefinition($channelId, $channel);

    $this->connections[$name] = $connectionId;

    return $connectionId;
}
208
|
|
|
|
209
|
|
|
/**
 * Registers an exchange service, plus its optional dead letter exchange and queue.
 *
 * The exchange is created through the AMQP factory on the channel of the
 * configured connection, or of the default connection when none is configured.
 * When the "delay" option is truthy, the exchange gets the delayed type and its
 * configured type is passed along as the "x-delayed-type" argument. When
 * "dlx.enabled" is truthy, a dead letter exchange is registered by calling this
 * method again with the "dlx" config, and a queue bound to that exchange is
 * registered as well.
 *
 * @param string           $name      Key used in the service ids; also the exchange name when "name" is empty
 * @param array            $config    Exchange configuration
 * @param ContainerBuilder $container
 *
 * @return string The service id of the exchange
 */
private function createExchangeDefinition($name, array $config, ContainerBuilder $container)
{
    $connection = $config['connection'] ?: $container->getParameter('tree_house.queue.default_connection');
    $channelAlias = sprintf('tree_house.queue.channel.%s', $name);

    // add alias if connection is named differently than exchange
    // NOTE(review): channel services are registered as "tree_house.queue.channel.<connection>",
    // so an exchange named after a *different* connection would reuse that id - confirm this cannot happen
    if ($connection !== $name) {
        $container->setAlias($channelAlias, sprintf('tree_house.queue.channel.%s', $connection));
    }

    $exchangeName = $config['name'] ?: $name;
    $type = $config['type'];
    $arguments = $config['arguments'];
    $autoDeclare = isset($config['auto_declare']) ? $config['auto_declare'] : true;

    // optionally turn this into a delayed message exchange
    if (!empty($config['delay'])) {
        $arguments['x-delayed-type'] = $type;
        $type = ExchangeInterface::TYPE_DELAYED;
    }

    // create exchange
    $exchange = new Definition($container->getParameter('tree_house.queue.exchange.class'));
    $exchange
        ->setFactory([new Reference('tree_house.amqp.factory'), 'createExchange'])
        ->addArgument(new Reference($channelAlias))
        ->addArgument($exchangeName)
        ->addArgument($type)
        ->addArgument($this->getExchangeFlagsValue($config))
        ->addArgument($arguments)
    ;

    if ($autoDeclare) {
        $exchange->addMethodCall('declareExchange');
    }

    $exchangeId = sprintf('tree_house.queue.exchange.%s', $name);
    $container->setDefinition($exchangeId, $exchange);

    $this->exchanges[$name] = ['id' => $exchangeId, 'auto_declare' => $autoDeclare];

    // without a dead letter exchange we are done
    if (empty($config['dlx']['enabled'])) {
        return $exchangeId;
    }

    // options the DLX takes over from this exchange unless set explicitly
    $dlx = $config['dlx'];
    $inherited = [
        'name' => sprintf('%s.dead', $exchangeName),
        'connection' => $connection,
        'auto_declare' => $autoDeclare,
    ];
    foreach ($inherited as $option => $value) {
        if (!isset($dlx[$option])) {
            $dlx[$option] = $value;
        }
    }

    $dlxName = $dlx['name'];
    $this->dlxs[$name] = $this->createExchangeDefinition($dlxName, $dlx, $container);

    // create queue to route this DLX to
    $queue = $dlx['queue'];
    foreach (['name' => $dlxName, 'connection' => $connection] as $option => $value) {
        if (!isset($queue[$option])) {
            $queue[$option] = $value;
        }
    }

    // make sure the queue is bound to the DLX
    $bound = false;
    foreach ($queue['bindings'] as $binding) {
        if ($dlxName === $binding['exchange']) {
            $bound = true;
            break;
        }
    }

    if (!$bound) {
        $queue['bindings'][] = ['exchange' => $dlxName, 'arguments' => []];
    }

    $this->createQueueDefinition($dlxName, $queue, $container);

    return $exchangeId;
}
310
|
|
|
|
311
|
|
|
/**
 * Registers a consumer service together with the queue and processor it needs.
 *
 * The consumer is a child of the "tree_house.queue.consumer.prototype" definition
 * and receives the queue, the processor and the event dispatcher as arguments.
 *
 * @param string           $name      Consumer name, used as suffix of the created service ids
 * @param array            $config    Consumer configuration
 * @param ContainerBuilder $container
 *
 * @return string The service id of the consumer
 */
private function createConsumerDefinition($name, array $config, ContainerBuilder $container)
{
    // the queue is named after the consumer, unless it has a name of its own
    $queueConfig = $config['queue'];
    if (!isset($queueConfig['name'])) {
        $queueConfig['name'] = $name;
    }

    $queueId = $this->createQueueDefinition($name, $queueConfig, $container);
    $processorId = $this->createProcessorDefinition($name, $config, $container);

    $consumer = new DefinitionDecorator('tree_house.queue.consumer.prototype');
    $consumer
        ->addArgument(new Reference($queueId))
        ->addArgument(new Reference($processorId))
        ->addArgument(new Reference('event_dispatcher'))
    ;

    $consumerId = sprintf('tree_house.queue.consumer.%s', $name);
    $container->setDefinition($consumerId, $consumer);

    $this->consumers[$name] = $consumerId;

    return $consumerId;
}
345
|
|
|
|
346
|
|
|
/**
 * Registers a queue service, including its declaration and bindings.
 *
 * The queue is created through the AMQP factory on the channel of the
 * configured connection, or of the default connection when none is configured.
 * Without configured bindings the queue is bound to the exchange called $name.
 * A binding without routing keys is bound once, with a null routing key.
 *
 * @param string           $name      Key used in the service id; also the queue name when "name" is empty
 * @param array            $config    Queue configuration
 * @param ContainerBuilder $container
 *
 * @return string The service id of the queue
 */
private function createQueueDefinition($name, array $config, ContainerBuilder $container)
{
    $connection = $config['connection'] ?: $container->getParameter('tree_house.queue.default_connection');
    $queueName = $config['name'] ?: $name;
    $arguments = $config['arguments'];
    $autoDeclare = isset($config['auto_declare']) ? $config['auto_declare'] : true;

    // an explicitly configured x-dead-letter-exchange argument always wins;
    // otherwise use the dead letter exchange that is known for this name, if any
    if (!array_key_exists('x-dead-letter-exchange', $arguments)) {
        $deadLetterExchange = $this->getDeadLetterExchange($name, $config, $container);
        if ($deadLetterExchange) {
            $arguments['x-dead-letter-exchange'] = $deadLetterExchange;
        }
    }

    $queue = new Definition($container->getParameter('tree_house.queue.queue.class'));
    $queue
        ->setFactory([new Reference('tree_house.amqp.factory'), 'createQueue'])
        ->addArgument(new Reference(sprintf('tree_house.queue.channel.%s', $connection)))
        ->addArgument($queueName)
        ->addArgument($this->getQueueFlagsValue($config))
        ->addArgument($arguments)
    ;

    if ($autoDeclare) {
        $queue->addMethodCall('declareQueue');
    }

    // bind to the same exchange when nothing else is configured
    $defaultBinding = ['exchange' => $name, 'routing_keys' => [], 'arguments' => []];
    $bindings = empty($config['bindings']) ? [$defaultBinding] : $config['bindings'];

    foreach ($bindings as $binding) {
        // if nothing is set, bind without routing key
        $routingKeys = empty($binding['routing_keys']) ? [null] : $binding['routing_keys'];

        foreach ($routingKeys as $routingKey) {
            $queue->addMethodCall('bind', [$binding['exchange'], $routingKey, $binding['arguments']]);
        }
    }

    $queueId = sprintf('tree_house.queue.queue.%s', $name);
    $container->setDefinition($queueId, $queue);

    $this->queues[$name] = ['id' => $queueId, 'auto_declare' => $autoDeclare];

    return $queueId;
}
410
|
|
|
|
411
|
|
|
/**
 * Registers a lazy publisher service, along with its exchange and message composer.
 *
 * @param string           $name      Publisher name, used as suffix of the created service ids
 * @param array            $config    Publisher configuration
 * @param ContainerBuilder $container
 *
 * @return string The service id of the publisher
 */
private function createPublisherDefinition($name, array $config, ContainerBuilder $container)
{
    // the publisher gets its own exchange and composer, both registered under the publisher name
    $exchangeId = $this->createExchangeDefinition($name, $config['exchange'], $container);
    $composerId = $this->createMessageComposerDefinition($name, $config, $container);

    $definition = new Definition($container->getParameter('tree_house.queue.publisher.class'));
    $definition
        ->setLazy(true)
        ->addArgument(new Reference($exchangeId))
        ->addArgument(new Reference($composerId))
    ;

    $publisherId = sprintf('tree_house.queue.publisher.%s', $name);
    $container->setDefinition($publisherId, $definition);

    $this->publishers[$name] = $publisherId;

    return $publisherId;
}
440
|
|
|
|
441
|
|
|
/**
 * Registers the message composer of a publisher.
 *
 * The "composer" option is either a service reference ("@id"), in which case
 * the composer id becomes an alias for that service, or a class name -
 * optionally given as a "%parameter%" - in which case a new definition is
 * registered that receives the publisher's serializer as its only argument.
 *
 * @param string           $name      Publisher name, used as suffix of the service id
 * @param array            $config    Publisher configuration
 * @param ContainerBuilder $container
 *
 * @return string The service id of the composer
 */
private function createMessageComposerDefinition($name, array $config, ContainerBuilder $container)
{
    $composerId = sprintf('tree_house.queue.composer.%s', $name);
    $composerClass = $config['composer'];

    // an existing service only needs an alias
    if (strpos($composerClass, '@') === 0) {
        $container->setAlias($composerId, ltrim($composerClass, '@'));

        return $composerId;
    }

    // resolve a parameter by stripping the first and last character
    if (strpos($composerClass, '%') === 0) {
        $composerClass = $container->getParameter(substr($composerClass, 1, -1));
    }

    // the serializer has to exist before the composer can reference it
    $serializerId = $this->createMessageSerializerDefinition($name, $config['serializer'], $container);

    $definition = new Definition($composerClass);
    $definition->addArgument(new Reference($serializerId));
    $container->setDefinition($composerId, $definition);

    return $composerId;
}
472
|
|
|
|
473
|
|
|
/**
 * Registers the message serializer of a publisher.
 *
 * @param string           $name            Publisher name, used as suffix of the service id
 * @param string           $serializerClass Either a service reference ("@id") or a class name
 * @param ContainerBuilder $container
 *
 * @return string The service id of the serializer
 */
private function createMessageSerializerDefinition($name, $serializerClass, ContainerBuilder $container)
{
    $serializerId = sprintf('tree_house.queue.serializer.%s', $name);

    if (strpos($serializerClass, '@') === 0) {
        // an existing service: alias it
        $container->setAlias($serializerId, ltrim($serializerClass, '@'));
    } else {
        // a class name: register a definition for it
        $container->setDefinition($serializerId, new Definition($serializerClass));
    }

    return $serializerId;
}
496
|
|
|
|
497
|
|
|
/**
 * Registers the processor of a consumer.
 *
 * The "processor" option is either a service reference ("@id") or a class
 * name, for which a private definition is created. The result is passed
 * through decorateRetryProcessor() before it is registered: a service id that
 * comes back unchanged is registered as an alias, a definition as a service.
 *
 * @param string           $name      Consumer name, used as suffix of the service id
 * @param array            $config    Consumer configuration
 * @param ContainerBuilder $container
 *
 * @return string The service id of the processor
 */
private function createProcessorDefinition($name, array $config, ContainerBuilder $container)
{
    $processor = $config['processor'];

    if (strpos($processor, '@') === 0) {
        $service = ltrim($processor, '@');
    } else {
        $service = (new Definition($processor))->setPublic(false);
    }

    // decorate the processor with a retry processor if needed
    $service = $this->decorateRetryProcessor($name, $config['retry'], $service, $container);

    $processorId = sprintf('tree_house.queue.processor.%s', $name);

    if (!is_string($service)) {
        $container->setDefinition($processorId, $service);
    } else {
        $container->setAlias($processorId, $service);
    }

    return $processorId;
}
526
|
|
|
|
527
|
|
|
/**
 * Wraps a processor in a RetryProcessor when more than one attempt is configured.
 *
 * @param string            $name      Consumer name; also the publisher name when "publisher" is empty
 * @param array             $config    Retry configuration: attempts, publisher and strategy
 * @param string|Definition $service   The processor, as a service id or as a definition
 * @param ContainerBuilder  $container
 *
 * @throws InvalidConfigurationException When the publisher used for retrying has no definition
 *
 * @return string|Definition The given $service when retrying is not needed,
 *                           otherwise the definition of the retry processor
 */
private function decorateRetryProcessor($name, array $config, $service, ContainerBuilder $container)
{
    $maxAttempts = $config['attempts'];

    // skip if we only use 1 attempt
    if ($maxAttempts < 2) {
        return $service;
    }

    $publisherName = $config['publisher'] ?: $name;
    $publisherId = sprintf('tree_house.queue.publisher.%s', $publisherName);

    if (!$container->hasDefinition($publisherId)) {
        $message = sprintf('There is no publisher named "%s" configured.', $publisherName);

        throw new InvalidConfigurationException($message);
    }

    $strategy = $this->createRetryStrategyDefinition($config['strategy'], $publisherId);

    // service ids are passed as references, definitions are passed as they are
    $processorArgument = is_string($service) ? new Reference($service) : $service;
    $strategyArgument = is_string($strategy) ? new Reference($strategy) : $strategy;

    $retryProcessor = new Definition(RetryProcessor::class);
    $retryProcessor
        ->addArgument($processorArgument)
        ->addArgument($strategyArgument)
        ->addArgument(new Reference('logger', ContainerInterface::NULL_ON_INVALID_REFERENCE))
        ->addMethodCall('setMaxAttempts', [$maxAttempts])
    ;

    return $retryProcessor;
}
560
|
|
|
|
561
|
|
|
/**
 * Creates the private definition of a retry strategy.
 *
 * Supported types are "backoff" and "deprioritize"; both strategies receive
 * the publisher as their only argument.
 *
 * @param array  $config      Strategy configuration, of which the "type" key is used
 * @param string $publisherId Service id of the publisher that is passed to the strategy
 *
 * @throws InvalidConfigurationException When the strategy type is not supported
 *
 * @return Definition
 */
private function createRetryStrategyDefinition(array $config, $publisherId)
{
    switch ($config['type']) {
        case 'backoff':
            $strategyClass = BackoffStrategy::class;
            break;

        case 'deprioritize':
            $strategyClass = DeprioritizeStrategy::class;
            break;

        default:
            throw new InvalidConfigurationException(sprintf('Unsupported retry strategy: "%s"', $config['type']));
    }

    $definition = new Definition($strategyClass);
    $definition->addArgument(new Reference($publisherId));
    $definition->setPublic(false);

    return $definition;
}
588
|
|
|
|
589
|
|
|
/**
 * Exposes the connections, exchanges, queues, publishers and consumers that
 * were collected while loading as "tree_house.queue.<kind>" parameters.
 *
 * @param ContainerBuilder $container
 */
private function setCreatedDefinitionsParameters(ContainerBuilder $container)
{
    $created = [
        'connections' => $this->connections,
        'exchanges' => $this->exchanges,
        'queues' => $this->queues,
        'publishers' => $this->publishers,
        'consumers' => $this->consumers,
    ];

    foreach ($created as $kind => $definitions) {
        $container->setParameter('tree_house.queue.' . $kind, $definitions);
    }
}
600
|
|
|
|
601
|
|
|
/**
 * Looks up the dead letter exchange for a queue.
 *
 * A "dlx" value in the queue configuration takes precedence. Otherwise the
 * exchange name is read from the dead letter exchange definition that was
 * registered for an exchange called $name, if there is one.
 *
 * @param string           $name      Name under which the queue is registered
 * @param array            $config    Queue configuration
 * @param ContainerBuilder $container
 *
 * @return null|string The "dlx" config value or the exchange name of the DLX; null when neither is known
 */
private function getDeadLetterExchange($name, array $config, ContainerBuilder $container)
{
    if (isset($config['dlx'])) {
        return $config['dlx'];
    }

    if (isset($this->dlxs[$name])) {
        // the second argument of an exchange definition is the exchange name
        return $container->getDefinition($this->dlxs[$name])->getArgument(1);
    }

    return null;
}
622
|
|
|
|
623
|
|
|
/**
 * Combines the boolean exchange options into a flags bitmask.
 *
 * @param array $exchange Exchange configuration with "durable" and "passive" keys
 *
 * @return int
 */
private function getExchangeFlagsValue(array $exchange)
{
    $flagsByOption = [
        'durable' => ExchangeInterface::DURABLE,
        'passive' => ExchangeInterface::PASSIVE,
    ];

    $flags = ExchangeInterface::NOPARAM;

    foreach ($flagsByOption as $option => $flag) {
        if ($exchange[$option]) {
            $flags |= $flag;
        }
    }

    return $flags;
}
642
|
|
|
|
643
|
|
|
/**
 * Combines the boolean queue options into a flags bitmask.
 *
 * @param array $queue Queue configuration with "durable", "passive", "exclusive" and "auto_delete" keys
 *
 * @return int
 */
private function getQueueFlagsValue(array $queue)
{
    $flagsByOption = [
        'durable' => QueueInterface::DURABLE,
        'passive' => QueueInterface::PASSIVE,
        'exclusive' => QueueInterface::EXCLUSIVE,
        'auto_delete' => QueueInterface::AUTODELETE,
    ];

    $flags = QueueInterface::NOPARAM;

    foreach ($flagsByOption as $option => $flag) {
        if ($queue[$option]) {
            $flags |= $flag;
        }
    }

    return $flags;
}
670
|
|
|
} |
671
|
|
|
|