@@ -26,7 +26,7 @@ discard block |
||
| 26 | 26 | public function queueCommand(string $queueName, string $id, string $serialized) |
| 27 | 27 | { |
| 28 | 28 | if (!$this->storeCommandAndCheckReservationStatus($queueName, $id, $serialized)) { |
| 29 | - $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) { |
|
| 29 | + $this->client->pipeline(function(ClientContextInterface $client) use ($queueName, $id) { |
|
| 30 | 30 | self::cReserveCommandId($client, $queueName, $id); |
| 31 | 31 | self::cUpdateCommandStatus($client, $queueName, $id, self::STATUS_QUEUED); |
| 32 | 32 | $client->lpush(":{$queueName}:queue", [ $id ]); |
@@ -53,7 +53,7 @@ discard block |
||
| 53 | 53 | return $this->awaitCommand($queueName, $timeout); |
| 54 | 54 | } |
| 55 | 55 | /* @var $id string */ |
| 56 | - list(, $serialized) = $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) { |
|
| 56 | + list(, $serialized) = $this->client->pipeline(function(ClientContextInterface $client) use ($queueName, $id) { |
|
| 57 | 57 | self::cUpdateCommandStatus($client, $queueName, $id, self::STATUS_IN_PROGRESS); |
| 58 | 58 | self::cRetrieveCommand($client, $queueName, $id); |
| 59 | 59 | self::cReleaseReservedCommandIds($client, $queueName, [ $id ]); |
@@ -68,21 +68,21 @@ discard block |
||
| 68 | 68 | |
| 69 | 69 | public function setCommandCompleted(string $queueName, string $id) |
| 70 | 70 | { |
| 71 | - $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) { |
|
| 71 | + $this->client->pipeline(function(ClientContextInterface $client) use ($queueName, $id) { |
|
| 72 | 72 | self::cEndCommand($client, $queueName, $id, self::STATUS_COMPLETED); |
| 73 | 73 | }); |
| 74 | 74 | } |
| 75 | 75 | |
| 76 | 76 | public function setCommandFailed(string $queueName, string $id) |
| 77 | 77 | { |
| 78 | - $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) { |
|
| 78 | + $this->client->pipeline(function(ClientContextInterface $client) use ($queueName, $id) { |
|
| 79 | 79 | self::cEndCommand($client, $queueName, $id, self::STATUS_FAILED); |
| 80 | 80 | }); |
| 81 | 81 | } |
| 82 | 82 | |
| 83 | 83 | public function putQueue(string $queueName) |
| 84 | 84 | { |
| 85 | - $this->client->pipeline(function (ClientContextInterface $client) use ($queueName) { |
|
| 85 | + $this->client->pipeline(function(ClientContextInterface $client) use ($queueName) { |
|
| 86 | 86 | self::cAddQueue($client, $queueName); |
| 87 | 87 | }); |
| 88 | 88 | } |
@@ -128,14 +128,14 @@ discard block |
||
| 128 | 128 | |
| 129 | 129 | public function deleteQueue(string $queueName) |
| 130 | 130 | { |
| 131 | - $this->client->pipeline(function (ClientContextInterface $client) use ($queueName) { |
|
| 131 | + $this->client->pipeline(function(ClientContextInterface $client) use ($queueName) { |
|
| 132 | 132 | self::cDeleteQueue($client, $queueName); |
| 133 | 133 | }); |
| 134 | 134 | } |
| 135 | 135 | |
| 136 | 136 | public function purgeCommand(string $queueName, string $id) |
| 137 | 137 | { |
| 138 | - $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) { |
|
| 138 | + $this->client->pipeline(function(ClientContextInterface $client) use ($queueName, $id) { |
|
| 139 | 139 | $client->hdel(":{$queueName}:command_store", [ $id ]); |
| 140 | 140 | $client->hdel(":{$queueName}:command_status", [ $id ]); |
| 141 | 141 | self::cReleaseReservedCommandIds($client, $queueName, [ $id ]); |
@@ -150,7 +150,7 @@ discard block |
||
| 150 | 150 | { |
| 151 | 151 | if (!$this->storeCommandAndCheckReservationStatus($queueName, $id, $serialized)) { |
| 152 | 152 | $this->client->pipeline( |
| 153 | - function (ClientContextInterface $client) use ($queueName, $id, $dateTime) { |
|
| 153 | + function(ClientContextInterface $client) use ($queueName, $id, $dateTime) { |
|
| 154 | 154 | self::cReserveCommandId($client, $queueName, $id); |
| 155 | 155 | self::cUpdateCommandStatus($client, $queueName, $id, self::STATUS_SCHEDULED); |
| 156 | 156 | $json = json_encode([ $queueName, $id ]); |
@@ -175,7 +175,7 @@ discard block |
||
| 175 | 175 | foreach ($queueNames as $queueName) { |
| 176 | 176 | $result = $this->client->zrangebyscore(':schedule', $lowScore, $highScore); |
| 177 | 177 | if (!empty($result)) { |
| 178 | - $this->client->pipeline(function (ClientContextInterface $client) use ($result, $queueName) { |
|
| 178 | + $this->client->pipeline(function(ClientContextInterface $client) use ($result, $queueName) { |
|
| 179 | 179 | $idsToRelease = [ ]; |
| 180 | 180 | foreach ($result as $json => $score) { |
| 181 | 181 | list($thisQueueName, $id) = json_decode($json, true); |
@@ -208,7 +208,7 @@ discard block |
||
| 208 | 208 | if ($result !== [ ]) { |
| 209 | 209 | $queueNamesById = $idsByJson = [ ]; |
| 210 | 210 | $pipelineReturn = $this->client->pipeline( |
| 211 | - function (ClientContextInterface $client) use ($result, &$queueNamesById, &$idsByJson) { |
|
| 211 | + function(ClientContextInterface $client) use ($result, &$queueNamesById, &$idsByJson) { |
|
| 212 | 212 | $idsByQueueName = [ ]; |
| 213 | 213 | foreach ($result as $json => $score) { |
| 214 | 214 | list($queueName, $id) = json_decode($json, true); |
@@ -239,7 +239,7 @@ discard block |
||
| 239 | 239 | private function storeCommandAndCheckReservationStatus(string $queueName, string $id, string $serialized): bool |
| 240 | 240 | { |
| 241 | 241 | list ($isReserved) = $this->client->pipeline( |
| 242 | - function (ClientContextInterface $client) use ($queueName, $id, $serialized) { |
|
| 242 | + function(ClientContextInterface $client) use ($queueName, $id, $serialized) { |
|
| 243 | 243 | self::cIsCommandIdReserved($client, $queueName, $id); |
| 244 | 244 | self::cStoreCommand($client, $queueName, $id, $serialized); |
| 245 | 245 | } |
@@ -11,22 +11,22 @@ discard block |
||
| 11 | 11 | /** |
| 12 | 12 | * @var QueueVoterInterface[] |
| 13 | 13 | */ |
| 14 | - private $voters = []; |
|
| 14 | + private $voters = [ ]; |
|
| 15 | 15 | |
| 16 | 16 | public function __construct(array $voters) |
| 17 | 17 | { |
| 18 | - array_walk($voters, function (QueueVoterInterface $voter) { |
|
| 19 | - $this->voters[] = $voter; |
|
| 18 | + array_walk($voters, function(QueueVoterInterface $voter) { |
|
| 19 | + $this->voters[ ] = $voter; |
|
| 20 | 20 | }); |
| 21 | 21 | } |
| 22 | 22 | |
| 23 | 23 | public function resolveQueueName($command): string |
| 24 | 24 | { |
| 25 | 25 | $votes = $this->getVotes($command); |
| 26 | - if ($votes === []) { |
|
| 26 | + if ($votes === [ ]) { |
|
| 27 | 27 | throw new QueueResolverException; |
| 28 | 28 | } |
| 29 | - return $votes[0]->getQueueName(); |
|
| 29 | + return $votes[ 0 ]->getQueueName(); |
|
| 30 | 30 | } |
| 31 | 31 | |
| 32 | 32 | /** |
@@ -35,14 +35,14 @@ discard block |
||
| 35 | 35 | */ |
| 36 | 36 | public function getVotes($command): array |
| 37 | 37 | { |
| 38 | - $votes = []; |
|
| 38 | + $votes = [ ]; |
|
| 39 | 39 | foreach ($this->voters as $voter) { |
| 40 | 40 | $vote = $voter->getVote($command); |
| 41 | 41 | if ($vote instanceof QueueVote) { |
| 42 | - $votes[] = $vote; |
|
| 42 | + $votes[ ] = $vote; |
|
| 43 | 43 | } |
| 44 | 44 | } |
| 45 | - usort($votes, function (QueueVote $voteA, QueueVote $voteB) { |
|
| 45 | + usort($votes, function(QueueVote $voteA, QueueVote $voteB) { |
|
| 46 | 46 | return $voteB->getConfidence() <=> $voteA->getConfidence(); |
| 47 | 47 | }); |
| 48 | 48 | return $votes; |
@@ -15,10 +15,10 @@ |
||
| 15 | 15 | |
| 16 | 16 | public function getVotes($command): array |
| 17 | 17 | { |
| 18 | - $votes = []; |
|
| 18 | + $votes = [ ]; |
|
| 19 | 19 | foreach (parent::getVotes($command) as $vote) { |
| 20 | 20 | if (in_array($vote->getQueueName(), $this->whitelist, true)) { |
| 21 | - $votes[] = $vote; |
|
| 21 | + $votes[ ] = $vote; |
|
| 22 | 22 | } |
| 23 | 23 | } |
| 24 | 24 | return $votes; |