b2pweb /
bdf-queue-bundle
| 1 | <?php |
||
| 2 | |||
| 3 | namespace Bdf\QueueBundle\ConnectionFactory; |
||
| 4 | |||
| 5 | use Bdf\Queue\Connection\AmqpLib\AmqpLibConnection; |
||
| 6 | use Bdf\Queue\Connection\ConnectionDriverInterface; |
||
| 7 | use Bdf\Queue\Connection\Doctrine\DoctrineConnection; |
||
| 8 | use Bdf\Queue\Connection\Factory\ConnectionDriverFactoryInterface; |
||
| 9 | use Bdf\Queue\Connection\Gearman\GearmanConnection; |
||
| 10 | use Bdf\Queue\Connection\Memory\MemoryConnection; |
||
| 11 | use Bdf\Queue\Connection\Null\NullConnection; |
||
| 12 | use Bdf\Queue\Connection\Pheanstalk\PheanstalkConnection; |
||
| 13 | use Bdf\Queue\Connection\RdKafka\RdKafkaConnection; |
||
| 14 | use Bdf\Queue\Connection\Redis\RedisConnection; |
||
| 15 | use Bdf\Queue\Serializer\SerializerInterface; |
||
| 16 | use Psr\Container\ContainerInterface; |
||
| 17 | |||
| 18 | final class ConnectionDriverFactory implements ConnectionDriverFactoryInterface |
||
| 19 | { |
||
| 20 | /** |
||
| 21 | * @var ContainerInterface |
||
| 22 | */ |
||
| 23 | private $container; |
||
| 24 | |||
| 25 | /** |
||
| 26 | * @var string |
||
| 27 | */ |
||
| 28 | private $containerId; |
||
| 29 | |||
| 30 | /** |
||
| 31 | * @var string |
||
| 32 | */ |
||
| 33 | private $defaultConnection; |
||
| 34 | |||
| 35 | /** |
||
| 36 | * @var string[] |
||
| 37 | */ |
||
| 38 | private $connectionNames; |
||
| 39 | |||
| 40 | /** |
||
| 41 | * @var ConnectionDriverConfiguratorInterface[] |
||
| 42 | */ |
||
| 43 | private $configurators = []; |
||
| 44 | |||
| 45 | /** |
||
| 46 | * @param string|null $defaultConnection The default connection name |
||
| 47 | * @param string[] $connectionNames All the connection names |
||
| 48 | */ |
||
| 49 | 6 | public function __construct(ContainerInterface $container, string $defaultConnection = null, array $connectionNames = [], string $containerId = 'bdf_queue.connection_definition.%s') |
|
| 50 | { |
||
| 51 | 6 | $this->container = $container; |
|
| 52 | 6 | $this->containerId = $containerId; |
|
| 53 | 6 | $this->defaultConnection = $defaultConnection; |
|
| 54 | 6 | $this->connectionNames = $connectionNames; |
|
| 55 | } |
||
| 56 | |||
| 57 | /** |
||
| 58 | * {@inheritdoc} |
||
| 59 | */ |
||
| 60 | 3 | public function create(?string $name): ConnectionDriverInterface |
|
| 61 | { |
||
| 62 | 3 | $id = sprintf($this->containerId, $name); |
|
| 63 | |||
| 64 | 3 | if (!$this->container->has($id)) { |
|
| 65 | throw new \InvalidArgumentException('No queue driver has been set for '.$name); |
||
| 66 | } |
||
| 67 | |||
| 68 | 3 | return $this->container->get($id); |
|
| 69 | } |
||
| 70 | |||
| 71 | /** |
||
| 72 | * {@inheritdoc} |
||
| 73 | */ |
||
| 74 | public function defaultConnectionName(): string |
||
| 75 | { |
||
| 76 | return $this->defaultConnection; |
||
| 77 | } |
||
| 78 | |||
| 79 | /** |
||
| 80 | * {@inheritdoc} |
||
| 81 | */ |
||
| 82 | public function defaultConnection(): ConnectionDriverInterface |
||
| 83 | { |
||
| 84 | return $this->create($this->defaultConnection); |
||
| 85 | } |
||
| 86 | |||
| 87 | /** |
||
| 88 | * {@inheritdoc} |
||
| 89 | */ |
||
| 90 | public function connectionNames(): array |
||
| 91 | { |
||
| 92 | return $this->connectionNames; |
||
| 93 | } |
||
| 94 | |||
| 95 | /** |
||
| 96 | * Register a custom configurator. |
||
| 97 | */ |
||
| 98 | 6 | public function registerConfigurator(ConnectionDriverConfiguratorInterface $configurator): void |
|
| 99 | { |
||
| 100 | 6 | foreach ($configurator->getSupportedDrivers() as $driver) { |
|
| 101 | 6 | $this->configurators[$driver] = $configurator; |
|
| 102 | } |
||
| 103 | } |
||
| 104 | |||
| 105 | /** |
||
| 106 | * Create the connection driver instance. |
||
| 107 | * |
||
| 108 | * @return ConnectionDriverInterface |
||
| 109 | * |
||
| 110 | * @internal |
||
| 111 | */ |
||
| 112 | 3 | public function createDriver(Configuration $config, SerializerInterface $serializer) |
|
| 113 | { |
||
| 114 | 3 | if (isset($this->configurators[$config->getDriver()])) { |
|
| 115 | 1 | return $this->configurators[$config->getDriver()]->configure($config, $serializer); |
|
| 116 | } |
||
| 117 | |||
| 118 | 2 | return $this->configureKnownDriver($config, $serializer); |
|
| 119 | } |
||
| 120 | |||
| 121 | /** |
||
| 122 | * Create the known driver instance. |
||
| 123 | * |
||
| 124 | * @return ConnectionDriverInterface |
||
| 125 | * |
||
| 126 | * @internal |
||
| 127 | */ |
||
| 128 | 2 | private function configureKnownDriver(Configuration $config, SerializerInterface $serializer) |
|
| 129 | { |
||
| 130 | 2 | switch ($config->getDriver()) { |
|
| 131 | 2 | case 'null': |
|
| 132 | $connection = new NullConnection($config->getConnection()); |
||
| 133 | $connection->setConfig($config->toArray()); |
||
| 134 | |||
| 135 | return $connection; |
||
| 136 | |||
| 137 | 2 | case 'memory': |
|
| 138 | 2 | $connection = new MemoryConnection($config->getConnection(), $serializer); |
|
| 139 | 2 | $connection->setConfig($config->toArray()); |
|
| 140 | |||
| 141 | 2 | return $connection; |
|
| 142 | |||
| 143 | case 'gearman': |
||
| 144 | $connection = new GearmanConnection($config->getConnection(), $serializer); |
||
| 145 | $connection->setConfig($config->toArray()); |
||
| 146 | |||
| 147 | return $connection; |
||
| 148 | |||
| 149 | case 'amqp-lib': |
||
| 150 | $connection = new AmqpLibConnection( |
||
| 151 | $config->getConnection(), |
||
| 152 | $serializer, |
||
| 153 | $config->has('exchange_resolver') ? $this->container->get($config->get('exchange_resolver')) : null |
||
|
0 ignored issues
–
show
Bug
introduced
by
Loading history...
|
|||
| 154 | ); |
||
| 155 | $connection->setConfig($config->toArray()); |
||
| 156 | |||
| 157 | return $connection; |
||
| 158 | |||
| 159 | case 'pheanstalk': |
||
| 160 | $connection = new PheanstalkConnection($config->getConnection(), $serializer); |
||
| 161 | $connection->setConfig($config->toArray()); |
||
| 162 | |||
| 163 | return $connection; |
||
| 164 | |||
| 165 | case 'rdkafka': |
||
| 166 | $connection = new RdKafkaConnection($config->getConnection(), $serializer); |
||
| 167 | $connection->setConfig($config->toArray()); |
||
| 168 | |||
| 169 | return $connection; |
||
| 170 | |||
| 171 | case 'redis': |
||
| 172 | $connection = new RedisConnection($config->getConnection(), $serializer); |
||
| 173 | $connection->setConfig($config->toArray()); |
||
| 174 | |||
| 175 | return $connection; |
||
| 176 | |||
| 177 | case 'doctrine': |
||
| 178 | $connection = new DoctrineConnection($config->getConnection(), $serializer); |
||
| 179 | $connection->setConfig($config->toArray()); |
||
| 180 | |||
| 181 | return $connection; |
||
| 182 | } |
||
| 183 | |||
| 184 | throw new \InvalidArgumentException('The queue driver "'.$config->getDriver().'" does not exist. Did you forget to add "connection_factory" option ?'); |
||
| 185 | } |
||
| 186 | } |
||
| 187 |