|
1
|
|
|
<?php |
|
2
|
|
|
|
|
3
|
|
|
namespace TreeHouse\QueueBundle\DependencyInjection; |
|
4
|
|
|
|
|
5
|
|
|
use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException; |
|
6
|
|
|
use Symfony\Component\Config\FileLocator; |
|
7
|
|
|
use Symfony\Component\DependencyInjection\ContainerBuilder; |
|
8
|
|
|
use Symfony\Component\DependencyInjection\ContainerInterface; |
|
9
|
|
|
use Symfony\Component\DependencyInjection\Definition; |
|
10
|
|
|
use Symfony\Component\DependencyInjection\DefinitionDecorator; |
|
11
|
|
|
use Symfony\Component\DependencyInjection\Loader\YamlFileLoader; |
|
12
|
|
|
use Symfony\Component\DependencyInjection\Reference; |
|
13
|
|
|
use Symfony\Component\HttpKernel\DependencyInjection\Extension; |
|
14
|
|
|
use TreeHouse\Queue\Amqp\ExchangeInterface; |
|
15
|
|
|
use TreeHouse\Queue\Amqp\QueueInterface; |
|
16
|
|
|
use TreeHouse\Queue\Processor\Retry\BackoffStrategy; |
|
17
|
|
|
use TreeHouse\Queue\Processor\Retry\DeprioritizeStrategy; |
|
18
|
|
|
use TreeHouse\Queue\Processor\Retry\RetryProcessor; |
|
19
|
|
|
|
|
20
|
|
|
class TreeHouseQueueExtension extends Extension |
|
21
|
|
|
{ |
|
22
|
|
|
/** |
|
23
|
|
|
* @var string[] |
|
24
|
|
|
*/ |
|
25
|
|
|
private $connections = []; |
|
26
|
|
|
|
|
27
|
|
|
/** |
|
28
|
|
|
* @var string[] |
|
29
|
|
|
*/ |
|
30
|
|
|
private $exchanges = []; |
|
31
|
|
|
|
|
32
|
|
|
/** |
|
33
|
|
|
* @var string[] |
|
34
|
|
|
*/ |
|
35
|
|
|
private $queues = []; |
|
36
|
|
|
|
|
37
|
|
|
/** |
|
38
|
|
|
* @var string[] |
|
39
|
|
|
*/ |
|
40
|
|
|
private $publishers = []; |
|
41
|
|
|
|
|
42
|
|
|
/** |
|
43
|
|
|
* @var string[] |
|
44
|
|
|
*/ |
|
45
|
|
|
private $consumers = []; |
|
46
|
|
|
|
|
47
|
|
|
/** |
|
48
|
|
|
* @var array Map that links exchanges with a DLX counterpart |
|
49
|
|
|
*/ |
|
50
|
|
|
private $dlxs = []; |
|
51
|
|
|
|
|
52
|
|
|
/** |
|
53
|
|
|
* @inheritdoc |
|
54
|
|
|
*/ |
|
55
|
29 |
|
public function load(array $configs, ContainerBuilder $container) |
|
56
|
|
|
{ |
|
57
|
29 |
|
$configuration = new Configuration(); |
|
58
|
29 |
|
$config = $this->processConfiguration($configuration, $configs); |
|
59
|
|
|
|
|
60
|
28 |
|
$loader = new YamlFileLoader($container, new FileLocator(__DIR__ . '/../Resources/config')); |
|
61
|
28 |
|
$loader->load('services.yml'); |
|
62
|
|
|
|
|
63
|
28 |
|
$this->loadDriver($config, $container); |
|
64
|
28 |
|
$this->loadConnections($config, $container); |
|
65
|
27 |
|
$this->loadPublishers($config, $container); |
|
66
|
27 |
|
$this->loadConsumers($config, $container); |
|
67
|
27 |
|
$this->loadExchanges($config, $container); |
|
68
|
27 |
|
$this->loadQueues($config, $container); |
|
69
|
|
|
|
|
70
|
27 |
|
$this->setCreatedDefinitionsParameters($container); |
|
71
|
|
|
|
|
72
|
27 |
|
if (!$config['auto_flush']) { |
|
73
|
1 |
|
$container->removeDefinition('tree_house.queue.event_listener.queue'); |
|
74
|
|
|
} |
|
75
|
27 |
|
} |
|
76
|
|
|
|
|
77
|
|
|
/** |
|
78
|
|
|
* @param array $config |
|
79
|
|
|
* @param ContainerBuilder $container |
|
80
|
|
|
*/ |
|
81
|
28 |
|
private function loadDriver(array $config, ContainerBuilder $container) |
|
82
|
|
|
{ |
|
83
|
28 |
|
$container->setParameter('tree_house.queue.driver', $config['driver']); |
|
84
|
28 |
|
$container->setAlias( |
|
85
|
28 |
|
'tree_house.amqp.factory', |
|
86
|
28 |
|
sprintf('tree_house.queue.driver.%s.factory', $config['driver']) |
|
87
|
|
|
); |
|
88
|
|
|
|
|
89
|
28 |
|
$classes = ['connection', 'channel', 'exchange', 'queue', 'publisher', 'factory']; |
|
90
|
28 |
|
foreach ($classes as $class) { |
|
91
|
28 |
|
$name = sprintf('tree_house.queue.%s.class', $class); |
|
92
|
28 |
|
if (!$container->hasParameter($name)) { |
|
93
|
28 |
|
$value = sprintf('tree_house.queue.driver.%s.%s.class', $config['driver'], $class); |
|
94
|
28 |
|
$container->setParameter($name, $container->getParameter($value)); |
|
95
|
|
|
} |
|
96
|
|
|
} |
|
97
|
28 |
|
} |
|
98
|
|
|
|
|
99
|
|
|
/** |
|
100
|
|
|
* @param array $config |
|
101
|
|
|
* @param ContainerBuilder $container |
|
102
|
|
|
* |
|
103
|
|
|
* @throws \LogicException |
|
104
|
|
|
*/ |
|
105
|
28 |
|
private function loadConnections(array $config, ContainerBuilder $container) |
|
106
|
|
|
{ |
|
107
|
28 |
|
foreach ($config['connections'] as $name => $connection) { |
|
108
|
28 |
|
$this->createConnectionDefinition($name, $connection, $container); |
|
109
|
|
|
} |
|
110
|
|
|
|
|
111
|
|
|
// default the first connection if it wasn't explicitly set |
|
112
|
28 |
|
if (!$config['default_connection']) { |
|
113
|
26 |
|
reset($config['connections']); |
|
114
|
26 |
|
$config['default_connection'] = key($config['connections']); |
|
115
|
|
|
} |
|
116
|
|
|
|
|
117
|
|
|
// set a parameter and alias for the default connection |
|
118
|
28 |
|
$connectionId = sprintf('tree_house.queue.connection.%s', $config['default_connection']); |
|
119
|
28 |
|
if (!$container->hasDefinition($connectionId)) { |
|
120
|
1 |
|
throw new \LogicException(sprintf('Connection "%s" does not exist', $connectionId)); |
|
121
|
|
|
} |
|
122
|
|
|
|
|
123
|
27 |
|
$container->setParameter('tree_house.queue.default_connection', $config['default_connection']); |
|
124
|
27 |
|
$container->setAlias('tree_house.queue.default_connection', $connectionId); |
|
125
|
27 |
|
} |
|
126
|
|
|
|
|
127
|
|
|
/** |
|
128
|
|
|
* @param array $config |
|
129
|
|
|
* @param ContainerBuilder $container |
|
130
|
|
|
*/ |
|
131
|
27 |
|
private function loadPublishers(array $config, ContainerBuilder $container) |
|
132
|
|
|
{ |
|
133
|
27 |
|
foreach ($config['publishers'] as $name => $publisher) { |
|
134
|
14 |
|
$this->createPublisherDefinition($name, $publisher, $container); |
|
135
|
|
|
} |
|
136
|
27 |
|
} |
|
137
|
|
|
|
|
138
|
|
|
/** |
|
139
|
|
|
* @param array $config |
|
140
|
|
|
* @param ContainerBuilder $container |
|
141
|
|
|
*/ |
|
142
|
27 |
|
private function loadConsumers(array $config, ContainerBuilder $container) |
|
143
|
|
|
{ |
|
144
|
27 |
|
foreach ($config['consumers'] as $name => $consumer) { |
|
145
|
9 |
|
$this->createConsumerDefinition($name, $consumer, $container); |
|
146
|
|
|
} |
|
147
|
27 |
|
} |
|
148
|
|
|
|
|
149
|
|
|
/** |
|
150
|
|
|
* @param array $config |
|
151
|
|
|
* @param ContainerBuilder $container |
|
152
|
|
|
*/ |
|
153
|
27 |
|
private function loadExchanges(array $config, ContainerBuilder $container) |
|
154
|
|
|
{ |
|
155
|
27 |
|
foreach ($config['exchanges'] as $name => $exchange) { |
|
156
|
2 |
|
$this->createExchangeDefinition($name, $exchange, $container); |
|
157
|
|
|
} |
|
158
|
27 |
|
} |
|
159
|
|
|
|
|
160
|
|
|
/** |
|
161
|
|
|
* @param array $config |
|
162
|
|
|
* @param ContainerBuilder $container |
|
163
|
|
|
*/ |
|
164
|
27 |
|
private function loadQueues(array $config, ContainerBuilder $container) |
|
165
|
|
|
{ |
|
166
|
27 |
|
foreach ($config['queues'] as $name => $queue) { |
|
167
|
3 |
|
$this->createQueueDefinition($name, $queue, $container); |
|
168
|
|
|
} |
|
169
|
27 |
|
} |
|
170
|
|
|
|
|
171
|
|
|
/** |
|
172
|
|
|
* @param string $name |
|
173
|
|
|
* @param array $config |
|
174
|
|
|
* @param ContainerBuilder $container |
|
175
|
|
|
* |
|
176
|
|
|
* @return string |
|
177
|
|
|
*/ |
|
178
|
28 |
|
private function createConnectionDefinition($name, array $config, ContainerBuilder $container) |
|
179
|
|
|
{ |
|
180
|
28 |
|
$amqpFactory = new Reference('tree_house.amqp.factory'); |
|
181
|
|
|
|
|
182
|
28 |
|
$definition = new Definition($container->getParameter('tree_house.queue.connection.class')); |
|
183
|
28 |
|
$definition->setFactory([$amqpFactory, 'createConnection']); |
|
184
|
28 |
|
$definition->addArgument($config['host']); |
|
185
|
28 |
|
$definition->addArgument((integer) $config['port']); |
|
186
|
28 |
|
$definition->addArgument($config['user']); |
|
187
|
28 |
|
$definition->addArgument($config['pass']); |
|
188
|
28 |
|
$definition->addArgument($config['vhost']); |
|
189
|
28 |
|
$definition->addArgument($config['params']); |
|
190
|
|
|
|
|
191
|
28 |
|
$connectionId = sprintf('tree_house.queue.connection.%s', $name); |
|
192
|
28 |
|
$container->setDefinition($connectionId, $definition); |
|
193
|
|
|
|
|
194
|
|
|
// create channel |
|
195
|
28 |
|
$definition = new Definition($container->getParameter('tree_house.queue.channel.class')); |
|
196
|
28 |
|
$definition->setFactory([$amqpFactory, 'createChannel']); |
|
197
|
28 |
|
$definition->addArgument(new Reference($connectionId)); |
|
198
|
|
|
|
|
199
|
|
|
// TODO set qos and prefetch stuff |
|
200
|
|
|
|
|
201
|
28 |
|
$channelId = sprintf('tree_house.queue.channel.%s', $name); |
|
202
|
28 |
|
$container->setDefinition($channelId, $definition); |
|
203
|
|
|
|
|
204
|
28 |
|
$this->connections[$name] = $connectionId; |
|
205
|
|
|
|
|
206
|
28 |
|
return $connectionId; |
|
207
|
|
|
} |
|
208
|
|
|
|
|
209
|
|
|
/** |
|
210
|
|
|
* @param string $name |
|
211
|
|
|
* @param array $config |
|
212
|
|
|
* @param ContainerBuilder $container |
|
213
|
|
|
* |
|
214
|
|
|
* @return string |
|
215
|
|
|
*/ |
|
216
|
16 |
|
private function createExchangeDefinition($name, array $config, ContainerBuilder $container) |
|
217
|
|
|
{ |
|
218
|
16 |
|
$amqpFactory = new Reference('tree_house.amqp.factory'); |
|
219
|
|
|
|
|
220
|
16 |
|
$connection = $config['connection'] ?: $container->getParameter('tree_house.queue.default_connection'); |
|
221
|
16 |
|
$channelId = sprintf('tree_house.queue.channel.%s', $connection); |
|
222
|
16 |
|
$channelAlias = sprintf('tree_house.queue.channel.%s', $name); |
|
223
|
|
|
|
|
224
|
|
|
// add alias if connection is named differently than exchange |
|
225
|
16 |
|
if ($name !== $connection) { |
|
226
|
16 |
|
$container->setAlias($channelAlias, $channelId); |
|
227
|
|
|
} |
|
228
|
|
|
|
|
229
|
16 |
|
$exchangeName = $config['name'] ?: $name; |
|
230
|
16 |
|
$exchangeType = $config['type']; |
|
231
|
16 |
|
$exchangeFlags = $this->getExchangeFlagsValue($config); |
|
232
|
16 |
|
$exchangeArguments = $config['arguments']; |
|
233
|
16 |
|
$autoDeclare = isset($config['auto_declare']) ? $config['auto_declare'] : true; |
|
234
|
|
|
|
|
235
|
|
|
// optionally create a delayed message exchange counterpart |
|
236
|
16 |
|
if (isset($config['delay']) && $config['delay']) { |
|
237
|
14 |
|
$exchangeArguments['x-delayed-type'] = $exchangeType; |
|
238
|
14 |
|
$exchangeType = ExchangeInterface::TYPE_DELAYED; |
|
239
|
|
|
} |
|
240
|
|
|
|
|
241
|
|
|
// create exchange |
|
242
|
16 |
|
$definition = new Definition($container->getParameter('tree_house.queue.exchange.class')); |
|
243
|
16 |
|
$definition->setFactory([$amqpFactory, 'createExchange']); |
|
244
|
16 |
|
$definition->addArgument(new Reference($channelAlias)); |
|
245
|
16 |
|
$definition->addArgument($exchangeName); |
|
246
|
16 |
|
$definition->addArgument($exchangeType); |
|
247
|
16 |
|
$definition->addArgument($exchangeFlags); |
|
248
|
16 |
|
$definition->addArgument($exchangeArguments); |
|
249
|
|
|
|
|
250
|
16 |
|
if ($autoDeclare) { |
|
251
|
15 |
|
$definition->addMethodCall('declareExchange'); |
|
252
|
|
|
} |
|
253
|
|
|
|
|
254
|
16 |
|
$exchangeId = sprintf('tree_house.queue.exchange.%s', $name); |
|
255
|
16 |
|
$container->setDefinition($exchangeId, $definition); |
|
256
|
|
|
|
|
257
|
16 |
|
$this->exchanges[$name] = [ |
|
258
|
16 |
|
'id' => $exchangeId, |
|
259
|
16 |
|
'auto_declare' => $autoDeclare, |
|
260
|
|
|
]; |
|
261
|
|
|
|
|
262
|
|
|
// optionally create a dead letter exchange counterpart |
|
263
|
16 |
|
if (isset($config['dlx']['enabled']) && $config['dlx']['enabled']) { |
|
264
|
16 |
|
if (!isset($config['dlx']['name'])) { |
|
265
|
14 |
|
$config['dlx']['name'] = sprintf('%s.dead', $exchangeName); |
|
266
|
|
|
} |
|
267
|
|
|
|
|
268
|
16 |
|
if (!isset($config['dlx']['connection'])) { |
|
269
|
16 |
|
$config['dlx']['connection'] = $connection; |
|
270
|
|
|
} |
|
271
|
|
|
|
|
272
|
16 |
|
if (!isset($config['dlx']['auto_declare'])) { |
|
273
|
16 |
|
$config['dlx']['auto_declare'] = $autoDeclare; |
|
274
|
|
|
} |
|
275
|
|
|
|
|
276
|
16 |
|
$dlxName = $config['dlx']['name']; |
|
277
|
16 |
|
$dlxId = $this->createExchangeDefinition($dlxName, $config['dlx'], $container); |
|
278
|
|
|
|
|
279
|
16 |
|
$this->dlxs[$name] = $dlxId; |
|
280
|
|
|
|
|
281
|
|
|
// create queue to route this DLX to |
|
282
|
16 |
|
$queue = $config['dlx']['queue']; |
|
283
|
16 |
|
if (!isset($queue['name'])) { |
|
284
|
16 |
|
$queue['name'] = $dlxName; |
|
285
|
|
|
} |
|
286
|
|
|
|
|
287
|
16 |
|
if (!isset($queue['connection'])) { |
|
288
|
16 |
|
$queue['connection'] = $connection; |
|
289
|
|
|
} |
|
290
|
|
|
|
|
291
|
16 |
|
$hasBinding = false; |
|
292
|
16 |
|
foreach ($queue['bindings'] as $binding) { |
|
293
|
|
|
if ($binding['exchange'] === $dlxName) { |
|
294
|
|
|
$hasBinding = true; |
|
295
|
|
|
break; |
|
296
|
|
|
} |
|
297
|
|
|
} |
|
298
|
16 |
|
if (!$hasBinding) { |
|
299
|
16 |
|
$queue['bindings'][] = [ |
|
300
|
16 |
|
'exchange' => $dlxName, |
|
301
|
|
|
'arguments' => [], |
|
302
|
|
|
]; |
|
303
|
|
|
} |
|
304
|
|
|
|
|
305
|
16 |
|
$this->createQueueDefinition($dlxName, $queue, $container); |
|
306
|
|
|
} |
|
307
|
|
|
|
|
308
|
16 |
|
return $exchangeId; |
|
309
|
|
|
} |
|
310
|
|
|
|
|
311
|
|
|
/** |
|
312
|
|
|
* @param ContainerBuilder $container |
|
313
|
|
|
* @param array $config |
|
314
|
|
|
* @param string $name |
|
315
|
|
|
* |
|
316
|
|
|
* @return string |
|
317
|
|
|
*/ |
|
318
|
9 |
|
private function createConsumerDefinition($name, array $config, ContainerBuilder $container) |
|
319
|
|
|
{ |
|
320
|
|
|
// create the queue |
|
321
|
9 |
|
$queue = $config['queue']; |
|
322
|
|
|
|
|
323
|
9 |
|
if (!isset($queue['name'])) { |
|
324
|
8 |
|
$queue['name'] = $name; |
|
325
|
|
|
} |
|
326
|
|
|
|
|
327
|
9 |
|
$queueId = $this->createQueueDefinition($name, $queue, $container); |
|
328
|
|
|
|
|
329
|
|
|
// create the processor |
|
330
|
9 |
|
$processorId = $this->createProcessorDefinition($name, $config, $container); |
|
331
|
|
|
|
|
332
|
|
|
// create the consumer |
|
333
|
9 |
|
$definition = new DefinitionDecorator('tree_house.queue.consumer.prototype'); |
|
334
|
9 |
|
$definition->addArgument(new Reference($queueId)); |
|
335
|
9 |
|
$definition->addArgument(new Reference($processorId)); |
|
336
|
9 |
|
$definition->addArgument(new Reference('event_dispatcher')); |
|
337
|
|
|
|
|
338
|
9 |
|
$consumerId = sprintf('tree_house.queue.consumer.%s', $name); |
|
339
|
9 |
|
$container->setDefinition($consumerId, $definition); |
|
340
|
|
|
|
|
341
|
9 |
|
$this->consumers[$name] = $consumerId; |
|
342
|
|
|
|
|
343
|
9 |
|
return $consumerId; |
|
344
|
|
|
} |
|
345
|
|
|
|
|
346
|
|
|
/** |
|
347
|
|
|
* @param string $name |
|
348
|
|
|
* @param array $config |
|
349
|
|
|
* @param ContainerBuilder $container |
|
350
|
|
|
* |
|
351
|
|
|
* @return string |
|
352
|
|
|
*/ |
|
353
|
22 |
|
private function createQueueDefinition($name, array $config, ContainerBuilder $container) |
|
354
|
|
|
{ |
|
355
|
22 |
|
$amqpFactory = new Reference('tree_house.amqp.factory'); |
|
356
|
|
|
|
|
357
|
22 |
|
$connection = $config['connection'] ?: $container->getParameter('tree_house.queue.default_connection'); |
|
358
|
22 |
|
$queueName = $config['name'] ?: $name; |
|
359
|
22 |
|
$channelId = sprintf('tree_house.queue.channel.%s', $connection); |
|
360
|
22 |
|
$arguments = $config['arguments']; |
|
361
|
22 |
|
$autoDeclare = isset($config['auto_declare']) ? $config['auto_declare'] : true; |
|
362
|
|
|
|
|
363
|
|
|
// if there is an exchange with the same name, and it has a DLX configured, set this in the arguments |
|
364
|
22 |
|
if (!array_key_exists('x-dead-letter-exchange', $arguments) && $dlx = $this->getDeadLetterExchange($name, $config, $container)) { |
|
365
|
6 |
|
$arguments['x-dead-letter-exchange'] = $dlx; |
|
366
|
|
|
} |
|
367
|
|
|
|
|
368
|
|
|
// create queue |
|
369
|
22 |
|
$definition = new Definition($container->getParameter('tree_house.queue.queue.class')); |
|
370
|
22 |
|
$definition->setFactory([$amqpFactory, 'createQueue']); |
|
371
|
22 |
|
$definition->addArgument(new Reference($channelId)); |
|
372
|
22 |
|
$definition->addArgument($queueName); |
|
373
|
22 |
|
$definition->addArgument($this->getQueueFlagsValue($config)); |
|
374
|
22 |
|
$definition->addArgument($arguments); |
|
375
|
|
|
|
|
376
|
22 |
|
if ($autoDeclare) { |
|
377
|
21 |
|
$definition->addMethodCall('declareQueue'); |
|
378
|
|
|
} |
|
379
|
|
|
|
|
380
|
22 |
|
if (empty($config['bindings'])) { |
|
381
|
|
|
// bind to the same exchange |
|
382
|
10 |
|
$config['bindings'][] = [ |
|
383
|
10 |
|
'exchange' => $name, |
|
384
|
|
|
'routing_keys' => [], |
|
385
|
|
|
'arguments' => [], |
|
386
|
|
|
]; |
|
387
|
|
|
} |
|
388
|
|
|
|
|
389
|
22 |
|
foreach ($config['bindings'] as $binding) { |
|
390
|
|
|
// if nothing is set, bind without routing key |
|
391
|
22 |
|
if (empty($binding['routing_keys'])) { |
|
392
|
21 |
|
$binding['routing_keys'] = [null]; |
|
393
|
|
|
} |
|
394
|
|
|
|
|
395
|
22 |
|
foreach ($binding['routing_keys'] as $routingKey) { |
|
396
|
22 |
|
$definition->addMethodCall('bind', [$binding['exchange'], $routingKey, $binding['arguments']]); |
|
397
|
|
|
} |
|
398
|
|
|
} |
|
399
|
|
|
|
|
400
|
22 |
|
$queueId = sprintf('tree_house.queue.queue.%s', $name); |
|
401
|
22 |
|
$container->setDefinition($queueId, $definition); |
|
402
|
|
|
|
|
403
|
22 |
|
$this->queues[$name] = [ |
|
404
|
22 |
|
'id' => $queueId, |
|
405
|
22 |
|
'auto_declare' => $autoDeclare, |
|
406
|
|
|
]; |
|
407
|
|
|
|
|
408
|
22 |
|
return $queueId; |
|
409
|
|
|
} |
|
410
|
|
|
|
|
411
|
|
|
/** |
|
412
|
|
|
* @param string $name |
|
413
|
|
|
* @param array $config |
|
414
|
|
|
* @param ContainerBuilder $container |
|
415
|
|
|
* |
|
416
|
|
|
* @return string |
|
417
|
|
|
*/ |
|
418
|
14 |
|
private function createPublisherDefinition($name, array $config, ContainerBuilder $container) |
|
419
|
|
|
{ |
|
420
|
|
|
// get the right channel for the exchange |
|
421
|
14 |
|
$exchange = $config['exchange']; |
|
422
|
14 |
|
$exchangeId = $this->createExchangeDefinition($name, $exchange, $container); |
|
423
|
|
|
|
|
424
|
|
|
// create message composer |
|
425
|
14 |
|
$composerId = $this->createMessageComposerDefinition($name, $config, $container); |
|
426
|
|
|
|
|
427
|
|
|
// create publisher |
|
428
|
14 |
|
$publisherId = sprintf('tree_house.queue.publisher.%s', $name); |
|
429
|
14 |
|
$publisher = new Definition($container->getParameter('tree_house.queue.publisher.class')); |
|
430
|
14 |
|
$publisher->setLazy(true); |
|
431
|
14 |
|
$publisher->addArgument(new Reference($exchangeId)); |
|
432
|
14 |
|
$publisher->addArgument(new Reference($composerId)); |
|
433
|
|
|
|
|
434
|
14 |
|
$container->setDefinition($publisherId, $publisher); |
|
435
|
|
|
|
|
436
|
14 |
|
$this->publishers[$name] = $publisherId; |
|
437
|
|
|
|
|
438
|
14 |
|
return $publisherId; |
|
439
|
|
|
} |
|
440
|
|
|
|
|
441
|
|
|
/** |
|
442
|
|
|
* @param string $name |
|
443
|
|
|
* @param array $config |
|
444
|
|
|
* @param ContainerBuilder $container |
|
445
|
|
|
* |
|
446
|
|
|
* @return string |
|
447
|
|
|
*/ |
|
448
|
14 |
|
private function createMessageComposerDefinition($name, array $config, ContainerBuilder $container) |
|
449
|
|
|
{ |
|
450
|
14 |
|
$composerId = sprintf('tree_house.queue.composer.%s', $name); |
|
451
|
14 |
|
$composer = $config['composer']; |
|
452
|
|
|
|
|
453
|
|
|
// resolve service |
|
454
|
14 |
|
if (substr($composer, 0, 1) === '@') { |
|
455
|
1 |
|
$container->setAlias($composerId, ltrim($composer, '@')); |
|
456
|
|
|
} else { |
|
457
|
|
|
// resolve parameter |
|
458
|
13 |
|
if (substr($composer, 0, 1) === '%') { |
|
459
|
13 |
|
$composer = $container->getParameter(substr($composer, 1, -1)); |
|
460
|
|
|
} |
|
461
|
|
|
|
|
462
|
|
|
// create serializer first |
|
463
|
13 |
|
$serializerId = $this->createMessageSerializerDefinition($name, $config['serializer'], $container); |
|
464
|
|
|
|
|
465
|
13 |
|
$composerDef = new Definition($composer); |
|
466
|
13 |
|
$composerDef->addArgument(new Reference($serializerId)); |
|
467
|
13 |
|
$container->setDefinition($composerId, $composerDef); |
|
468
|
|
|
} |
|
469
|
|
|
|
|
470
|
14 |
|
return $composerId; |
|
471
|
|
|
} |
|
472
|
|
|
|
|
473
|
|
|
/** |
|
474
|
|
|
* @param string $name |
|
475
|
|
|
* @param string $serializerClass |
|
476
|
|
|
* @param ContainerBuilder $container |
|
477
|
|
|
* |
|
478
|
|
|
* @return string |
|
479
|
|
|
*/ |
|
480
|
13 |
|
private function createMessageSerializerDefinition($name, $serializerClass, ContainerBuilder $container) |
|
481
|
|
|
{ |
|
482
|
13 |
|
$serializerId = sprintf('tree_house.queue.serializer.%s', $name); |
|
483
|
|
|
|
|
484
|
|
|
// resolve service |
|
485
|
13 |
|
if (substr($serializerClass, 0, 1) === '@') { |
|
486
|
12 |
|
$container->setAlias($serializerId, ltrim($serializerClass, '@')); |
|
487
|
|
|
|
|
488
|
12 |
|
return $serializerId; |
|
489
|
|
|
} else { |
|
490
|
1 |
|
$serializer = new Definition($serializerClass); |
|
491
|
1 |
|
$container->setDefinition($serializerId, $serializer); |
|
492
|
|
|
|
|
493
|
1 |
|
return $serializerId; |
|
494
|
|
|
} |
|
495
|
|
|
} |
|
496
|
|
|
|
|
497
|
|
|
/** |
|
498
|
|
|
* @param string $name |
|
499
|
|
|
* @param array $config |
|
500
|
|
|
* @param ContainerBuilder $container |
|
501
|
|
|
* |
|
502
|
|
|
* @return string |
|
503
|
|
|
*/ |
|
504
|
9 |
|
private function createProcessorDefinition($name, array $config, ContainerBuilder $container) |
|
505
|
|
|
{ |
|
506
|
9 |
|
$processorId = sprintf('tree_house.queue.processor.%s', $name); |
|
507
|
|
|
|
|
508
|
9 |
|
if (substr($config['processor'], 0, 1) === '@') { |
|
509
|
2 |
|
$service = ltrim($config['processor'], '@'); |
|
510
|
|
|
} else { |
|
511
|
7 |
|
$service = new Definition($config['processor']); |
|
512
|
7 |
|
$service->setPublic(false); |
|
513
|
|
|
} |
|
514
|
|
|
|
|
515
|
|
|
// decorate the process with a retry processor if needd |
|
516
|
9 |
|
$service = $this->decorateRetryProcessor($name, $config['retry'], $service, $container); |
|
517
|
|
|
|
|
518
|
9 |
|
if (is_string($service)) { |
|
519
|
1 |
|
$container->setAlias($processorId, $service); |
|
520
|
|
|
} else { |
|
521
|
8 |
|
$container->setDefinition($processorId, $service); |
|
522
|
|
|
} |
|
523
|
|
|
|
|
524
|
9 |
|
return $processorId; |
|
525
|
|
|
} |
|
526
|
|
|
|
|
527
|
|
|
/** |
|
528
|
|
|
* @param string $name |
|
529
|
|
|
* @param array $config |
|
530
|
|
|
* @param string|Definition $service |
|
531
|
|
|
* @param ContainerBuilder $container |
|
532
|
|
|
* |
|
533
|
|
|
* @return Definition |
|
534
|
|
|
*/ |
|
535
|
9 |
|
private function decorateRetryProcessor($name, array $config, $service, ContainerBuilder $container) |
|
536
|
|
|
{ |
|
537
|
|
|
// skip if we only use 1 attempt |
|
538
|
9 |
|
if ($config['attempts'] < 2) { |
|
539
|
4 |
|
return $service; |
|
540
|
|
|
} |
|
541
|
|
|
|
|
542
|
5 |
|
$publisherName = $config['publisher'] ?: $name; |
|
543
|
5 |
|
$publisherId = sprintf('tree_house.queue.publisher.%s', $publisherName); |
|
544
|
|
|
|
|
545
|
5 |
|
if (!$container->hasDefinition($publisherId)) { |
|
546
|
|
|
throw new InvalidConfigurationException(sprintf('There is no publisher named "%s" configured.', $publisherName)); |
|
547
|
|
|
} |
|
548
|
|
|
|
|
549
|
|
|
// decorate the processor |
|
550
|
5 |
|
$strategy = $this->createRetryStrategyDefinition($config['strategy'], $publisherId); |
|
551
|
|
|
|
|
552
|
5 |
|
$retry = new Definition(RetryProcessor::class); |
|
553
|
5 |
|
$retry->addArgument(is_string($service) ? new Reference($service) : $service); |
|
554
|
5 |
|
$retry->addArgument(is_string($strategy) ? new Reference($strategy) : $strategy); |
|
555
|
5 |
|
$retry->addArgument(new Reference('logger', ContainerInterface::NULL_ON_INVALID_REFERENCE)); |
|
556
|
5 |
|
$retry->addMethodCall('setMaxAttempts', [$config['attempts']]); |
|
557
|
|
|
|
|
558
|
5 |
|
return $retry; |
|
559
|
|
|
} |
|
560
|
|
|
|
|
561
|
|
|
/** |
|
562
|
|
|
* @param array $config |
|
563
|
|
|
* @param string $publisherId |
|
564
|
|
|
* |
|
565
|
|
|
* @return Definition |
|
566
|
|
|
*/ |
|
567
|
5 |
|
private function createRetryStrategyDefinition(array $config, $publisherId) |
|
568
|
|
|
{ |
|
569
|
5 |
|
switch ($config['type']) { |
|
570
|
5 |
|
case 'backoff': |
|
571
|
3 |
|
$strategy = new Definition(BackoffStrategy::class); |
|
572
|
3 |
|
$strategy->addArgument(new Reference($publisherId)); |
|
573
|
|
|
|
|
574
|
3 |
|
break; |
|
575
|
2 |
|
case 'deprioritize': |
|
576
|
2 |
|
$strategy = new Definition(DeprioritizeStrategy::class); |
|
577
|
2 |
|
$strategy->addArgument(new Reference($publisherId)); |
|
578
|
|
|
|
|
579
|
2 |
|
break; |
|
580
|
|
|
default: |
|
581
|
|
|
throw new InvalidConfigurationException(sprintf('Unsupported retry strategy: "%s"', $config['type'])); |
|
582
|
|
|
} |
|
583
|
|
|
|
|
584
|
5 |
|
$strategy->setPublic(false); |
|
585
|
|
|
|
|
586
|
5 |
|
return $strategy; |
|
587
|
|
|
} |
|
588
|
|
|
|
|
589
|
|
|
/** |
|
590
|
|
|
* @param ContainerBuilder $container |
|
591
|
|
|
*/ |
|
592
|
27 |
|
private function setCreatedDefinitionsParameters(ContainerBuilder $container) |
|
593
|
|
|
{ |
|
594
|
27 |
|
$container->setParameter('tree_house.queue.connections', $this->connections); |
|
595
|
27 |
|
$container->setParameter('tree_house.queue.exchanges', $this->exchanges); |
|
596
|
27 |
|
$container->setParameter('tree_house.queue.queues', $this->queues); |
|
597
|
27 |
|
$container->setParameter('tree_house.queue.publishers', $this->publishers); |
|
598
|
27 |
|
$container->setParameter('tree_house.queue.consumers', $this->consumers); |
|
599
|
27 |
|
} |
|
600
|
|
|
|
|
601
|
|
|
/** |
|
602
|
|
|
* @param string $name |
|
603
|
|
|
* @param array $config |
|
604
|
|
|
* @param ContainerBuilder $container |
|
605
|
|
|
* |
|
606
|
|
|
* @return null|string |
|
607
|
|
|
*/ |
|
608
|
22 |
|
private function getDeadLetterExchange($name, array $config, ContainerBuilder $container) |
|
609
|
|
|
{ |
|
610
|
22 |
|
if (isset($config['dlx'])) { |
|
611
|
1 |
|
return $config['dlx']; |
|
612
|
|
|
} |
|
613
|
|
|
|
|
614
|
22 |
|
if (!isset($this->dlxs[$name])) { |
|
615
|
22 |
|
return null; |
|
616
|
|
|
} |
|
617
|
|
|
|
|
618
|
5 |
|
$dlx = $container->getDefinition($this->dlxs[$name]); |
|
619
|
|
|
|
|
620
|
5 |
|
return $dlx->getArgument(1); |
|
621
|
|
|
} |
|
622
|
|
|
|
|
623
|
|
|
/** |
|
624
|
|
|
* @param array $exchange |
|
625
|
|
|
* |
|
626
|
|
|
* @return int |
|
627
|
|
|
*/ |
|
628
|
16 |
|
private function getExchangeFlagsValue(array $exchange) |
|
629
|
|
|
{ |
|
630
|
16 |
|
$flags = ExchangeInterface::NOPARAM; |
|
631
|
|
|
|
|
632
|
16 |
|
if ($exchange['durable']) { |
|
633
|
16 |
|
$flags |= ExchangeInterface::DURABLE; |
|
634
|
|
|
} |
|
635
|
|
|
|
|
636
|
16 |
|
if ($exchange['passive']) { |
|
637
|
2 |
|
$flags |= ExchangeInterface::PASSIVE; |
|
638
|
|
|
} |
|
639
|
|
|
|
|
640
|
16 |
|
return $flags; |
|
641
|
|
|
} |
|
642
|
|
|
|
|
643
|
|
|
/** |
|
644
|
|
|
* @param array $queue |
|
645
|
|
|
* |
|
646
|
|
|
* @return int |
|
647
|
|
|
*/ |
|
648
|
22 |
|
private function getQueueFlagsValue(array $queue) |
|
649
|
|
|
{ |
|
650
|
22 |
|
$flags = QueueInterface::NOPARAM; |
|
651
|
|
|
|
|
652
|
22 |
|
if ($queue['durable']) { |
|
653
|
21 |
|
$flags |= QueueInterface::DURABLE; |
|
654
|
|
|
} |
|
655
|
|
|
|
|
656
|
22 |
|
if ($queue['passive']) { |
|
657
|
2 |
|
$flags |= QueueInterface::PASSIVE; |
|
658
|
|
|
} |
|
659
|
|
|
|
|
660
|
22 |
|
if ($queue['exclusive']) { |
|
661
|
2 |
|
$flags |= QueueInterface::EXCLUSIVE; |
|
662
|
|
|
} |
|
663
|
|
|
|
|
664
|
22 |
|
if ($queue['auto_delete']) { |
|
665
|
2 |
|
$flags |= QueueInterface::AUTODELETE; |
|
666
|
|
|
} |
|
667
|
|
|
|
|
668
|
22 |
|
return $flags; |
|
669
|
|
|
} |
|
670
|
|
|
} |
|
671
|
|
|
|