1 | <?php |
||
2 | |||
3 | namespace Bdf\QueueBundle\ConnectionFactory; |
||
4 | |||
5 | use Bdf\Queue\Connection\AmqpLib\AmqpLibConnection; |
||
6 | use Bdf\Queue\Connection\ConnectionDriverInterface; |
||
7 | use Bdf\Queue\Connection\Doctrine\DoctrineConnection; |
||
8 | use Bdf\Queue\Connection\Factory\ConnectionDriverFactoryInterface; |
||
9 | use Bdf\Queue\Connection\Gearman\GearmanConnection; |
||
10 | use Bdf\Queue\Connection\Memory\MemoryConnection; |
||
11 | use Bdf\Queue\Connection\Null\NullConnection; |
||
12 | use Bdf\Queue\Connection\Pheanstalk\PheanstalkConnection; |
||
13 | use Bdf\Queue\Connection\RdKafka\RdKafkaConnection; |
||
14 | use Bdf\Queue\Connection\Redis\RedisConnection; |
||
15 | use Bdf\Queue\Serializer\SerializerInterface; |
||
16 | use Psr\Container\ContainerInterface; |
||
17 | |||
18 | final class ConnectionDriverFactory implements ConnectionDriverFactoryInterface |
||
19 | { |
||
20 | /** |
||
21 | * @var ContainerInterface |
||
22 | */ |
||
23 | private $container; |
||
24 | |||
25 | /** |
||
26 | * @var string |
||
27 | */ |
||
28 | private $containerId; |
||
29 | |||
30 | /** |
||
31 | * @var string |
||
32 | */ |
||
33 | private $defaultConnection; |
||
34 | |||
35 | /** |
||
36 | * @var string[] |
||
37 | */ |
||
38 | private $connectionNames; |
||
39 | |||
40 | /** |
||
41 | * @var ConnectionDriverConfiguratorInterface[] |
||
42 | */ |
||
43 | private $configurators = []; |
||
44 | |||
45 | /** |
||
46 | * @param string|null $defaultConnection The default connection name |
||
47 | * @param string[] $connectionNames All the connection names |
||
48 | */ |
||
49 | 6 | public function __construct(ContainerInterface $container, string $defaultConnection = null, array $connectionNames = [], string $containerId = 'bdf_queue.connection_definition.%s') |
|
50 | { |
||
51 | 6 | $this->container = $container; |
|
52 | 6 | $this->containerId = $containerId; |
|
53 | 6 | $this->defaultConnection = $defaultConnection; |
|
54 | 6 | $this->connectionNames = $connectionNames; |
|
55 | } |
||
56 | |||
57 | /** |
||
58 | * {@inheritdoc} |
||
59 | */ |
||
60 | 3 | public function create(?string $name): ConnectionDriverInterface |
|
61 | { |
||
62 | 3 | $id = sprintf($this->containerId, $name); |
|
63 | |||
64 | 3 | if (!$this->container->has($id)) { |
|
65 | throw new \InvalidArgumentException('No queue driver has been set for '.$name); |
||
66 | } |
||
67 | |||
68 | 3 | return $this->container->get($id); |
|
69 | } |
||
70 | |||
71 | /** |
||
72 | * {@inheritdoc} |
||
73 | */ |
||
74 | public function defaultConnectionName(): string |
||
75 | { |
||
76 | return $this->defaultConnection; |
||
77 | } |
||
78 | |||
79 | /** |
||
80 | * {@inheritdoc} |
||
81 | */ |
||
82 | public function defaultConnection(): ConnectionDriverInterface |
||
83 | { |
||
84 | return $this->create($this->defaultConnection); |
||
85 | } |
||
86 | |||
87 | /** |
||
88 | * {@inheritdoc} |
||
89 | */ |
||
90 | public function connectionNames(): array |
||
91 | { |
||
92 | return $this->connectionNames; |
||
93 | } |
||
94 | |||
95 | /** |
||
96 | * Register a custom configurator. |
||
97 | */ |
||
98 | 6 | public function registerConfigurator(ConnectionDriverConfiguratorInterface $configurator): void |
|
99 | { |
||
100 | 6 | foreach ($configurator->getSupportedDrivers() as $driver) { |
|
101 | 6 | $this->configurators[$driver] = $configurator; |
|
102 | } |
||
103 | } |
||
104 | |||
105 | /** |
||
106 | * Create the connection driver instance. |
||
107 | * |
||
108 | * @return ConnectionDriverInterface |
||
109 | * |
||
110 | * @internal |
||
111 | */ |
||
112 | 3 | public function createDriver(Configuration $config, SerializerInterface $serializer) |
|
113 | { |
||
114 | 3 | if (isset($this->configurators[$config->getDriver()])) { |
|
115 | 1 | return $this->configurators[$config->getDriver()]->configure($config, $serializer); |
|
116 | } |
||
117 | |||
118 | 2 | return $this->configureKnownDriver($config, $serializer); |
|
119 | } |
||
120 | |||
121 | /** |
||
122 | * Create the known driver instance. |
||
123 | * |
||
124 | * @return ConnectionDriverInterface |
||
125 | * |
||
126 | * @internal |
||
127 | */ |
||
128 | 2 | private function configureKnownDriver(Configuration $config, SerializerInterface $serializer) |
|
129 | { |
||
130 | 2 | switch ($config->getDriver()) { |
|
131 | 2 | case 'null': |
|
132 | $connection = new NullConnection($config->getConnection()); |
||
133 | $connection->setConfig($config->toArray()); |
||
134 | |||
135 | return $connection; |
||
136 | |||
137 | 2 | case 'memory': |
|
138 | 2 | $connection = new MemoryConnection($config->getConnection(), $serializer); |
|
139 | 2 | $connection->setConfig($config->toArray()); |
|
140 | |||
141 | 2 | return $connection; |
|
142 | |||
143 | case 'gearman': |
||
144 | $connection = new GearmanConnection($config->getConnection(), $serializer); |
||
145 | $connection->setConfig($config->toArray()); |
||
146 | |||
147 | return $connection; |
||
148 | |||
149 | case 'amqp-lib': |
||
150 | $connection = new AmqpLibConnection( |
||
151 | $config->getConnection(), |
||
152 | $serializer, |
||
153 | $config->has('exchange_resolver') ? $this->container->get($config->get('exchange_resolver')) : null |
||
0 ignored issues
–
show
Bug
introduced
by
![]() |
|||
154 | ); |
||
155 | $connection->setConfig($config->toArray()); |
||
156 | |||
157 | return $connection; |
||
158 | |||
159 | case 'pheanstalk': |
||
160 | $connection = new PheanstalkConnection($config->getConnection(), $serializer); |
||
161 | $connection->setConfig($config->toArray()); |
||
162 | |||
163 | return $connection; |
||
164 | |||
165 | case 'rdkafka': |
||
166 | $connection = new RdKafkaConnection($config->getConnection(), $serializer); |
||
167 | $connection->setConfig($config->toArray()); |
||
168 | |||
169 | return $connection; |
||
170 | |||
171 | case 'redis': |
||
172 | $connection = new RedisConnection($config->getConnection(), $serializer); |
||
173 | $connection->setConfig($config->toArray()); |
||
174 | |||
175 | return $connection; |
||
176 | |||
177 | case 'doctrine': |
||
178 | $connection = new DoctrineConnection($config->getConnection(), $serializer); |
||
179 | $connection->setConfig($config->toArray()); |
||
180 | |||
181 | return $connection; |
||
182 | } |
||
183 | |||
184 | throw new \InvalidArgumentException('The queue driver "'.$config->getDriver().'" does not exist. Did you forget to add "connection_factory" option ?'); |
||
185 | } |
||
186 | } |
||
187 |