@@ -295,7 +295,7 @@ discard block |
||
295 | 295 | false, $this->getAutoAck(), |
296 | 296 | false, |
297 | 297 | false, |
298 | - function ($message) use ($processMessage) { |
|
298 | + function($message) use ($processMessage) { |
|
299 | 299 | /** @var AMQPMessage $message */ |
300 | 300 | $message = new Message($message); |
301 | 301 | return $processMessage($message); |
@@ -320,7 +320,7 @@ discard block |
||
320 | 320 | * @param AMQPChannel $channel |
321 | 321 | * @param AbstractConnection $connection |
322 | 322 | */ |
323 | - return function ($channel, $connection) { |
|
323 | + return function($channel, $connection) { |
|
324 | 324 | $channel->close(); |
325 | 325 | $connection->close(); |
326 | 326 | }; |
@@ -153,7 +153,9 @@ discard block |
||
153 | 153 | protected function makeConnection(string $name) |
154 | 154 | { |
155 | 155 | $config = $this->configuration($name); |
156 | - if (isset($config['host'])) $config = [$config]; |
|
156 | + if (isset($config['host'])) { |
|
157 | + $config = [$config]; |
|
158 | + } |
|
157 | 159 | return AMQPStreamConnection::create_connection($config); |
158 | 160 | } |
159 | 161 | |
@@ -429,10 +431,15 @@ discard block |
||
429 | 431 | |
430 | 432 | $channel->queue_bind($this->getQueue(), $this->getExchange()); |
431 | 433 | |
432 | - if ($message instanceof Message) $message->getMessage(); |
|
434 | + if ($message instanceof Message) { |
|
435 | + $message->getMessage(); |
|
436 | + } |
|
433 | 437 | if (!$message instanceof AMQPMessage) { |
434 | - if ($message instanceof Collection) $message = $message->toJson(); |
|
435 | - elseif (is_array($message) || is_object($message)) $message = json_encode($message); |
|
438 | + if ($message instanceof Collection) { |
|
439 | + $message = $message->toJson(); |
|
440 | + } elseif (is_array($message) || is_object($message)) { |
|
441 | + $message = json_encode($message); |
|
442 | + } |
|
436 | 443 | $message = new AMQPMessage($message, ['content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); |
437 | 444 | } |
438 | 445 |
@@ -23,7 +23,7 @@ |
||
23 | 23 | $this->mergeConfigFrom( |
24 | 24 | dirname(__DIR__) . '/../config/amqp.php', 'amqp' |
25 | 25 | ); |
26 | - $this->app->singleton(AMQPManager::class, function ($app) { |
|
26 | + $this->app->singleton(AMQPManager::class, function($app) { |
|
27 | 27 | return new AMQPManager($app); |
28 | 28 | }); |
29 | 29 | $this->app->alias(AMQPManager::class, 'amqp'); |
@@ -51,7 +51,7 @@ |
||
51 | 51 | * |
52 | 52 | * @return AMQPMessage |
53 | 53 | */ |
54 | - public function getMessage(){ |
|
54 | + public function getMessage() { |
|
55 | 55 | return $this->message; |
56 | 56 | } |
57 | 57 |
@@ -79,11 +79,11 @@ |
||
79 | 79 | ->setConsumerTag($this->consumerTag) |
80 | 80 | ->setAutoAck($this->autoAsk) |
81 | 81 | ->setRouteKey($this->routeKey) |
82 | - ->consume(function ($message) { |
|
82 | + ->consume(function($message) { |
|
83 | 83 | /** @var Message $message */ |
84 | 84 | return static::processMessage($message); |
85 | 85 | }); |
86 | - } catch (InvalidArgumentException|ErrorException|Exception $exception) { |
|
86 | + } catch (InvalidArgumentException | ErrorException | Exception $exception) { |
|
87 | 87 | Log::error('AMQPMessage consume error:' . $exception->getMessage()); |
88 | 88 | } |
89 | 89 | } |