@@ -58,7 +58,7 @@ |
||
58 | 58 | |
59 | 59 | $this->driver->consume( |
60 | 60 | $replyTo, |
61 | - function (Message $message) use ($correlationId, &$response) { |
|
61 | + function(Message $message) use ($correlationId, &$response) { |
|
62 | 62 | if ($message->getCorrelationId() == $correlationId) { |
63 | 63 | $response = $message->getBody(); |
64 | 64 | return QueueHandler::STOP_CONSUMING; |
@@ -135,19 +135,16 @@ |
||
135 | 135 | |
136 | 136 | // Sync |
137 | 137 | $handler = $this->sync ? |
138 | - new SyncConsumerHandler($consumer, $this->driver) : |
|
139 | - new AsyncConsumerHandler($consumer); |
|
138 | + new SyncConsumerHandler($consumer, $this->driver) : new AsyncConsumerHandler($consumer); |
|
140 | 139 | $handler->setLogger($this->logger); |
141 | 140 | |
142 | 141 | // Ack |
143 | 142 | $handler = $this->autoAck ? |
144 | - $handler : |
|
145 | - new AckHandler($handler, $this->driver, $this->requeueOnFailure); |
|
143 | + $handler : new AckHandler($handler, $this->driver, $this->requeueOnFailure); |
|
146 | 144 | |
147 | 145 | // Stop / Continue |
148 | 146 | $handler = $this->stopOnFailure ? |
149 | - new StopOnExceptionHandler($handler) : |
|
150 | - new ContinueOnExceptionHandler($handler); |
|
147 | + new StopOnExceptionHandler($handler) : new ContinueOnExceptionHandler($handler); |
|
151 | 148 | $handler->setLogger($this->logger); |
152 | 149 | |
153 | 150 | return $handler; |
@@ -140,7 +140,7 @@ discard block |
||
140 | 140 | } |
141 | 141 | |
142 | 142 | /** |
143 | - * @param array $exchangeInformation |
|
143 | + * @param string[] $exchangeInformation |
|
144 | 144 | * |
145 | 145 | * @throws AssertionFailedException |
146 | 146 | */ |
@@ -180,7 +180,7 @@ discard block |
||
180 | 180 | } |
181 | 181 | |
182 | 182 | /** |
183 | - * @param array $exchangeInformation |
|
183 | + * @param string[] $exchangeInformation |
|
184 | 184 | * @param OutputInterface $output |
185 | 185 | */ |
186 | 186 | private function bindExchange(array $exchangeInformation, OutputInterface $output) |
@@ -207,7 +207,7 @@ discard block |
||
207 | 207 | |
208 | 208 | /** |
209 | 209 | * @param string $exchangeName |
210 | - * @param array $queueInformation |
|
210 | + * @param string[] $queueInformation |
|
211 | 211 | * @param OutputInterface $output |
212 | 212 | */ |
213 | 213 | private function bindQueue( |
@@ -3,17 +3,17 @@ |
||
3 | 3 | namespace Burrow\Daemon; |
4 | 4 | |
5 | 5 | use Burrow\ConsumeOptions; |
6 | +use Burrow\Driver; |
|
6 | 7 | use Burrow\Event\DaemonStarted; |
7 | 8 | use Burrow\Event\DaemonStopped; |
8 | 9 | use Burrow\Event\MessageConsumed; |
9 | 10 | use Burrow\Event\MessageReceived; |
10 | 11 | use Burrow\Event\NullEmitter; |
12 | +use Burrow\Message; |
|
13 | +use Burrow\QueueHandler; |
|
11 | 14 | use Evaneos\Daemon\Daemon; |
12 | 15 | use Evaneos\Daemon\DaemonMonitor; |
13 | -use Burrow\Driver; |
|
14 | -use Burrow\Message; |
|
15 | 16 | use Evaneos\Daemon\Monitor\NullMonitor; |
16 | -use Burrow\QueueHandler; |
|
17 | 17 | use League\Event\Emitter; |
18 | 18 | use League\Event\EmitterInterface; |
19 | 19 | use Psr\Log\LoggerAwareInterface; |
@@ -77,7 +77,7 @@ |
||
77 | 77 | |
78 | 78 | $this->driver->consume( |
79 | 79 | $this->queueName, |
80 | - function (Message $message) { |
|
80 | + function(Message $message) { |
|
81 | 81 | $this->eventEmitter->emit(new MessageReceived()); |
82 | 82 | $this->monitor->monitor($this, $message); |
83 | 83 |
@@ -202,7 +202,7 @@ |
||
202 | 202 | $flags = $autoAck ? AMQP_AUTOACK : AMQP_NOPARAM; |
203 | 203 | $consumerTag = self::generateConsumerTag(); |
204 | 204 | try { |
205 | - $queue->consume(function (\AMQPEnvelope $message) use ($callback, $queueName) { |
|
205 | + $queue->consume(function(\AMQPEnvelope $message) use ($callback, $queueName) { |
|
206 | 206 | |
207 | 207 | $burrowMessage = new Message( |
208 | 208 | $message->getBody(), |
@@ -253,7 +253,7 @@ discard block |
||
253 | 253 | } |
254 | 254 | |
255 | 255 | /** |
256 | - * @param $timeout |
|
256 | + * @param integer $timeout |
|
257 | 257 | * |
258 | 258 | * @throws Exception |
259 | 259 | */ |
@@ -302,7 +302,7 @@ discard block |
||
302 | 302 | /** |
303 | 303 | * @param AMQPMessage $message |
304 | 304 | * |
305 | - * @return array |
|
305 | + * @return string[] |
|
306 | 306 | * |
307 | 307 | * @throws \OutOfBoundsException |
308 | 308 | */ |
@@ -338,6 +338,9 @@ discard block |
||
338 | 338 | $message->get(self::REPLY_TO) : ''; |
339 | 339 | } |
340 | 340 | |
341 | + /** |
|
342 | + * @param string $exchangeName |
|
343 | + */ |
|
341 | 344 | private function retryPublish($exchangeName, Message $message) |
342 | 345 | { |
343 | 346 | $this->retryPublish = true; |
@@ -64,7 +64,7 @@ discard block |
||
64 | 64 | $durable = ($type === self::QUEUE_DURABLE); |
65 | 65 | $exclusive = ($type === self::QUEUE_EXCLUSIVE); |
66 | 66 | |
67 | - list($name, ,) = $this->getChannel()->queue_declare($queueName, false, $durable, $exclusive, false); |
|
67 | + list($name,,) = $this->getChannel()->queue_declare($queueName, false, $durable, $exclusive, false); |
|
68 | 68 | |
69 | 69 | return $name; |
70 | 70 | } |
@@ -79,7 +79,7 @@ discard block |
||
79 | 79 | */ |
80 | 80 | public function declareExchange($exchangeName = '', $type = self::EXCHANGE_TYPE_FANOUT) |
81 | 81 | { |
82 | - list($name, ,) = $this->getChannel()->exchange_declare($exchangeName, $type, false, true, false); |
|
82 | + list($name,,) = $this->getChannel()->exchange_declare($exchangeName, $type, false, true, false); |
|
83 | 83 | |
84 | 84 | return $name; |
85 | 85 | } |
@@ -181,7 +181,7 @@ discard block |
||
181 | 181 | $autoAck, |
182 | 182 | false, |
183 | 183 | false, |
184 | - function (AMQPMessage $message) use ($callback, $queueName) { |
|
184 | + function(AMQPMessage $message) use ($callback, $queueName) { |
|
185 | 185 | $burrowMessage = new Message( |
186 | 186 | $message->getBody(), |
187 | 187 | '', // Impossible to retrieve here |