| @@ -26,10 +26,10 @@ discard block | ||
| 26 | 26 | public function queueCommand(string $queueName, string $id, string $serialized) | 
| 27 | 27 |      { | 
| 28 | 28 |          if (!$this->storeCommandAndCheckReservationStatus($queueName, $id, $serialized)) { | 
| 29 | -            $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) { | |
| 29 | +            $this->client->pipeline(function(ClientContextInterface $client) use ($queueName, $id) { | |
| 30 | 30 | self::cReserveCommandId($client, $queueName, $id); | 
| 31 | 31 | self::cUpdateCommandStatus($client, $queueName, $id, self::STATUS_QUEUED); | 
| 32 | -                $client->lpush(":{$queueName}:queue", [$id]); | |
| 32 | +                $client->lpush(":{$queueName}:queue", [ $id ]); | |
| 33 | 33 | }); | 
| 34 | 34 | } | 
| 35 | 35 | } | 
| @@ -53,10 +53,10 @@ discard block | ||
| 53 | 53 | return $this->awaitCommand($queueName, $timeout); | 
| 54 | 54 | } | 
| 55 | 55 | /* @var $id string */ | 
| 56 | -        list(, $serialized) = $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) { | |
| 56 | +        list(, $serialized) = $this->client->pipeline(function(ClientContextInterface $client) use ($queueName, $id) { | |
| 57 | 57 | self::cUpdateCommandStatus($client, $queueName, $id, self::STATUS_IN_PROGRESS); | 
| 58 | 58 | self::cRetrieveCommand($client, $queueName, $id); | 
| 59 | - self::cReleaseReservedCommandIds($client, $queueName, [$id]); | |
| 59 | + self::cReleaseReservedCommandIds($client, $queueName, [ $id ]); | |
| 60 | 60 | }); | 
| 61 | 61 | return new ReceivedCommand($queueName, $id, $serialized); | 
| 62 | 62 | } | 
| @@ -68,21 +68,21 @@ discard block | ||
| 68 | 68 | |
| 69 | 69 | public function setCommandCompleted(string $queueName, string $id) | 
| 70 | 70 |      { | 
| 71 | -        $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) { | |
| 71 | +        $this->client->pipeline(function(ClientContextInterface $client) use ($queueName, $id) { | |
| 72 | 72 | self::cEndCommand($client, $queueName, $id, self::STATUS_COMPLETED); | 
| 73 | 73 | }); | 
| 74 | 74 | } | 
| 75 | 75 | |
| 76 | 76 | public function setCommandFailed(string $queueName, string $id) | 
| 77 | 77 |      { | 
| 78 | -        $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) { | |
| 78 | +        $this->client->pipeline(function(ClientContextInterface $client) use ($queueName, $id) { | |
| 79 | 79 | self::cEndCommand($client, $queueName, $id, self::STATUS_FAILED); | 
| 80 | 80 | }); | 
| 81 | 81 | } | 
| 82 | 82 | |
| 83 | 83 | public function putQueue(string $queueName) | 
| 84 | 84 |      { | 
| 85 | -        $this->client->pipeline(function (ClientContextInterface $client) use ($queueName) { | |
| 85 | +        $this->client->pipeline(function(ClientContextInterface $client) use ($queueName) { | |
| 86 | 86 | self::cAddQueue($client, $queueName); | 
| 87 | 87 | }); | 
| 88 | 88 | } | 
| @@ -128,18 +128,18 @@ discard block | ||
| 128 | 128 | |
| 129 | 129 | public function deleteQueue(string $queueName) | 
| 130 | 130 |      { | 
| 131 | -        $this->client->pipeline(function (ClientContextInterface $client) use ($queueName) { | |
| 131 | +        $this->client->pipeline(function(ClientContextInterface $client) use ($queueName) { | |
| 132 | 132 | self::cDeleteQueue($client, $queueName); | 
| 133 | 133 | }); | 
| 134 | 134 | } | 
| 135 | 135 | |
| 136 | 136 | public function purgeCommand(string $queueName, string $id) | 
| 137 | 137 |      { | 
| 138 | -        $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) { | |
| 139 | -            $client->hdel(":{$queueName}:command_store", [$id]); | |
| 140 | -            $client->hdel(":{$queueName}:command_status", [$id]); | |
| 141 | - self::cReleaseReservedCommandIds($client, $queueName, [$id]); | |
| 142 | - $json = json_encode([$queueName, $id]); | |
| 138 | +        $this->client->pipeline(function(ClientContextInterface $client) use ($queueName, $id) { | |
| 139 | +            $client->hdel(":{$queueName}:command_store", [ $id ]); | |
| 140 | +            $client->hdel(":{$queueName}:command_status", [ $id ]); | |
| 141 | + self::cReleaseReservedCommandIds($client, $queueName, [ $id ]); | |
| 142 | + $json = json_encode([ $queueName, $id ]); | |
| 143 | 143 |              $client->lrem(":{$queueName}:queue", 1, $id); | 
| 144 | 144 |              $client->lrem(":{$queueName}:consuming", 1, $id); | 
| 145 | 145 |              $client->zrem(':schedule', $json); | 
| @@ -150,11 +150,11 @@ discard block | ||
| 150 | 150 |      { | 
| 151 | 151 |          if (!$this->storeCommandAndCheckReservationStatus($queueName, $id, $serialized)) { | 
| 152 | 152 | $this->client->pipeline( | 
| 153 | -                function (ClientContextInterface $client) use ($queueName, $id, $dateTime) { | |
| 153 | +                function(ClientContextInterface $client) use ($queueName, $id, $dateTime) { | |
| 154 | 154 | self::cReserveCommandId($client, $queueName, $id); | 
| 155 | 155 | self::cUpdateCommandStatus($client, $queueName, $id, self::STATUS_SCHEDULED); | 
| 156 | - $json = json_encode([$queueName, $id]); | |
| 157 | -                    $client->zadd(':schedule', [$json => $dateTime->getTimestamp()]); | |
| 156 | + $json = json_encode([ $queueName, $id ]); | |
| 157 | +                    $client->zadd(':schedule', [ $json => $dateTime->getTimestamp() ]); | |
| 158 | 158 | } | 
| 159 | 159 | ); | 
| 160 | 160 | } | 
| @@ -175,13 +175,13 @@ discard block | ||
| 175 | 175 |          foreach ($queueNames as $queueName) { | 
| 176 | 176 |              $result = $this->client->zrangebyscore(':schedule', $lowScore, $highScore); | 
| 177 | 177 |              if (!empty($result)) { | 
| 178 | -                $this->client->pipeline(function (ClientContextInterface $client) use ($result, $queueName) { | |
| 179 | - $idsToRelease = []; | |
| 178 | +                $this->client->pipeline(function(ClientContextInterface $client) use ($result, $queueName) { | |
| 179 | + $idsToRelease = [ ]; | |
| 180 | 180 |                      foreach ($result as $json => $score) { | 
| 181 | 181 | list($thisQueueName, $id) = json_decode($json, true); | 
| 182 | 182 |                          if ($thisQueueName === $queueName) { | 
| 183 | 183 |                              $client->zrem(':schedule', $json); | 
| 184 | - $idsToRelease[] = $id; | |
| 184 | + $idsToRelease[ ] = $id; | |
| 185 | 185 | } | 
| 186 | 186 | } | 
| 187 | 187 | self::cReleaseReservedCommandIds($client, $queueName, $idsToRelease); | 
| @@ -201,21 +201,21 @@ discard block | ||
| 201 | 201 | $start = $startTime->getTimestamp(); | 
| 202 | 202 | } | 
| 203 | 203 |          $result = $this->client->zrangebyscore(':schedule', $start, $now->getTimestamp(), [ | 
| 204 | - 'limit' => [0, $limit], | |
| 204 | + 'limit' => [ 0, $limit ], | |
| 205 | 205 | 'withscores' => true, | 
| 206 | 206 | ]); | 
| 207 | - $commands = []; | |
| 208 | -        if ($result !== []) { | |
| 209 | - $queueNamesById = $idsByJson = []; | |
| 207 | + $commands = [ ]; | |
| 208 | +        if ($result !== [ ]) { | |
| 209 | + $queueNamesById = $idsByJson = [ ]; | |
| 210 | 210 | $pipelineReturn = $this->client->pipeline( | 
| 211 | -                function (ClientContextInterface $client) use ($result, &$queueNamesById, &$idsByJson) { | |
| 212 | - $idsByQueueName = []; | |
| 211 | +                function(ClientContextInterface $client) use ($result, &$queueNamesById, &$idsByJson) { | |
| 212 | + $idsByQueueName = [ ]; | |
| 213 | 213 |                      foreach ($result as $json => $score) { | 
| 214 | 214 | list($queueName, $id) = json_decode($json, true); | 
| 215 | 215 | self::cRetrieveCommand($client, $queueName, $id); | 
| 216 | - $idsByQueueName[$queueName][] = $id; | |
| 217 | - $queueNamesById[$id] = $queueName; | |
| 218 | - $idsByJson[$json] = $id; | |
| 216 | + $idsByQueueName[ $queueName ][ ] = $id; | |
| 217 | + $queueNamesById[ $id ] = $queueName; | |
| 218 | + $idsByJson[ $json ] = $id; | |
| 219 | 219 | } | 
| 220 | 220 |                      $client->zrem(':schedule', array_keys($result)); | 
| 221 | 221 |                      foreach ($idsByQueueName as $queueName => $ids) { | 
| @@ -224,12 +224,12 @@ discard block | ||
| 224 | 224 | } | 
| 225 | 225 | ); | 
| 226 | 226 |              foreach (array_keys($result) as $index => $json) { | 
| 227 | - $id = $idsByJson[$json]; | |
| 228 | - $commands[] = new ReceivedScheduledCommand( | |
| 229 | - $queueNamesById[$id], | |
| 227 | + $id = $idsByJson[ $json ]; | |
| 228 | + $commands[ ] = new ReceivedScheduledCommand( | |
| 229 | + $queueNamesById[ $id ], | |
| 230 | 230 | $id, | 
| 231 | - $pipelineReturn[$index], | |
| 232 | -                    new \DateTime('@' . $result[$json]) | |
| 231 | + $pipelineReturn[ $index ], | |
| 232 | +                    new \DateTime('@'.$result[ $json ]) | |
| 233 | 233 | ); | 
| 234 | 234 | } | 
| 235 | 235 | } | 
| @@ -239,7 +239,7 @@ discard block | ||
| 239 | 239 | private function storeCommandAndCheckReservationStatus(string $queueName, string $id, string $serialized): bool | 
| 240 | 240 |      { | 
| 241 | 241 | list ($isReserved) = $this->client->pipeline( | 
| 242 | -            function (ClientContextInterface $client) use ($queueName, $id, $serialized) { | |
| 242 | +            function(ClientContextInterface $client) use ($queueName, $id, $serialized) { | |
| 243 | 243 | self::cIsCommandIdReserved($client, $queueName, $id); | 
| 244 | 244 | self::cStoreCommand($client, $queueName, $id, $serialized); | 
| 245 | 245 | } | 
| @@ -260,14 +260,14 @@ discard block | ||
| 260 | 260 | |
| 261 | 261 | private static function cReserveCommandId($client, string $queueName, string $id) | 
| 262 | 262 |      { | 
| 263 | -        $client->sadd(":{$queueName}:queue_ids", [$id]); | |
| 263 | +        $client->sadd(":{$queueName}:queue_ids", [ $id ]); | |
| 264 | 264 | } | 
| 265 | 265 | |
| 266 | 266 | private static function cEndCommand($client, string $queueName, string $id, string $status) | 
| 267 | 267 |      { | 
| 268 | 268 | self::cUpdateCommandStatus($client, $queueName, $id, $status); | 
| 269 | - self::cReleaseReservedCommandIds($client, $queueName, [$id]); | |
| 270 | -        $client->srem(":{$queueName}:queue_ids", [$id]); | |
| 269 | + self::cReleaseReservedCommandIds($client, $queueName, [ $id ]); | |
| 270 | +        $client->srem(":{$queueName}:queue_ids", [ $id ]); | |
| 271 | 271 |          $client->lrem(":{$queueName}:consuming", 1, $id); | 
| 272 | 272 | } | 
| 273 | 273 | |
| @@ -288,7 +288,7 @@ discard block | ||
| 288 | 288 | |
| 289 | 289 | private static function cAddQueue($client, string $queueName) | 
| 290 | 290 |      { | 
| 291 | -        $client->sadd(':queues', [$queueName]); | |
| 291 | +        $client->sadd(':queues', [ $queueName ]); | |
| 292 | 292 | } | 
| 293 | 293 | |
| 294 | 294 | private static function cEmptyQueue($client, string $queueName) |