@@ -4,21 +4,21 @@ |
||
| 4 | 4 | |
| 5 | 5 | class AmqpPartsHolder |
| 6 | 6 | { |
| 7 | - protected $parts; |
|
| 7 | + protected $parts; |
|
| 8 | 8 | |
| 9 | - public function __construct() |
|
| 10 | - { |
|
| 11 | - $this->parts = array(); |
|
| 12 | - } |
|
| 9 | + public function __construct() |
|
| 10 | + { |
|
| 11 | + $this->parts = array(); |
|
| 12 | + } |
|
| 13 | 13 | |
| 14 | - public function addPart($type, BaseAmqp $part) |
|
| 15 | - { |
|
| 16 | - $this->parts[$type][] = $part; |
|
| 17 | - } |
|
| 14 | + public function addPart($type, BaseAmqp $part) |
|
| 15 | + { |
|
| 16 | + $this->parts[$type][] = $part; |
|
| 17 | + } |
|
| 18 | 18 | |
| 19 | - public function getParts($type) |
|
| 20 | - { |
|
| 19 | + public function getParts($type) |
|
| 20 | + { |
|
| 21 | 21 | $type = (string) $type; |
| 22 | - return isset($this->parts[$type]) ? $this->parts[$type] : array(); |
|
| 23 | - } |
|
| 22 | + return isset($this->parts[$type]) ? $this->parts[$type] : array(); |
|
| 23 | + } |
|
| 24 | 24 | } |
@@ -64,7 +64,7 @@ discard block |
||
| 64 | 64 | //PHP 5.3 Compliant |
| 65 | 65 | $currentObject = $this; |
| 66 | 66 | |
| 67 | - $this->getChannel()->basic_consume($name, $this->getQueueConsumerTag($name), false, false, false, false, function (AMQPMessage $msg) use($currentObject, $name) { |
|
| 67 | + $this->getChannel()->basic_consume($name, $this->getQueueConsumerTag($name), false, false, false, false, function(AMQPMessage $msg) use($currentObject, $name) { |
|
| 68 | 68 | $currentObject->processQueueMessage($name, $msg); |
| 69 | 69 | }); |
| 70 | 70 | } |
@@ -73,7 +73,7 @@ discard block |
||
| 73 | 73 | protected function queueDeclare() |
| 74 | 74 | { |
| 75 | 75 | foreach ($this->queues as $name => $options) { |
| 76 | - list($queueName, ,) = $this->getChannel()->queue_declare($name, $options['passive'], |
|
| 76 | + list($queueName,,) = $this->getChannel()->queue_declare($name, $options['passive'], |
|
| 77 | 77 | $options['durable'], $options['exclusive'], |
| 78 | 78 | $options['auto_delete'], $options['nowait'], |
| 79 | 79 | $options['arguments'], $options['ticket']); |
@@ -268,7 +268,7 @@ |
||
| 268 | 268 | { |
| 269 | 269 | if ($this->gracefulMaxExecutionDateTime) { |
| 270 | 270 | $allowedExecutionDateInterval = $this->gracefulMaxExecutionDateTime->diff(new \DateTime()); |
| 271 | - $allowedExecutionSeconds = $allowedExecutionDateInterval->days * 86400 |
|
| 271 | + $allowedExecutionSeconds = $allowedExecutionDateInterval->days * 86400 |
|
| 272 | 272 | + $allowedExecutionDateInterval->h * 3600 |
| 273 | 273 | + $allowedExecutionDateInterval->i * 60 |
| 274 | 274 | + $allowedExecutionDateInterval->s; |
@@ -280,7 +280,7 @@ |
||
| 280 | 280 | private function addMessage(AMQPMessage $message) |
| 281 | 281 | { |
| 282 | 282 | $this->batchCounter++; |
| 283 | - $this->messages[(int)$message->delivery_info['delivery_tag']] = $message; |
|
| 283 | + $this->messages[(int) $message->delivery_info['delivery_tag']] = $message; |
|
| 284 | 284 | } |
| 285 | 285 | |
| 286 | 286 | /** |
@@ -218,7 +218,7 @@ |
||
| 218 | 218 | protected function queueDeclare() |
| 219 | 219 | { |
| 220 | 220 | if ($this->queueOptions['declare']) { |
| 221 | - list($queueName, ,) = $this->getChannel()->queue_declare($this->queueOptions['name'], $this->queueOptions['passive'], |
|
| 221 | + list($queueName,,) = $this->getChannel()->queue_declare($this->queueOptions['name'], $this->queueOptions['passive'], |
|
| 222 | 222 | $this->queueOptions['durable'], $this->queueOptions['exclusive'], |
| 223 | 223 | $this->queueOptions['auto_delete'], $this->queueOptions['nowait'], |
| 224 | 224 | $this->queueOptions['arguments'], $this->queueOptions['ticket']); |
@@ -39,12 +39,12 @@ |
||
| 39 | 39 | } |
| 40 | 40 | |
| 41 | 41 | $msg = new AMQPMessage($msgBody, array('content_type' => 'text/plain', |
| 42 | - 'reply_to' => $this->directReplyTo |
|
| 42 | + 'reply_to' => $this->directReplyTo |
|
| 43 | 43 | ? 'amq.rabbitmq.reply-to' // On direct reply-to mode, use predefined queue name |
| 44 | 44 | : $this->getQueueName(), |
| 45 | - 'delivery_mode' => 1, // non durable |
|
| 46 | - 'expiration' => $expiration*1000, |
|
| 47 | - 'correlation_id' => $requestId)); |
|
| 45 | + 'delivery_mode' => 1, // non durable |
|
| 46 | + 'expiration' => $expiration*1000, |
|
| 47 | + 'correlation_id' => $requestId)); |
|
| 48 | 48 | |
| 49 | 49 | $this->getChannel()->basic_publish($msg, $server, $routingKey); |
| 50 | 50 | |
@@ -43,7 +43,7 @@ discard block |
||
| 43 | 43 | ? 'amq.rabbitmq.reply-to' // On direct reply-to mode, use predefined queue name |
| 44 | 44 | : $this->getQueueName(), |
| 45 | 45 | 'delivery_mode' => 1, // non durable |
| 46 | - 'expiration' => $expiration*1000, |
|
| 46 | + 'expiration' => $expiration * 1000, |
|
| 47 | 47 | 'correlation_id' => $requestId)); |
| 48 | 48 | |
| 49 | 49 | $this->getChannel()->basic_publish($msg, $server, $routingKey); |
@@ -91,7 +91,7 @@ discard block |
||
| 91 | 91 | protected function getQueueName() |
| 92 | 92 | { |
| 93 | 93 | if (null === $this->queueName) { |
| 94 | - list($this->queueName, ,) = $this->getChannel()->queue_declare("", false, false, true, false); |
|
| 94 | + list($this->queueName,,) = $this->getChannel()->queue_declare("", false, false, true, false); |
|
| 95 | 95 | } |
| 96 | 96 | |
| 97 | 97 | return $this->queueName; |
@@ -53,7 +53,7 @@ |
||
| 53 | 53 | $msg->set('application_headers', $headersTable); |
| 54 | 54 | } |
| 55 | 55 | |
| 56 | - $this->getChannel()->basic_publish($msg, $this->exchangeOptions['name'], (string)$routingKey); |
|
| 56 | + $this->getChannel()->basic_publish($msg, $this->exchangeOptions['name'], (string) $routingKey); |
|
| 57 | 57 | $this->logger->debug('AMQP message published', array( |
| 58 | 58 | 'amqp' => array( |
| 59 | 59 | 'body' => $msgBody, |
@@ -44,7 +44,7 @@ discard block |
||
| 44 | 44 | $this->parameters = array_merge($this->parameters, $parameters); |
| 45 | 45 | $this->parameters = $this->parseUrl($this->parameters); |
| 46 | 46 | if (is_array($this->parameters['ssl_context'])) { |
| 47 | - $this->parameters['ssl_context'] = ! empty($this->parameters['ssl_context']) |
|
| 47 | + $this->parameters['ssl_context'] = !empty($this->parameters['ssl_context']) |
|
| 48 | 48 | ? stream_context_create(array('ssl' => $this->parameters['ssl_context'])) |
| 49 | 49 | : null; |
| 50 | 50 | } |
@@ -65,17 +65,17 @@ discard block |
||
| 65 | 65 | return $ref->newInstanceArgs($this->parameters['constructor_args']); |
| 66 | 66 | } |
| 67 | 67 | |
| 68 | - if ($this->class == 'PhpAmqpLib\Connection\AMQPSocketConnection' || is_subclass_of($this->class , 'PhpAmqpLib\Connection\AMQPSocketConnection')) { |
|
| 68 | + if ($this->class == 'PhpAmqpLib\Connection\AMQPSocketConnection' || is_subclass_of($this->class, 'PhpAmqpLib\Connection\AMQPSocketConnection')) { |
|
| 69 | 69 | return new $this->class( |
| 70 | 70 | $this->parameters['host'], |
| 71 | 71 | $this->parameters['port'], |
| 72 | 72 | $this->parameters['user'], |
| 73 | 73 | $this->parameters['password'], |
| 74 | 74 | $this->parameters['vhost'], |
| 75 | - false, // insist |
|
| 75 | + false, // insist |
|
| 76 | 76 | 'AMQPLAIN', // login_method |
| 77 | - null, // login_response |
|
| 78 | - 'en_US', // locale |
|
| 77 | + null, // login_response |
|
| 78 | + 'en_US', // locale |
|
| 79 | 79 | isset($this->parameters['read_timeout']) ? $this->parameters['read_timeout'] : $this->parameters['read_write_timeout'], |
| 80 | 80 | $this->parameters['keepalive'], |
| 81 | 81 | isset($this->parameters['write_timeout']) ? $this->parameters['write_timeout'] : $this->parameters['read_write_timeout'], |
@@ -88,10 +88,10 @@ discard block |
||
| 88 | 88 | $this->parameters['user'], |
| 89 | 89 | $this->parameters['password'], |
| 90 | 90 | $this->parameters['vhost'], |
| 91 | - false, // insist |
|
| 91 | + false, // insist |
|
| 92 | 92 | 'AMQPLAIN', // login_method |
| 93 | - null, // login_response |
|
| 94 | - 'en_US', // locale |
|
| 93 | + null, // login_response |
|
| 94 | + 'en_US', // locale |
|
| 95 | 95 | $this->parameters['connection_timeout'], |
| 96 | 96 | $this->parameters['read_write_timeout'], |
| 97 | 97 | $this->parameters['ssl_context'], |
@@ -125,7 +125,7 @@ discard block |
||
| 125 | 125 | $parameters['host'] = urldecode($url['host']); |
| 126 | 126 | } |
| 127 | 127 | if (isset($url['port'])) { |
| 128 | - $parameters['port'] = (int)$url['port']; |
|
| 128 | + $parameters['port'] = (int) $url['port']; |
|
| 129 | 129 | } |
| 130 | 130 | if (isset($url['user'])) { |
| 131 | 131 | $parameters['user'] = urldecode($url['user']); |
@@ -4,7 +4,7 @@ |
||
| 4 | 4 | |
| 5 | 5 | use OldSound\RabbitMqBundle\Provider\QueueOptionsProviderInterface; |
| 6 | 6 | |
| 7 | -class DynamicConsumer extends Consumer{ |
|
| 7 | +class DynamicConsumer extends Consumer { |
|
| 8 | 8 | |
| 9 | 9 | /** |
| 10 | 10 | * Queue provider |