@@ -64,7 +64,7 @@ discard block |
||
| 64 | 64 | //PHP 5.3 Compliant |
| 65 | 65 | $currentObject = $this; |
| 66 | 66 | |
| 67 | - $this->getChannel()->basic_consume($name, $this->getQueueConsumerTag($name), false, false, false, false, function (AMQPMessage $msg) use($currentObject, $name) { |
|
| 67 | + $this->getChannel()->basic_consume($name, $this->getQueueConsumerTag($name), false, false, false, false, function(AMQPMessage $msg) use($currentObject, $name) { |
|
| 68 | 68 | $currentObject->processQueueMessage($name, $msg); |
| 69 | 69 | }); |
| 70 | 70 | } |
@@ -73,7 +73,7 @@ discard block |
||
| 73 | 73 | protected function queueDeclare() |
| 74 | 74 | { |
| 75 | 75 | foreach ($this->queues as $name => $options) { |
| 76 | - list($queueName, ,) = $this->getChannel()->queue_declare($name, $options['passive'], |
|
| 76 | + list($queueName,,) = $this->getChannel()->queue_declare($name, $options['passive'], |
|
| 77 | 77 | $options['durable'], $options['exclusive'], |
| 78 | 78 | $options['auto_delete'], $options['nowait'], |
| 79 | 79 | $options['arguments'], $options['ticket']); |
@@ -280,7 +280,7 @@ |
||
| 280 | 280 | private function addMessage(AMQPMessage $message) |
| 281 | 281 | { |
| 282 | 282 | $this->batchCounter++; |
| 283 | - $this->messages[(int)$message->delivery_info['delivery_tag']] = $message; |
|
| 283 | + $this->messages[(int) $message->delivery_info['delivery_tag']] = $message; |
|
| 284 | 284 | } |
| 285 | 285 | |
| 286 | 286 | /** |
@@ -218,7 +218,7 @@ |
||
| 218 | 218 | protected function queueDeclare() |
| 219 | 219 | { |
| 220 | 220 | if ($this->queueOptions['declare']) { |
| 221 | - list($queueName, ,) = $this->getChannel()->queue_declare($this->queueOptions['name'], $this->queueOptions['passive'], |
|
| 221 | + list($queueName,,) = $this->getChannel()->queue_declare($this->queueOptions['name'], $this->queueOptions['passive'], |
|
| 222 | 222 | $this->queueOptions['durable'], $this->queueOptions['exclusive'], |
| 223 | 223 | $this->queueOptions['auto_delete'], $this->queueOptions['nowait'], |
| 224 | 224 | $this->queueOptions['arguments'], $this->queueOptions['ticket']); |
@@ -43,7 +43,7 @@ discard block |
||
| 43 | 43 | ? 'amq.rabbitmq.reply-to' // On direct reply-to mode, use predefined queue name |
| 44 | 44 | : $this->getQueueName(), |
| 45 | 45 | 'delivery_mode' => 1, // non durable |
| 46 | - 'expiration' => $expiration*1000, |
|
| 46 | + 'expiration' => $expiration * 1000, |
|
| 47 | 47 | 'correlation_id' => $requestId)); |
| 48 | 48 | |
| 49 | 49 | $this->getChannel()->basic_publish($msg, $server, $routingKey); |
@@ -91,7 +91,7 @@ discard block |
||
| 91 | 91 | protected function getQueueName() |
| 92 | 92 | { |
| 93 | 93 | if (null === $this->queueName) { |
| 94 | - list($this->queueName, ,) = $this->getChannel()->queue_declare("", false, false, true, false); |
|
| 94 | + list($this->queueName,,) = $this->getChannel()->queue_declare("", false, false, true, false); |
|
| 95 | 95 | } |
| 96 | 96 | |
| 97 | 97 | return $this->queueName; |
@@ -53,7 +53,7 @@ |
||
| 53 | 53 | $msg->set('application_headers', $headersTable); |
| 54 | 54 | } |
| 55 | 55 | |
| 56 | - $this->getChannel()->basic_publish($msg, $this->exchangeOptions['name'], (string)$routingKey); |
|
| 56 | + $this->getChannel()->basic_publish($msg, $this->exchangeOptions['name'], (string) $routingKey); |
|
| 57 | 57 | $this->logger->debug('AMQP message published', array( |
| 58 | 58 | 'amqp' => array( |
| 59 | 59 | 'body' => $msgBody, |
@@ -4,7 +4,7 @@ |
||
| 4 | 4 | |
| 5 | 5 | use OldSound\RabbitMqBundle\Provider\QueueOptionsProviderInterface; |
| 6 | 6 | |
| 7 | -class DynamicConsumer extends Consumer{ |
|
| 7 | +class DynamicConsumer extends Consumer { |
|
| 8 | 8 | |
| 9 | 9 | /** |
| 10 | 10 | * Queue provider |
@@ -11,7 +11,7 @@ discard block |
||
| 11 | 11 | public function initServer($name) |
| 12 | 12 | { |
| 13 | 13 | $this->setExchangeOptions(array('name' => $name, 'type' => 'direct')); |
| 14 | - $this->setQueueOptions(array('name' => $name . '-queue')); |
|
| 14 | + $this->setQueueOptions(array('name' => $name.'-queue')); |
|
| 15 | 15 | } |
| 16 | 16 | |
| 17 | 17 | public function processMessage(AMQPMessage $msg) |
@@ -24,7 +24,7 @@ discard block |
||
| 24 | 24 | $this->consumed++; |
| 25 | 25 | $this->maybeStopConsumer(); |
| 26 | 26 | } catch (\Exception $e) { |
| 27 | - $this->sendReply('error: ' . $e->getMessage(), $msg->get('reply_to'), $msg->get('correlation_id')); |
|
| 27 | + $this->sendReply('error: '.$e->getMessage(), $msg->get('reply_to'), $msg->get('correlation_id')); |
|
| 28 | 28 | } |
| 29 | 29 | } |
| 30 | 30 | |
@@ -29,7 +29,7 @@ |
||
| 29 | 29 | $partsHolder = $this->getContainer()->get('old_sound_rabbit_mq.parts_holder'); |
| 30 | 30 | |
| 31 | 31 | foreach (array('base_amqp', 'binding') as $key) { |
| 32 | - foreach ($partsHolder->getParts('old_sound_rabbit_mq.' . $key) as $baseAmqp) { |
|
| 32 | + foreach ($partsHolder->getParts('old_sound_rabbit_mq.'.$key) as $baseAmqp) { |
|
| 33 | 33 | if ($baseAmqp instanceof DynamicConsumer) { |
| 34 | 34 | continue; |
| 35 | 35 | } |
@@ -59,7 +59,7 @@ |
||
| 59 | 59 | } |
| 60 | 60 | } |
| 61 | 61 | |
| 62 | - return (int)$numerical; |
|
| 62 | + return (int) $numerical; |
|
| 63 | 63 | } |
| 64 | 64 | |
| 65 | 65 | } |