| @@ 218-233 (lines=16) @@ | ||
| 215 | * |
|
| 216 | * @return AMQPExchange |
|
| 217 | */ |
|
private function produceExchangeInstance($exchangeName)
{
    // The "declared" cache is consulted first and wins over the "used" cache.
    if (isset($this->declaredExchangeInstances[$exchangeName])) {
        return $this->declaredExchangeInstances[$exchangeName];
    }

    // Nothing cached under this name yet: build the exchange once and remember it.
    if (!isset($this->usedExchangeInstances[$exchangeName])) {
        // ensureChannel() runs before $this->channel is handed to the constructor;
        // presumably it guarantees a usable channel (its body is not visible here).
        $this->ensureChannel();

        $instance = new AMQPExchange($this->channel);
        $instance->setName($exchangeName);

        $this->usedExchangeInstances[$exchangeName] = $instance;
    }

    return $this->usedExchangeInstances[$exchangeName];
}
|
| 234 | ||
| 235 | /** |
|
| 236 | * It returns a cached queue or a new one if none exists. |
|
| @@ 242-257 (lines=16) @@ | ||
| 239 | * |
|
| 240 | * @return AMQPQueue |
|
| 241 | */ |
|
private function produceQueueInstance($queueName)
{
    // The "declared" cache is consulted first and wins over the "used" cache.
    if (isset($this->declaredQueueInstances[$queueName])) {
        return $this->declaredQueueInstances[$queueName];
    }

    // Nothing cached under this name yet: build the queue once and remember it.
    if (!isset($this->usedQueueInstances[$queueName])) {
        // ensureChannel() runs before $this->channel is handed to the constructor;
        // presumably it guarantees a usable channel (its body is not visible here).
        $this->ensureChannel();

        $instance = new AMQPQueue($this->channel);
        $instance->setName($queueName);

        $this->usedQueueInstances[$queueName] = $instance;
    }

    return $this->usedQueueInstances[$queueName];
}
|
| 258 | ||
| 259 | private function ensureChannel() |
|
| 260 | { |
|