@@ -52,12 +52,12 @@ discard block |
||
52 | 52 | */ |
53 | 53 | public function publish(Message $message) |
54 | 54 | { |
55 | - $this->logger->debug('Publishing message to queue: ' . $this->queue->getName()); |
|
55 | + $this->logger->debug('Publishing message to queue: '.$this->queue->getName()); |
|
56 | 56 | |
57 | 57 | $attributes = ['correlation_id' => $message->getCorrelationId()]; |
58 | 58 | $result = $this->exchange->publish($message->getBody(), $this->queue->getName(), AMQP_DURABLE, $attributes); |
59 | 59 | |
60 | - if (! $result) { |
|
60 | + if (!$result) { |
|
61 | 61 | throw new MessagePublishingException(); |
62 | 62 | } |
63 | 63 | } |
@@ -67,9 +67,9 @@ discard block |
||
67 | 67 | */ |
68 | 68 | public function handle(MessageHandlerResolver $resolver) |
69 | 69 | { |
70 | - $this->logger->debug('Consuming messages from AMQP queue: ' . $this->queue->getName()); |
|
70 | + $this->logger->debug('Consuming messages from AMQP queue: '.$this->queue->getName()); |
|
71 | 71 | |
72 | - $this->queue->consume(function (\AMQPEnvelope $envelope, \AMQPQueue $queue) use ($resolver) { |
|
72 | + $this->queue->consume(function(\AMQPEnvelope $envelope, \AMQPQueue $queue) use ($resolver) { |
|
73 | 73 | $message = new Message($envelope->getBody(), $envelope->getCorrelationId()); |
74 | 74 | $handler = $resolver->resolveHandler($message); |
75 | 75 | |
@@ -78,7 +78,7 @@ discard block |
||
78 | 78 | $handler->handle($message); |
79 | 79 | $this->ackMessage($queue, $envelope); |
80 | 80 | } catch (MessageHandlingException $exception) { |
81 | - $this->logger->error('Caught exception while handling message: ' . $exception->getMessage()); |
|
81 | + $this->logger->error('Caught exception while handling message: '.$exception->getMessage()); |
|
82 | 82 | $this->nackMessage($queue, $envelope); |
83 | 83 | } |
84 | 84 |
@@ -63,7 +63,7 @@ discard block |
||
63 | 63 | } |
64 | 64 | |
65 | 65 | /** |
66 | - * @param $name |
|
66 | + * @param string $name |
|
67 | 67 | * @return MessageQueue |
68 | 68 | */ |
69 | 69 | public function getNamedQueue($name) |
@@ -77,7 +77,7 @@ discard block |
||
77 | 77 | * @param \AMQPExchange $exchange |
78 | 78 | * @param $queueName |
79 | 79 | * @param string $errorExchangeName |
80 | - * @return \AMQPQueue |
|
80 | + * @return AmqpMessageQueue |
|
81 | 81 | */ |
82 | 82 | private function buildQueue(\AMQPExchange $exchange, $queueName, $errorExchangeName = '') |
83 | 83 | { |
@@ -71,7 +71,7 @@ |
||
71 | 71 | return $this->queues[$queueName] = $queue; |
72 | 72 | } |
73 | 73 | |
74 | - throw new \RuntimeException('Queue is not registered: ' . $queueName); |
|
74 | + throw new \RuntimeException('Queue is not registered: '.$queueName); |
|
75 | 75 | } |
76 | 76 | |
77 | 77 | /** |