@@ -128,19 +128,19 @@ |
||
| 128 | 128 | private function protocol() : Events |
| 129 | 129 | { |
| 130 | 130 | return (new Events) |
| 131 | - ->attach(Events\Socket::CONNECTED, function (Stream $conn) { |
|
| 131 | + ->attach(Events\Socket::CONNECTED, function(Stream $conn) { |
|
| 132 | 132 | new Handshake($conn, $this, $this->connected); |
| 133 | 133 | $this->consuming && new Subscribe($this->connected, $this, $this->consuming); |
| 134 | 134 | }) |
| 135 | - ->attach(Events\Socket::RECEIVED, function (Stream $conn) { |
|
| 136 | - $this->receiver->inbound($conn->recv(), $this, function () { |
|
| 135 | + ->attach(Events\Socket::RECEIVED, function(Stream $conn) { |
|
| 136 | + $this->receiver->inbound($conn->recv(), $this, function() { |
|
| 137 | 137 | return $this->waiting; |
| 138 | 138 | }, $this->consuming); |
| 139 | 139 | }) |
| 140 | - ->attach(Events\Socket::CLOSED, function () { |
|
| 140 | + ->attach(Events\Socket::CLOSED, function() { |
|
| 141 | 141 | $this->disconnected->resolve(); |
| 142 | 142 | }) |
| 143 | - ->attach(Events\Socket::ERROR, function () { |
|
| 143 | + ->attach(Events\Socket::ERROR, function() { |
|
| 144 | 144 | $this->socket && $this->socket->close(); |
| 145 | 145 | $this->destroy(); |
| 146 | 146 | }) |
@@ -44,7 +44,7 @@ |
||
| 44 | 44 | public function __construct(Endpoint $endpoint, Consuming $consuming = null) |
| 45 | 45 | { |
| 46 | 46 | if (is_null($consuming)) { |
| 47 | - $this->pool = new Pool(new Options, function () use ($endpoint) { |
|
| 47 | + $this->pool = new Pool(new Options, function() use ($endpoint) { |
|
| 48 | 48 | return new Nsqd($endpoint); |
| 49 | 49 | }, "nsqd:{$endpoint->service()}"); |
| 50 | 50 | } else { |
@@ -46,11 +46,11 @@ |
||
| 46 | 46 | protected function background() : Promised |
| 47 | 47 | { |
| 48 | 48 | if ($this->queued()) { |
| 49 | - $this->exited()->then(function () { |
|
| 49 | + $this->exited()->then(function() { |
|
| 50 | 50 | $this->chan()->close(); |
| 51 | 51 | }); |
| 52 | 52 | |
| 53 | - new Worker($this->chan(), function (array $messages) { |
|
| 53 | + new Worker($this->chan(), function(array $messages) { |
|
| 54 | 54 | try { |
| 55 | 55 | yield $this->sending(...$messages); |
| 56 | 56 | } catch (Throwable $e) { |
@@ -88,7 +88,7 @@ |
||
| 88 | 88 | |
| 89 | 89 | if ($this->consumer) { |
| 90 | 90 | $this->consumer->setTopic($topic)->setChannel($channel); |
| 91 | - $subscribed->then(function () { |
|
| 91 | + $subscribed->then(function() { |
|
| 92 | 92 | $this->consumer->startup(); |
| 93 | 93 | }); |
| 94 | 94 | } |
@@ -22,7 +22,7 @@ |
||
| 22 | 22 | */ |
| 23 | 23 | protected function consuming() : Consuming |
| 24 | 24 | { |
| 25 | - return new Consuming(function () { |
|
| 25 | + return new Consuming(function() { |
|
| 26 | 26 | // do nothing |
| 27 | 27 | }); |
| 28 | 28 | } |
@@ -46,10 +46,10 @@ |
||
| 46 | 46 | */ |
| 47 | 47 | protected function background() : Promised |
| 48 | 48 | { |
| 49 | - return async(function () { |
|
| 49 | + return async(function() { |
|
| 50 | 50 | yield $this->syncedEndpoints( |
| 51 | 51 | $this->getTopic(), |
| 52 | - function (int $action, Endpoint $endpoint) { |
|
| 52 | + function(int $action, Endpoint $endpoint) { |
|
| 53 | 53 | switch ($action) { |
| 54 | 54 | case Linker::ACT_JOIN: |
| 55 | 55 | yield $this->linking( |
@@ -28,7 +28,7 @@ discard block |
||
| 28 | 28 | protected function lookupEndpoint(Lookupd $lookupd, string $topic) |
| 29 | 29 | { |
| 30 | 30 | if (empty($this->cachedEndpoints)) { |
| 31 | - $this->cachedEndpoints = yield $lookupd->endpoints($topic, function (array $endpoints) { |
|
| 31 | + $this->cachedEndpoints = yield $lookupd->endpoints($topic, function(array $endpoints) { |
|
| 32 | 32 | $this->cachedEndpoints = $endpoints; |
| 33 | 33 | }); |
| 34 | 34 | } |
@@ -45,7 +45,7 @@ discard block |
||
| 45 | 45 | */ |
| 46 | 46 | protected function syncedEndpoints(string $topic, Closure $linker, Lookupd $lookupd = null, array $statics = []) |
| 47 | 47 | { |
| 48 | - return $lookupd ? $lookupd->endpoints($topic, function (array $endpoints) use ($linker) { |
|
| 48 | + return $lookupd ? $lookupd->endpoints($topic, function(array $endpoints) use ($linker) { |
|
| 49 | 49 | return $this->routedEndpoints($linker, ...$endpoints); |
| 50 | 50 | }) : $this->routedEndpoints($linker, ...$statics); |
| 51 | 51 | } |
@@ -94,7 +94,7 @@ discard block |
||
| 94 | 94 | $marked = []; |
| 95 | 95 | |
| 96 | 96 | foreach ($endpoints as $endpoint) { |
| 97 | - $marked[(string)$endpoint->address()] = $endpoint; |
|
| 97 | + $marked[(string) $endpoint->address()] = $endpoint; |
|
| 98 | 98 | } |
| 99 | 99 | |
| 100 | 100 | return $marked; |
@@ -53,7 +53,7 @@ |
||
| 53 | 53 | { |
| 54 | 54 | $waits = []; |
| 55 | 55 | |
| 56 | - $this->linkers(function (Linker $linker) use (&$waits) { |
|
| 56 | + $this->linkers(function(Linker $linker) use (&$waits) { |
|
| 57 | 57 | array_push($waits, $linker->disconnect()); |
| 58 | 58 | }); |
| 59 | 59 | |
@@ -29,8 +29,8 @@ discard block |
||
| 29 | 29 | protected function linking(Endpoint $endpoint, Consuming $consuming = null) : Linker |
| 30 | 30 | { |
| 31 | 31 | return |
| 32 | - $this->linked[(string)$endpoint->address()] ?? |
|
| 33 | - $this->linked[(string)$endpoint->address()] = new Linker($endpoint, $consuming); |
|
| 32 | + $this->linked[(string) $endpoint->address()] ?? |
|
| 33 | + $this->linked[(string) $endpoint->address()] = new Linker($endpoint, $consuming); |
|
| 34 | 34 | } |
| 35 | 35 | |
| 36 | 36 | /** |
@@ -39,7 +39,7 @@ discard block |
||
| 39 | 39 | */ |
| 40 | 40 | protected function linker(Endpoint $endpoint) : Linker |
| 41 | 41 | { |
| 42 | - return $this->linked[(string)$endpoint->address()]; |
|
| 42 | + return $this->linked[(string) $endpoint->address()]; |
|
| 43 | 43 | } |
| 44 | 44 | |
| 45 | 45 | /** |