@@ -26,10 +26,10 @@ discard block |
||
26 | 26 | public function queueCommand(string $queueName, string $id, string $serialized) |
27 | 27 | { |
28 | 28 | if (!$this->storeCommandAndCheckReservationStatus($queueName, $id, $serialized)) { |
29 | - $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) { |
|
29 | + $this->client->pipeline(function(ClientContextInterface $client) use ($queueName, $id) { |
|
30 | 30 | self::cReserveCommandId($client, $queueName, $id); |
31 | 31 | self::cUpdateCommandStatus($client, $queueName, $id, self::STATUS_QUEUED); |
32 | - $client->lpush(":{$queueName}:queue", [$id]); |
|
32 | + $client->lpush(":{$queueName}:queue", [ $id ]); |
|
33 | 33 | }); |
34 | 34 | } |
35 | 35 | } |
@@ -53,10 +53,10 @@ discard block |
||
53 | 53 | return $this->awaitCommand($queueName, $timeout); |
54 | 54 | } |
55 | 55 | /* @var $id string */ |
56 | - list(, $serialized) = $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) { |
|
56 | + list(, $serialized) = $this->client->pipeline(function(ClientContextInterface $client) use ($queueName, $id) { |
|
57 | 57 | self::cUpdateCommandStatus($client, $queueName, $id, self::STATUS_IN_PROGRESS); |
58 | 58 | self::cRetrieveCommand($client, $queueName, $id); |
59 | - self::cReleaseReservedCommandIds($client, $queueName, [$id]); |
|
59 | + self::cReleaseReservedCommandIds($client, $queueName, [ $id ]); |
|
60 | 60 | }); |
61 | 61 | return new ReceivedCommand($queueName, $id, $serialized); |
62 | 62 | } |
@@ -68,21 +68,21 @@ discard block |
||
68 | 68 | |
69 | 69 | public function setCommandCompleted(string $queueName, string $id) |
70 | 70 | { |
71 | - $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) { |
|
71 | + $this->client->pipeline(function(ClientContextInterface $client) use ($queueName, $id) { |
|
72 | 72 | self::cEndCommand($client, $queueName, $id, self::STATUS_COMPLETED); |
73 | 73 | }); |
74 | 74 | } |
75 | 75 | |
76 | 76 | public function setCommandFailed(string $queueName, string $id) |
77 | 77 | { |
78 | - $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) { |
|
78 | + $this->client->pipeline(function(ClientContextInterface $client) use ($queueName, $id) { |
|
79 | 79 | self::cEndCommand($client, $queueName, $id, self::STATUS_FAILED); |
80 | 80 | }); |
81 | 81 | } |
82 | 82 | |
83 | 83 | public function putQueue(string $queueName) |
84 | 84 | { |
85 | - $this->client->pipeline(function (ClientContextInterface $client) use ($queueName) { |
|
85 | + $this->client->pipeline(function(ClientContextInterface $client) use ($queueName) { |
|
86 | 86 | self::cAddQueue($client, $queueName); |
87 | 87 | }); |
88 | 88 | } |
@@ -128,18 +128,18 @@ discard block |
||
128 | 128 | |
129 | 129 | public function deleteQueue(string $queueName) |
130 | 130 | { |
131 | - $this->client->pipeline(function (ClientContextInterface $client) use ($queueName) { |
|
131 | + $this->client->pipeline(function(ClientContextInterface $client) use ($queueName) { |
|
132 | 132 | self::cDeleteQueue($client, $queueName); |
133 | 133 | }); |
134 | 134 | } |
135 | 135 | |
136 | 136 | public function purgeCommand(string $queueName, string $id) |
137 | 137 | { |
138 | - $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) { |
|
139 | - $client->hdel(":{$queueName}:command_store", [$id]); |
|
140 | - $client->hdel(":{$queueName}:command_status", [$id]); |
|
141 | - self::cReleaseReservedCommandIds($client, $queueName, [$id]); |
|
142 | - $json = json_encode([$queueName, $id]); |
|
138 | + $this->client->pipeline(function(ClientContextInterface $client) use ($queueName, $id) { |
|
139 | + $client->hdel(":{$queueName}:command_store", [ $id ]); |
|
140 | + $client->hdel(":{$queueName}:command_status", [ $id ]); |
|
141 | + self::cReleaseReservedCommandIds($client, $queueName, [ $id ]); |
|
142 | + $json = json_encode([ $queueName, $id ]); |
|
143 | 143 | $client->lrem(":{$queueName}:queue", 1, $id); |
144 | 144 | $client->lrem(":{$queueName}:consuming", 1, $id); |
145 | 145 | $client->zrem(':schedule', $json); |
@@ -150,11 +150,11 @@ discard block |
||
150 | 150 | { |
151 | 151 | if (!$this->storeCommandAndCheckReservationStatus($queueName, $id, $serialized)) { |
152 | 152 | $this->client->pipeline( |
153 | - function (ClientContextInterface $client) use ($queueName, $id, $dateTime) { |
|
153 | + function(ClientContextInterface $client) use ($queueName, $id, $dateTime) { |
|
154 | 154 | self::cReserveCommandId($client, $queueName, $id); |
155 | 155 | self::cUpdateCommandStatus($client, $queueName, $id, self::STATUS_SCHEDULED); |
156 | - $json = json_encode([$queueName, $id]); |
|
157 | - $client->zadd(':schedule', [$json => $dateTime->getTimestamp()]); |
|
156 | + $json = json_encode([ $queueName, $id ]); |
|
157 | + $client->zadd(':schedule', [ $json => $dateTime->getTimestamp() ]); |
|
158 | 158 | } |
159 | 159 | ); |
160 | 160 | } |
@@ -175,13 +175,13 @@ discard block |
||
175 | 175 | foreach ($queueNames as $queueName) { |
176 | 176 | $result = $this->client->zrangebyscore(':schedule', $lowScore, $highScore); |
177 | 177 | if (!empty($result)) { |
178 | - $this->client->pipeline(function (ClientContextInterface $client) use ($result, $queueName) { |
|
179 | - $idsToRelease = []; |
|
178 | + $this->client->pipeline(function(ClientContextInterface $client) use ($result, $queueName) { |
|
179 | + $idsToRelease = [ ]; |
|
180 | 180 | foreach ($result as $json => $score) { |
181 | 181 | list($thisQueueName, $id) = json_decode($json, true); |
182 | 182 | if ($thisQueueName === $queueName) { |
183 | 183 | $client->zrem(':schedule', $json); |
184 | - $idsToRelease[] = $id; |
|
184 | + $idsToRelease[ ] = $id; |
|
185 | 185 | } |
186 | 186 | } |
187 | 187 | self::cReleaseReservedCommandIds($client, $queueName, $idsToRelease); |
@@ -201,21 +201,21 @@ discard block |
||
201 | 201 | $start = $startTime->getTimestamp(); |
202 | 202 | } |
203 | 203 | $result = $this->client->zrangebyscore(':schedule', $start, $now->getTimestamp(), [ |
204 | - 'limit' => [0, $limit], |
|
204 | + 'limit' => [ 0, $limit ], |
|
205 | 205 | 'withscores' => true, |
206 | 206 | ]); |
207 | - $commands = []; |
|
208 | - if ($result !== []) { |
|
209 | - $queueNamesById = $idsByJson = []; |
|
207 | + $commands = [ ]; |
|
208 | + if ($result !== [ ]) { |
|
209 | + $queueNamesById = $idsByJson = [ ]; |
|
210 | 210 | $pipelineReturn = $this->client->pipeline( |
211 | - function (ClientContextInterface $client) use ($result, &$queueNamesById, &$idsByJson) { |
|
212 | - $idsByQueueName = []; |
|
211 | + function(ClientContextInterface $client) use ($result, &$queueNamesById, &$idsByJson) { |
|
212 | + $idsByQueueName = [ ]; |
|
213 | 213 | foreach ($result as $json => $score) { |
214 | 214 | list($queueName, $id) = json_decode($json, true); |
215 | 215 | self::cRetrieveCommand($client, $queueName, $id); |
216 | - $idsByQueueName[$queueName][] = $id; |
|
217 | - $queueNamesById[$id] = $queueName; |
|
218 | - $idsByJson[$json] = $id; |
|
216 | + $idsByQueueName[ $queueName ][ ] = $id; |
|
217 | + $queueNamesById[ $id ] = $queueName; |
|
218 | + $idsByJson[ $json ] = $id; |
|
219 | 219 | } |
220 | 220 | $client->zrem(':schedule', array_keys($result)); |
221 | 221 | foreach ($idsByQueueName as $queueName => $ids) { |
@@ -224,12 +224,12 @@ discard block |
||
224 | 224 | } |
225 | 225 | ); |
226 | 226 | foreach (array_keys($result) as $index => $json) { |
227 | - $id = $idsByJson[$json]; |
|
228 | - $commands[] = new ReceivedScheduledCommand( |
|
229 | - $queueNamesById[$id], |
|
227 | + $id = $idsByJson[ $json ]; |
|
228 | + $commands[ ] = new ReceivedScheduledCommand( |
|
229 | + $queueNamesById[ $id ], |
|
230 | 230 | $id, |
231 | - $pipelineReturn[$index], |
|
232 | - new \DateTime('@' . $result[$json]) |
|
231 | + $pipelineReturn[ $index ], |
|
232 | + new \DateTime('@'.$result[ $json ]) |
|
233 | 233 | ); |
234 | 234 | } |
235 | 235 | } |
@@ -239,7 +239,7 @@ discard block |
||
239 | 239 | private function storeCommandAndCheckReservationStatus(string $queueName, string $id, string $serialized): bool |
240 | 240 | { |
241 | 241 | list ($isReserved) = $this->client->pipeline( |
242 | - function (ClientContextInterface $client) use ($queueName, $id, $serialized) { |
|
242 | + function(ClientContextInterface $client) use ($queueName, $id, $serialized) { |
|
243 | 243 | self::cIsCommandIdReserved($client, $queueName, $id); |
244 | 244 | self::cStoreCommand($client, $queueName, $id, $serialized); |
245 | 245 | } |
@@ -260,14 +260,14 @@ discard block |
||
260 | 260 | |
261 | 261 | private static function cReserveCommandId($client, string $queueName, string $id) |
262 | 262 | { |
263 | - $client->sadd(":{$queueName}:queue_ids", [$id]); |
|
263 | + $client->sadd(":{$queueName}:queue_ids", [ $id ]); |
|
264 | 264 | } |
265 | 265 | |
266 | 266 | private static function cEndCommand($client, string $queueName, string $id, string $status) |
267 | 267 | { |
268 | 268 | self::cUpdateCommandStatus($client, $queueName, $id, $status); |
269 | - self::cReleaseReservedCommandIds($client, $queueName, [$id]); |
|
270 | - $client->srem(":{$queueName}:queue_ids", [$id]); |
|
269 | + self::cReleaseReservedCommandIds($client, $queueName, [ $id ]); |
|
270 | + $client->srem(":{$queueName}:queue_ids", [ $id ]); |
|
271 | 271 | $client->lrem(":{$queueName}:consuming", 1, $id); |
272 | 272 | } |
273 | 273 | |
@@ -288,7 +288,7 @@ discard block |
||
288 | 288 | |
289 | 289 | private static function cAddQueue($client, string $queueName) |
290 | 290 | { |
291 | - $client->sadd(':queues', [$queueName]); |
|
291 | + $client->sadd(':queues', [ $queueName ]); |
|
292 | 292 | } |
293 | 293 | |
294 | 294 | private static function cEmptyQueue($client, string $queueName) |