eMAGTechLabs /
RabbitMqBundle
| 1 | <?php |
||
| 2 | |||
| 3 | namespace OldSound\RabbitMqBundle\RabbitMq; |
||
| 4 | |||
| 5 | use OldSound\RabbitMqBundle\Event\AMQPEvent; |
||
| 6 | use PhpAmqpLib\Channel\AMQPChannel; |
||
| 7 | use PhpAmqpLib\Connection\AbstractConnection; |
||
| 8 | use Psr\Log\LoggerInterface; |
||
| 9 | use Psr\Log\NullLogger; |
||
| 10 | use Symfony\Component\EventDispatcher\EventDispatcherInterface; |
||
| 11 | use Symfony\Contracts\EventDispatcher\EventDispatcherInterface as ContractsEventDispatcherInterface; |
||
| 12 | |||
| 13 | abstract class BaseAmqp |
||
| 14 | { |
||
| 15 | protected $conn; |
||
| 16 | protected $ch; |
||
| 17 | protected $consumerTag; |
||
| 18 | protected $exchangeDeclared = false; |
||
| 19 | protected $queueDeclared = false; |
||
| 20 | protected $routingKey = ''; |
||
| 21 | protected $autoSetupFabric = true; |
||
| 22 | protected $basicProperties = array('content_type' => 'text/plain', 'delivery_mode' => 2); |
||
| 23 | |||
| 24 | /** |
||
| 25 | * @var LoggerInterface |
||
| 26 | */ |
||
| 27 | protected $logger; |
||
| 28 | |||
| 29 | protected $exchangeOptions = array( |
||
| 30 | 'passive' => false, |
||
| 31 | 'durable' => true, |
||
| 32 | 'auto_delete' => false, |
||
| 33 | 'internal' => false, |
||
| 34 | 'nowait' => false, |
||
| 35 | 'arguments' => null, |
||
| 36 | 'ticket' => null, |
||
| 37 | 'declare' => true, |
||
| 38 | ); |
||
| 39 | |||
| 40 | protected $queueOptions = array( |
||
| 41 | 'name' => '', |
||
| 42 | 'passive' => false, |
||
| 43 | 'durable' => true, |
||
| 44 | 'exclusive' => false, |
||
| 45 | 'auto_delete' => false, |
||
| 46 | 'nowait' => false, |
||
| 47 | 'arguments' => null, |
||
| 48 | 'ticket' => null, |
||
| 49 | 'declare' => true, |
||
| 50 | ); |
||
| 51 | |||
| 52 | /** |
||
| 53 | * @var EventDispatcherInterface|null |
||
| 54 | */ |
||
| 55 | protected $eventDispatcher = null; |
||
| 56 | |||
| 57 | /** |
||
| 58 | * @param AbstractConnection $conn |
||
| 59 | * @param AMQPChannel|null $ch |
||
| 60 | * @param null $consumerTag |
||
|
0 ignored issues
–
show
Documentation
Bug
introduced
by
Loading history...
|
|||
| 61 | */ |
||
| 62 | 68 | public function __construct(AbstractConnection $conn, AMQPChannel $ch = null, $consumerTag = null) |
|
| 63 | { |
||
| 64 | 68 | $this->conn = $conn; |
|
| 65 | 68 | $this->ch = $ch; |
|
| 66 | |||
| 67 | 68 | if ($conn->connectOnConstruct()) { |
|
| 68 | 1 | $this->getChannel(); |
|
| 69 | } |
||
| 70 | |||
| 71 | 68 | $this->consumerTag = empty($consumerTag) ? sprintf("PHPPROCESS_%s_%s", gethostname(), getmypid()) : $consumerTag; |
|
| 72 | |||
| 73 | 68 | $this->logger = new NullLogger(); |
|
| 74 | 68 | } |
|
| 75 | |||
| 76 | 47 | public function __destruct() |
|
| 77 | { |
||
| 78 | 47 | $this->close(); |
|
| 79 | 47 | } |
|
| 80 | |||
| 81 | 47 | public function close() |
|
| 82 | { |
||
| 83 | 47 | if ($this->ch) { |
|
| 84 | try { |
||
| 85 | 41 | $this->ch->close(); |
|
| 86 | } catch (\Exception $e) { |
||
| 87 | // ignore on shutdown |
||
| 88 | } |
||
| 89 | } |
||
| 90 | |||
| 91 | 47 | if ($this->conn && $this->conn->isConnected()) { |
|
| 92 | try { |
||
| 93 | $this->conn->close(); |
||
| 94 | } catch (\Exception $e) { |
||
| 95 | // ignore on shutdown |
||
| 96 | } |
||
| 97 | } |
||
| 98 | 47 | } |
|
| 99 | |||
| 100 | public function reconnect() |
||
| 101 | { |
||
| 102 | if (!$this->conn->isConnected()) { |
||
| 103 | return; |
||
| 104 | } |
||
| 105 | |||
| 106 | $this->conn->reconnect(); |
||
| 107 | } |
||
| 108 | |||
| 109 | /** |
||
| 110 | * @return AMQPChannel |
||
| 111 | */ |
||
| 112 | 24 | public function getChannel() |
|
| 113 | { |
||
| 114 | 24 | if (empty($this->ch) || null === $this->ch->getChannelId()) { |
|
| 115 | 1 | $this->ch = $this->conn->channel(); |
|
| 116 | } |
||
| 117 | |||
| 118 | 24 | return $this->ch; |
|
| 119 | } |
||
| 120 | |||
| 121 | /** |
||
| 122 | * @param AMQPChannel $ch |
||
| 123 | * |
||
| 124 | * @return void |
||
| 125 | */ |
||
| 126 | 19 | public function setChannel(AMQPChannel $ch) |
|
| 127 | { |
||
| 128 | 19 | $this->ch = $ch; |
|
| 129 | 19 | } |
|
| 130 | |||
| 131 | /** |
||
| 132 | * @throws \InvalidArgumentException |
||
| 133 | * @param array $options |
||
| 134 | * @return void |
||
| 135 | */ |
||
| 136 | 2 | public function setExchangeOptions(array $options = array()) |
|
| 137 | { |
||
| 138 | 2 | if (!isset($options['name'])) { |
|
| 139 | throw new \InvalidArgumentException('You must provide an exchange name'); |
||
| 140 | } |
||
| 141 | |||
| 142 | 2 | if (empty($options['type'])) { |
|
| 143 | throw new \InvalidArgumentException('You must provide an exchange type'); |
||
| 144 | } |
||
| 145 | |||
| 146 | 2 | $this->exchangeOptions = array_merge($this->exchangeOptions, $options); |
|
| 147 | 2 | } |
|
| 148 | |||
| 149 | /** |
||
| 150 | * @param array $options |
||
| 151 | * @return void |
||
| 152 | */ |
||
| 153 | public function setQueueOptions(array $options = array()) |
||
| 154 | { |
||
| 155 | $this->queueOptions = array_merge($this->queueOptions, $options); |
||
| 156 | } |
||
| 157 | |||
| 158 | /** |
||
| 159 | * @param string $routingKey |
||
| 160 | * @return void |
||
| 161 | */ |
||
| 162 | 2 | public function setRoutingKey($routingKey) |
|
| 163 | { |
||
| 164 | 2 | $this->routingKey = $routingKey; |
|
| 165 | 2 | } |
|
| 166 | |||
| 167 | 2 | public function setupFabric() |
|
| 168 | { |
||
| 169 | 2 | if (!$this->exchangeDeclared) { |
|
| 170 | 2 | $this->exchangeDeclare(); |
|
| 171 | } |
||
| 172 | |||
| 173 | 2 | if (!$this->queueDeclared) { |
|
| 174 | 2 | $this->queueDeclare(); |
|
| 175 | } |
||
| 176 | 2 | } |
|
| 177 | |||
| 178 | /** |
||
| 179 | * disables the automatic SetupFabric when using a consumer or producer |
||
| 180 | */ |
||
| 181 | 18 | public function disableAutoSetupFabric() |
|
| 182 | { |
||
| 183 | 18 | $this->autoSetupFabric = false; |
|
| 184 | 18 | } |
|
| 185 | |||
| 186 | /** |
||
| 187 | * @param LoggerInterface $logger |
||
| 188 | */ |
||
| 189 | public function setLogger($logger) |
||
| 190 | { |
||
| 191 | $this->logger = $logger; |
||
| 192 | } |
||
| 193 | |||
| 194 | /** |
||
| 195 | * Declares exchange |
||
| 196 | */ |
||
| 197 | 2 | protected function exchangeDeclare() |
|
| 198 | { |
||
| 199 | 2 | if ($this->exchangeOptions['declare']) { |
|
| 200 | $this->getChannel()->exchange_declare( |
||
| 201 | $this->exchangeOptions['name'], |
||
| 202 | $this->exchangeOptions['type'], |
||
| 203 | $this->exchangeOptions['passive'], |
||
| 204 | $this->exchangeOptions['durable'], |
||
| 205 | $this->exchangeOptions['auto_delete'], |
||
| 206 | $this->exchangeOptions['internal'], |
||
| 207 | $this->exchangeOptions['nowait'], |
||
| 208 | $this->exchangeOptions['arguments'], |
||
| 209 | $this->exchangeOptions['ticket']); |
||
| 210 | |||
| 211 | $this->exchangeDeclared = true; |
||
| 212 | } |
||
| 213 | 2 | } |
|
| 214 | |||
| 215 | /** |
||
| 216 | * Declares queue, creates if needed |
||
| 217 | */ |
||
| 218 | protected function queueDeclare() |
||
| 219 | { |
||
| 220 | if ($this->queueOptions['declare']) { |
||
| 221 | list($queueName, ,) = $this->getChannel()->queue_declare($this->queueOptions['name'], $this->queueOptions['passive'], |
||
| 222 | $this->queueOptions['durable'], $this->queueOptions['exclusive'], |
||
| 223 | $this->queueOptions['auto_delete'], $this->queueOptions['nowait'], |
||
| 224 | $this->queueOptions['arguments'], $this->queueOptions['ticket']); |
||
| 225 | |||
| 226 | if (isset($this->queueOptions['routing_keys']) && count($this->queueOptions['routing_keys']) > 0) { |
||
| 227 | foreach ($this->queueOptions['routing_keys'] as $routingKey) { |
||
| 228 | $this->queueBind($queueName, $this->exchangeOptions['name'], $routingKey, $this->queueOptions['arguments'] ?? []); |
||
| 229 | } |
||
| 230 | } else { |
||
| 231 | $this->queueBind($queueName, $this->exchangeOptions['name'], $this->routingKey, $this->queueOptions['arguments'] ?? []); |
||
| 232 | } |
||
| 233 | |||
| 234 | $this->queueDeclared = true; |
||
| 235 | } |
||
| 236 | } |
||
| 237 | |||
| 238 | /** |
||
| 239 | * Binds queue to an exchange |
||
| 240 | * |
||
| 241 | * @param string $queue |
||
| 242 | * @param string $exchange |
||
| 243 | * @param string $routing_key |
||
| 244 | */ |
||
| 245 | 2 | protected function queueBind($queue, $exchange, $routing_key, array $arguments = array()) |
|
| 246 | { |
||
| 247 | // queue binding is not permitted on the default exchange |
||
| 248 | 2 | if ('' !== $exchange) { |
|
| 249 | 2 | $this->getChannel()->queue_bind($queue, $exchange, $routing_key, false, $arguments); |
|
| 250 | } |
||
| 251 | 2 | } |
|
| 252 | |||
| 253 | /** |
||
| 254 | * @param EventDispatcherInterface $eventDispatcher |
||
| 255 | * |
||
| 256 | * @return BaseAmqp |
||
| 257 | */ |
||
| 258 | 20 | public function setEventDispatcher(EventDispatcherInterface $eventDispatcher) |
|
| 259 | { |
||
| 260 | 20 | $this->eventDispatcher = $eventDispatcher; |
|
| 261 | |||
| 262 | 20 | return $this; |
|
| 263 | } |
||
| 264 | |||
| 265 | /** |
||
| 266 | * @param string $eventName |
||
| 267 | * @param AMQPEvent $event |
||
| 268 | */ |
||
| 269 | 49 | protected function dispatchEvent($eventName, AMQPEvent $event) |
|
| 270 | { |
||
| 271 | 49 | if ($this->getEventDispatcher() instanceof ContractsEventDispatcherInterface) { |
|
| 272 | 19 | $this->getEventDispatcher()->dispatch( |
|
| 273 | 19 | $event, |
|
| 274 | $eventName |
||
| 275 | ); |
||
| 276 | } |
||
| 277 | 49 | } |
|
| 278 | |||
| 279 | /** |
||
| 280 | * @return EventDispatcherInterface|null |
||
| 281 | */ |
||
| 282 | 48 | public function getEventDispatcher() |
|
| 283 | { |
||
| 284 | 48 | return $this->eventDispatcher; |
|
| 285 | } |
||
| 286 | } |
||
| 287 |