@@ -117,14 +117,12 @@ |
||
117 | 117 | Assertion::notNull($this->sync, 'You must specify if the handler must be sync or async'); |
118 | 118 | |
119 | 119 | $syncAsync = ($this->sync) ? |
120 | - new SyncConsumerHandler($consumer, $this->driver) : |
|
121 | - new AsyncConsumerHandler($consumer); |
|
120 | + new SyncConsumerHandler($consumer, $this->driver) : new AsyncConsumerHandler($consumer); |
|
122 | 121 | |
123 | 122 | $ackHandler = new AckHandler($syncAsync, $this->driver, $this->requeueOnFailure); |
124 | 123 | |
125 | 124 | $handler = ($this->stopOnFailure) ? |
126 | - new StopOnExceptionHandler($ackHandler) : |
|
127 | - new ContinueOnExceptionHandler($ackHandler); |
|
125 | + new StopOnExceptionHandler($ackHandler) : new ContinueOnExceptionHandler($ackHandler); |
|
128 | 126 | |
129 | 127 | $syncAsync->setLogger($this->logger); |
130 | 128 | $handler->setLogger($this->logger); |
@@ -59,7 +59,7 @@ |
||
59 | 59 | |
60 | 60 | $this->driver->consume( |
61 | 61 | $this->queueName, |
62 | - function (Message $message) { |
|
62 | + function(Message $message) { |
|
63 | 63 | $this->monitor->monitor($this, $message); |
64 | 64 | return $this->handler->handle($message); |
65 | 65 | }, |
@@ -165,7 +165,7 @@ |
||
165 | 165 | $flags = $autoAck ? AMQP_AUTOACK : AMQP_NOPARAM; |
166 | 166 | |
167 | 167 | try { |
168 | - $queue->consume(function (\AMQPEnvelope $message) use ($callback, $queueName) { |
|
168 | + $queue->consume(function(\AMQPEnvelope $message) use ($callback, $queueName) { |
|
169 | 169 | |
170 | 170 | $burrowMessage = new Message( |
171 | 171 | $message->getBody(), |
@@ -57,7 +57,7 @@ discard block |
||
57 | 57 | $durable = ($type === self::QUEUE_DURABLE); |
58 | 58 | $exclusive = ($type === self::QUEUE_EXCLUSIVE); |
59 | 59 | |
60 | - list($name, , ) = $this->getChannel()->queue_declare($queueName, false, $durable, $exclusive, false); |
|
60 | + list($name,,) = $this->getChannel()->queue_declare($queueName, false, $durable, $exclusive, false); |
|
61 | 61 | |
62 | 62 | return $name; |
63 | 63 | } |
@@ -72,7 +72,7 @@ discard block |
||
72 | 72 | */ |
73 | 73 | public function declareExchange($exchangeName = '', $type = self::EXCHANGE_TYPE_FANOUT) |
74 | 74 | { |
75 | - list($name, , ) = $this->getChannel()->exchange_declare($exchangeName, $type, false, true, false); |
|
75 | + list($name,,) = $this->getChannel()->exchange_declare($exchangeName, $type, false, true, false); |
|
76 | 76 | |
77 | 77 | return $name; |
78 | 78 | } |
@@ -166,7 +166,7 @@ discard block |
||
166 | 166 | $autoAck, |
167 | 167 | false, |
168 | 168 | false, |
169 | - function (AMQPMessage $message) use ($callback, $queueName) { |
|
169 | + function(AMQPMessage $message) use ($callback, $queueName) { |
|
170 | 170 | $burrowMessage = new Message( |
171 | 171 | $message->getBody(), |
172 | 172 | '', // Impossible to retrieve here |
@@ -238,7 +238,7 @@ discard block |
||
238 | 238 | } |
239 | 239 | |
240 | 240 | /** |
241 | - * @param $timeout |
|
241 | + * @param integer $timeout |
|
242 | 242 | * |
243 | 243 | * @throws \Exception |
244 | 244 | */ |
@@ -288,7 +288,7 @@ discard block |
||
288 | 288 | /** |
289 | 289 | * @param AMQPMessage $message |
290 | 290 | * |
291 | - * @return array |
|
291 | + * @return string[] |
|
292 | 292 | */ |
293 | 293 | private static function getHeaders(AMQPMessage $message) |
294 | 294 | { |
@@ -58,7 +58,7 @@ |
||
58 | 58 | |
59 | 59 | $this->driver->consume( |
60 | 60 | $replyTo, |
61 | - function (Message $message) use ($correlationId, &$response) { |
|
61 | + function(Message $message) use ($correlationId, &$response) { |
|
62 | 62 | if ($message->getCorrelationId() == $correlationId) { |
63 | 63 | $response = $message->getBody(); |
64 | 64 | return QueueHandler::STOP_CONSUMING; |