@@ -26,7 +26,7 @@ discard block  | 
                                                    ||
| 26 | 26 | public function queueCommand(string $queueName, string $id, string $serialized)  | 
                                                        
| 27 | 27 |      { | 
                                                        
| 28 | 28 |          if (!$this->storeCommandAndCheckReservationStatus($queueName, $id, $serialized)) { | 
                                                        
| 29 | -            $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) { | 
                                                        |
| 29 | +            $this->client->pipeline(function(ClientContextInterface $client) use ($queueName, $id) { | 
                                                        |
| 30 | 30 | self::cReserveCommandId($client, $queueName, $id);  | 
                                                        
| 31 | 31 | self::cUpdateCommandStatus($client, $queueName, $id, self::STATUS_QUEUED);  | 
                                                        
| 32 | 32 |                  $client->lpush(":{$queueName}:queue", [ $id ]); | 
                                                        
@@ -53,7 +53,7 @@ discard block  | 
                                                    ||
| 53 | 53 | return $this->awaitCommand($queueName, $timeout);  | 
                                                        
| 54 | 54 | }  | 
                                                        
| 55 | 55 | /* @var $id string */  | 
                                                        
| 56 | -        list(, $serialized) = $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) { | 
                                                        |
| 56 | +        list(, $serialized) = $this->client->pipeline(function(ClientContextInterface $client) use ($queueName, $id) { | 
                                                        |
| 57 | 57 | self::cUpdateCommandStatus($client, $queueName, $id, self::STATUS_IN_PROGRESS);  | 
                                                        
| 58 | 58 | self::cRetrieveCommand($client, $queueName, $id);  | 
                                                        
| 59 | 59 | self::cReleaseReservedCommandIds($client, $queueName, [ $id ]);  | 
                                                        
@@ -68,21 +68,21 @@ discard block  | 
                                                    ||
| 68 | 68 | |
| 69 | 69 | public function setCommandCompleted(string $queueName, string $id)  | 
                                                        
| 70 | 70 |      { | 
                                                        
| 71 | -        $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) { | 
                                                        |
| 71 | +        $this->client->pipeline(function(ClientContextInterface $client) use ($queueName, $id) { | 
                                                        |
| 72 | 72 | self::cEndCommand($client, $queueName, $id, self::STATUS_COMPLETED);  | 
                                                        
| 73 | 73 | });  | 
                                                        
| 74 | 74 | }  | 
                                                        
| 75 | 75 | |
| 76 | 76 | public function setCommandFailed(string $queueName, string $id)  | 
                                                        
| 77 | 77 |      { | 
                                                        
| 78 | -        $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) { | 
                                                        |
| 78 | +        $this->client->pipeline(function(ClientContextInterface $client) use ($queueName, $id) { | 
                                                        |
| 79 | 79 | self::cEndCommand($client, $queueName, $id, self::STATUS_FAILED);  | 
                                                        
| 80 | 80 | });  | 
                                                        
| 81 | 81 | }  | 
                                                        
| 82 | 82 | |
| 83 | 83 | public function putQueue(string $queueName)  | 
                                                        
| 84 | 84 |      { | 
                                                        
| 85 | -        $this->client->pipeline(function (ClientContextInterface $client) use ($queueName) { | 
                                                        |
| 85 | +        $this->client->pipeline(function(ClientContextInterface $client) use ($queueName) { | 
                                                        |
| 86 | 86 | self::cAddQueue($client, $queueName);  | 
                                                        
| 87 | 87 | });  | 
                                                        
| 88 | 88 | }  | 
                                                        
@@ -128,14 +128,14 @@ discard block  | 
                                                    ||
| 128 | 128 | |
| 129 | 129 | public function deleteQueue(string $queueName)  | 
                                                        
| 130 | 130 |      { | 
                                                        
| 131 | -        $this->client->pipeline(function (ClientContextInterface $client) use ($queueName) { | 
                                                        |
| 131 | +        $this->client->pipeline(function(ClientContextInterface $client) use ($queueName) { | 
                                                        |
| 132 | 132 | self::cDeleteQueue($client, $queueName);  | 
                                                        
| 133 | 133 | });  | 
                                                        
| 134 | 134 | }  | 
                                                        
| 135 | 135 | |
| 136 | 136 | public function purgeCommand(string $queueName, string $id)  | 
                                                        
| 137 | 137 |      { | 
                                                        
| 138 | -        $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) { | 
                                                        |
| 138 | +        $this->client->pipeline(function(ClientContextInterface $client) use ($queueName, $id) { | 
                                                        |
| 139 | 139 |              $client->hdel(":{$queueName}:command_store", [ $id ]); | 
                                                        
| 140 | 140 |              $client->hdel(":{$queueName}:command_status", [ $id ]); | 
                                                        
| 141 | 141 | self::cReleaseReservedCommandIds($client, $queueName, [ $id ]);  | 
                                                        
@@ -150,7 +150,7 @@ discard block  | 
                                                    ||
| 150 | 150 |      { | 
                                                        
| 151 | 151 |          if (!$this->storeCommandAndCheckReservationStatus($queueName, $id, $serialized)) { | 
                                                        
| 152 | 152 | $this->client->pipeline(  | 
                                                        
| 153 | -                function (ClientContextInterface $client) use ($queueName, $id, $dateTime) { | 
                                                        |
| 153 | +                function(ClientContextInterface $client) use ($queueName, $id, $dateTime) { | 
                                                        |
| 154 | 154 | self::cReserveCommandId($client, $queueName, $id);  | 
                                                        
| 155 | 155 | self::cUpdateCommandStatus($client, $queueName, $id, self::STATUS_SCHEDULED);  | 
                                                        
| 156 | 156 | $json = json_encode([ $queueName, $id ]);  | 
                                                        
@@ -175,7 +175,7 @@ discard block  | 
                                                    ||
| 175 | 175 |          foreach ($queueNames as $queueName) { | 
                                                        
| 176 | 176 |              $result = $this->client->zrangebyscore(':schedule', $lowScore, $highScore); | 
                                                        
| 177 | 177 |              if (!empty($result)) { | 
                                                        
| 178 | -                $this->client->pipeline(function (ClientContextInterface $client) use ($result, $queueName) { | 
                                                        |
| 178 | +                $this->client->pipeline(function(ClientContextInterface $client) use ($result, $queueName) { | 
                                                        |
| 179 | 179 | $idsToRelease = [ ];  | 
                                                        
| 180 | 180 |                      foreach ($result as $json => $score) { | 
                                                        
| 181 | 181 | list($thisQueueName, $id) = json_decode($json, true);  | 
                                                        
@@ -208,7 +208,7 @@ discard block  | 
                                                    ||
| 208 | 208 |          if ($result !== [ ]) { | 
                                                        
| 209 | 209 | $queueNamesById = $idsByJson = [ ];  | 
                                                        
| 210 | 210 | $pipelineReturn = $this->client->pipeline(  | 
                                                        
| 211 | -                function (ClientContextInterface $client) use ($result, &$queueNamesById, &$idsByJson) { | 
                                                        |
| 211 | +                function(ClientContextInterface $client) use ($result, &$queueNamesById, &$idsByJson) { | 
                                                        |
| 212 | 212 | $idsByQueueName = [ ];  | 
                                                        
| 213 | 213 |                      foreach ($result as $json => $score) { | 
                                                        
| 214 | 214 | list($queueName, $id) = json_decode($json, true);  | 
                                                        
@@ -239,7 +239,7 @@ discard block  | 
                                                    ||
| 239 | 239 | private function storeCommandAndCheckReservationStatus(string $queueName, string $id, string $serialized): bool  | 
                                                        
| 240 | 240 |      { | 
                                                        
| 241 | 241 | list ($isReserved) = $this->client->pipeline(  | 
                                                        
| 242 | -            function (ClientContextInterface $client) use ($queueName, $id, $serialized) { | 
                                                        |
| 242 | +            function(ClientContextInterface $client) use ($queueName, $id, $serialized) { | 
                                                        |
| 243 | 243 | self::cIsCommandIdReserved($client, $queueName, $id);  | 
                                                        
| 244 | 244 | self::cStoreCommand($client, $queueName, $id, $serialized);  | 
                                                        
| 245 | 245 | }  |