Completed
Push — master ( 3a624b...055079 )
by Mike
03:15
created
src/SchedulerWorker.php 1 patch
Spacing   +3 added lines, -3 removed lines patch added patch discarded remove patch
@@ -9,7 +9,7 @@  discard block
 block discarded – undo
9 9
 
10 10
     private $implementation;
11 11
 
12
-    public function __construct(Implementation $implementation)
12
+    public function __construct (Implementation $implementation)
13 13
     {
14 14
         $this->implementation = $implementation;
15 15
     }
@@ -21,7 +21,7 @@  discard block
 block discarded – undo
21 21
      * @param int $uSleepTime The number of microseconds to usleep between each query to the scheduler.
22 22
      * @param \DateInterval|null $expiry The expiry interval for an overdue unqueued command.
23 23
      */
24
-    public function work(
24
+    public function work (
25 25
         int $limit = null,
26 26
         int $throttle = self::DEFAULT_THROTTLE,
27 27
         int $time = null,
@@ -44,7 +44,7 @@  discard block
 block discarded – undo
44 44
         }
45 45
     }
46 46
 
47
-    private function iterate(int $throttle, \DateInterval $expiry = null): int
47
+    private function iterate (int $throttle, \DateInterval $expiry = null): int
48 48
     {
49 49
         $count = 0;
50 50
         $now = $this->implementation->getClock()->getTime();
Please login to merge, or discard this patch.
src/Logging/LoggingErrorHandler.php 1 patch
Spacing   +3 added lines, -3 removed lines patch added patch discarded remove patch
@@ -10,12 +10,12 @@  discard block
 block discarded – undo
10 10
 
11 11
     private $logger;
12 12
 
13
-    public function __construct(LoggerInterface $logger)
13
+    public function __construct (LoggerInterface $logger)
14 14
     {
15 15
         $this->logger = $logger;
16 16
     }
17 17
 
18
-    public function handleCommandError($command, \Throwable $error)
18
+    public function handleCommandError ($command, \Throwable $error)
19 19
     {
20 20
         $this->logger->error($error->getMessage(), [
21 21
             'command' => $command,
@@ -23,7 +23,7 @@  discard block
 block discarded – undo
23 23
         ]);
24 24
     }
25 25
 
26
-    public function handleUnserializationError(
26
+    public function handleUnserializationError (
27 27
         string $queueName,
28 28
         string $commandId,
29 29
         string $serialized,
Please login to merge, or discard this patch.
src/BusQue.php 1 patch
Spacing   +18 added lines, -18 removed lines patch added patch discarded remove patch
@@ -7,93 +7,93 @@
 block discarded – undo
7 7
 
8 8
     private $implementation;
9 9
 
10
-    public function __construct(Implementation $implementation)
10
+    public function __construct (Implementation $implementation)
11 11
     {
12 12
         $this->implementation = $implementation;
13 13
     }
14 14
 
15
-    public function getQueueName($command): string
15
+    public function getQueueName ($command): string
16 16
     {
17 17
         return $this->implementation->getQueueResolver()->resolveQueueName($command);
18 18
     }
19 19
 
20
-    public function serializeCommand($command): string
20
+    public function serializeCommand ($command): string
21 21
     {
22 22
         return $this->implementation->getCommandSerializer()->serialize($command);
23 23
     }
24 24
 
25
-    public function unserializeCommand(string $serialized)
25
+    public function unserializeCommand (string $serialized)
26 26
     {
27 27
         return $this->implementation->getCommandSerializer()->unserialize($serialized);
28 28
     }
29 29
 
30
-    public function generateCommandId($command): string
30
+    public function generateCommandId ($command): string
31 31
     {
32 32
         return $this->implementation->getCommandIdGenerator()->generateId($command);
33 33
     }
34 34
 
35
-    public function queueCommand($command, string $commandId = null)
35
+    public function queueCommand ($command, string $commandId = null)
36 36
     {
37 37
         $this->implementation->getCommandBusAdapter()->handle(new QueuedCommand($command, $commandId));
38 38
     }
39 39
 
40
-    public function scheduleCommand($command, \DateTime $dateTime, string $commandId = null)
40
+    public function scheduleCommand ($command, \DateTime $dateTime, string $commandId = null)
41 41
     {
42 42
         $this->implementation->getCommandBusAdapter()->handle(new ScheduledCommand($command, $dateTime, $commandId));
43 43
     }
44 44
 
45
-    public function getCommandStatus(string $queueName, string $commandId): string
45
+    public function getCommandStatus (string $queueName, string $commandId): string
46 46
     {
47 47
         return $this->implementation->getQueueAdapter()->getCommandStatus($queueName, $commandId);
48 48
     }
49 49
 
50
-    public function getQueuedCount(string $queueName): int
50
+    public function getQueuedCount (string $queueName): int
51 51
     {
52 52
         return $this->implementation->getQueueAdapter()->getQueuedCount($queueName);
53 53
     }
54 54
 
55
-    public function purgeCommand(string $queueName, string $commandId)
55
+    public function purgeCommand (string $queueName, string $commandId)
56 56
     {
57 57
         $this->implementation->getQueueAdapter()->purgeCommand($queueName, $commandId);
58 58
     }
59 59
 
60
-    public function emptyQueue(string $queueName)
60
+    public function emptyQueue (string $queueName)
61 61
     {
62 62
         $this->implementation->getQueueAdapter()->emptyQueue($queueName);
63 63
     }
64 64
 
65
-    public function listQueues(): array
65
+    public function listQueues (): array
66 66
     {
67 67
         return $this->implementation->getQueueAdapter()->getQueueNames();
68 68
     }
69 69
 
70
-    public function listQueuedIds(string $queueName, int $offset = 0, int $limit = 10): array
70
+    public function listQueuedIds (string $queueName, int $offset = 0, int $limit = 10): array
71 71
     {
72 72
         return $this->implementation->getQueueAdapter()->getQueuedIds($queueName, $offset, $limit);
73 73
     }
74 74
 
75
-    public function getInProgressCount(string $queueName): int
75
+    public function getInProgressCount (string $queueName): int
76 76
     {
77 77
         return $this->implementation->getQueueAdapter()->getConsumingCount($queueName);
78 78
     }
79 79
 
80
-    public function listInProgressIds(string $queueName, int $offset = 0, int $limit = 10): array
80
+    public function listInProgressIds (string $queueName, int $offset = 0, int $limit = 10): array
81 81
     {
82 82
         return $this->implementation->getQueueAdapter()->getConsumingIds($queueName, $offset, $limit);
83 83
     }
84 84
 
85
-    public function getCommand(string $queueName, string $id)
85
+    public function getCommand (string $queueName, string $id)
86 86
     {
87 87
         $serialized = $this->implementation->getQueueAdapter()->readCommand($queueName, $id);
88 88
         return $this->unserializeCommand($serialized);
89 89
     }
90 90
 
91
-    public function workQueue(string $queueName, int $n = null, int $time = null)
91
+    public function workQueue (string $queueName, int $n = null, int $time = null)
92 92
     {
93 93
         (new QueueWorker($this->implementation))->work($queueName, $n, $time);
94 94
     }
95 95
 
96
-    public function workSchedule(int $n = null, int $time = null)
96
+    public function workSchedule (int $n = null, int $time = null)
97 97
     {
98 98
         (new SchedulerWorker($this->implementation))->work($n, $time);
99 99
     }
Please login to merge, or discard this patch.
src/ClockInterface.php 1 patch
Spacing   +1 added lines, -1 removed lines patch added patch discarded remove patch
@@ -5,5 +5,5 @@
 block discarded – undo
5 5
 interface ClockInterface
6 6
 {
7 7
 
8
-    public function getTime(): \DateTime;
8
+    public function getTime (): \DateTime;
9 9
 }
Please login to merge, or discard this patch.
src/QueueWorker.php 1 patch
Spacing   +3 added lines, -3 removed lines patch added patch discarded remove patch
@@ -9,12 +9,12 @@  discard block
 block discarded – undo
9 9
 
10 10
     private $implementation;
11 11
 
12
-    public function __construct(Implementation $implementation)
12
+    public function __construct (Implementation $implementation)
13 13
     {
14 14
         $this->implementation = $implementation;
15 15
     }
16 16
 
17
-    public function work(string $queueName, int $n = null, int $time = null)
17
+    public function work (string $queueName, int $n = null, int $time = null)
18 18
     {
19 19
         $stopwatchStart = time();
20 20
         while ($n === null || $n > 0) {
@@ -26,7 +26,7 @@  discard block
 block discarded – undo
26 26
         }
27 27
     }
28 28
 
29
-    private function iterate(string $queueName, int $time = null)
29
+    private function iterate (string $queueName, int $time = null)
30 30
     {
31 31
         try {
32 32
             $received = $this->implementation->getQueueAdapter()
Please login to merge, or discard this patch.
src/CommandSerializerInterface.php 1 patch
Spacing   +2 added lines, -2 removed lines patch added patch discarded remove patch
@@ -5,11 +5,11 @@
 block discarded – undo
5 5
 interface CommandSerializerInterface
6 6
 {
7 7
 
8
-    public function serialize($command): string;
8
+    public function serialize ($command): string;
9 9
 
10 10
     /**
11 11
      * @param string $serialized
12 12
      * @return mixed A command object which can be handled by the commandbus.
13 13
      */
14
-    public function unserialize(string $serialized);
14
+    public function unserialize (string $serialized);
15 15
 }
Please login to merge, or discard this patch.
src/Predis/PredisAdapter.php 1 patch
Spacing   +40 added lines, -40 removed lines patch added patch discarded remove patch
@@ -18,15 +18,15 @@  discard block
 block discarded – undo
18 18
 
19 19
     private $client;
20 20
 
21
-    public function __construct(Client $client)
21
+    public function __construct (Client $client)
22 22
     {
23 23
         $this->client = $client;
24 24
     }
25 25
 
26
-    public function queueCommand(string $queueName, string $id, string $serialized)
26
+    public function queueCommand (string $queueName, string $id, string $serialized)
27 27
     {
28 28
         if (!$this->storeCommandAndCheckIfIdReserved($queueName, $id, $serialized)) {
29
-            $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) {
29
+            $this->client->pipeline(function(ClientContextInterface $client) use ($queueName, $id) {
30 30
                 self::cReserveCommandId($client, $queueName, $id);
31 31
                 self::cUpdateCommandStatus($client, $queueName, $id, self::STATUS_QUEUED);
32 32
                 $client->lpush(":{$queueName}:queue", [ $id ]);
@@ -34,7 +34,7 @@  discard block
 block discarded – undo
34 34
         }
35 35
     }
36 36
 
37
-    public function awaitCommand(string $queueName, int $timeout = null): ReceivedCommand
37
+    public function awaitCommand (string $queueName, int $timeout = null): ReceivedCommand
38 38
     {
39 39
         $stopwatchStart = time();
40 40
         $this->client->ping();
@@ -53,7 +53,7 @@  discard block
 block discarded – undo
53 53
             return $this->awaitCommand($queueName, $timeout);
54 54
         }
55 55
         /* @var $id string */
56
-        list(, $serialized) = $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) {
56
+        list(, $serialized) = $this->client->pipeline(function(ClientContextInterface $client) use ($queueName, $id) {
57 57
             self::cUpdateCommandStatus($client, $queueName, $id, self::STATUS_IN_PROGRESS);
58 58
             self::cRetrieveCommand($client, $queueName, $id);
59 59
             self::cReleaseReservedCommandIds($client, $queueName, [ $id ]);
@@ -61,58 +61,58 @@  discard block
 block discarded – undo
61 61
         return new ReceivedCommand($queueName, $id, $serialized);
62 62
     }
63 63
 
64
-    public function getCommandStatus(string $queueName, string $id): string
64
+    public function getCommandStatus (string $queueName, string $id): string
65 65
     {
66 66
         return $this->client->hget(":{$queueName}:command_status", $id) ?? self::STATUS_NOT_FOUND;
67 67
     }
68 68
 
69
-    public function setCommandCompleted(string $queueName, string $id)
69
+    public function setCommandCompleted (string $queueName, string $id)
70 70
     {
71
-        $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) {
71
+        $this->client->pipeline(function(ClientContextInterface $client) use ($queueName, $id) {
72 72
             self::cEndCommand($client, $queueName, $id, self::STATUS_COMPLETED);
73 73
         });
74 74
     }
75 75
 
76
-    public function setCommandFailed(string $queueName, string $id)
76
+    public function setCommandFailed (string $queueName, string $id)
77 77
     {
78
-        $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) {
78
+        $this->client->pipeline(function(ClientContextInterface $client) use ($queueName, $id) {
79 79
             self::cEndCommand($client, $queueName, $id, self::STATUS_FAILED);
80 80
         });
81 81
     }
82 82
 
83
-    public function putQueue(string $queueName)
83
+    public function putQueue (string $queueName)
84 84
     {
85
-        $this->client->pipeline(function (ClientContextInterface $client) use ($queueName) {
85
+        $this->client->pipeline(function(ClientContextInterface $client) use ($queueName) {
86 86
             self::cAddQueue($client, $queueName);
87 87
         });
88 88
     }
89 89
 
90
-    public function getQueueNames(): array
90
+    public function getQueueNames (): array
91 91
     {
92 92
         return $this->client->smembers(':queues');
93 93
     }
94 94
 
95
-    public function getQueuedCount(string $queueName): int
95
+    public function getQueuedCount (string $queueName): int
96 96
     {
97 97
         return $this->client->llen(":{$queueName}:queue");
98 98
     }
99 99
 
100
-    public function getQueuedIds(string $queueName, int $offset = 0, int $limit = 10): array
100
+    public function getQueuedIds (string $queueName, int $offset = 0, int $limit = 10): array
101 101
     {
102 102
         return $this->client->lrange(":{$queueName}:queue", $offset, $limit);
103 103
     }
104 104
 
105
-    public function getConsumingCount(string $queueName): int
105
+    public function getConsumingCount (string $queueName): int
106 106
     {
107 107
         return $this->client->llen(":{$queueName}:consuming");
108 108
     }
109 109
 
110
-    public function getConsumingIds(string $queueName, int $offset = 0, int $limit = 10): array
110
+    public function getConsumingIds (string $queueName, int $offset = 0, int $limit = 10): array
111 111
     {
112 112
         return $this->client->lrange(":{$queueName}:consuming", $offset, $limit);
113 113
     }
114 114
 
115
-    public function readCommand(string $queueName, string $id): string
115
+    public function readCommand (string $queueName, string $id): string
116 116
     {
117 117
         $serialized = self::cRetrieveCommand($this->client, $queueName, $id);
118 118
         if ($serialized === null) {
@@ -121,21 +121,21 @@  discard block
 block discarded – undo
121 121
         return $serialized;
122 122
     }
123 123
 
124
-    public function emptyQueue(string $queueName)
124
+    public function emptyQueue (string $queueName)
125 125
     {
126 126
         self::cEmptyQueue($this->client, $queueName);
127 127
     }
128 128
 
129
-    public function deleteQueue(string $queueName)
129
+    public function deleteQueue (string $queueName)
130 130
     {
131
-        $this->client->pipeline(function (ClientContextInterface $client) use ($queueName) {
131
+        $this->client->pipeline(function(ClientContextInterface $client) use ($queueName) {
132 132
             self::cDeleteQueue($client, $queueName);
133 133
         });
134 134
     }
135 135
 
136
-    public function purgeCommand(string $queueName, string $id)
136
+    public function purgeCommand (string $queueName, string $id)
137 137
     {
138
-        $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) {
138
+        $this->client->pipeline(function(ClientContextInterface $client) use ($queueName, $id) {
139 139
             $client->hdel(":{$queueName}:command_store", [ $id ]);
140 140
             $client->hdel(":{$queueName}:command_status", [ $id ]);
141 141
             self::cReleaseReservedCommandIds($client, $queueName, [ $id ]);
@@ -146,11 +146,11 @@  discard block
 block discarded – undo
146 146
         });
147 147
     }
148 148
 
149
-    public function scheduleCommand(string $queueName, string $id, string $serialized, \DateTime $dateTime)
149
+    public function scheduleCommand (string $queueName, string $id, string $serialized, \DateTime $dateTime)
150 150
     {
151 151
         if (!$this->storeCommandAndCheckIfIdReserved($queueName, $id, $serialized)) {
152 152
             $this->client->pipeline(
153
-                function (ClientContextInterface $client) use ($queueName, $id, $dateTime) {
153
+                function(ClientContextInterface $client) use ($queueName, $id, $dateTime) {
154 154
                     self::cReserveCommandId($client, $queueName, $id);
155 155
                     self::cUpdateCommandStatus($client, $queueName, $id, self::STATUS_SCHEDULED);
156 156
                     $json = json_encode([ $queueName, $id ]);
@@ -160,12 +160,12 @@  discard block
 block discarded – undo
160 160
         }
161 161
     }
162 162
 
163
-    public function cancelScheduledCommand(string $queueName, string $id)
163
+    public function cancelScheduledCommand (string $queueName, string $id)
164 164
     {
165 165
         $this->purgeCommand($queueName, $id);
166 166
     }
167 167
 
168
-    public function clearSchedule(array $queueNames = null, \DateTime $start = null, \DateTime $end = null)
168
+    public function clearSchedule (array $queueNames = null, \DateTime $start = null, \DateTime $end = null)
169 169
     {
170 170
         $lowScore = $start ? $start->getTimestamp() : 0;
171 171
         $highScore = $end ? $end->getTimestamp() : -1;
@@ -175,7 +175,7 @@  discard block
 block discarded – undo
175 175
         foreach ($queueNames as $queueName) {
176 176
             $result = $this->client->zrangebyscore(':schedule', $lowScore, $highScore);
177 177
             if (!empty($result)) {
178
-                $this->client->pipeline(function (ClientContextInterface $client) use ($result, $queueName) {
178
+                $this->client->pipeline(function(ClientContextInterface $client) use ($result, $queueName) {
179 179
                     $idsToRelease = [ ];
180 180
                     foreach ($result as $json => $score) {
181 181
                         list($thisQueueName, $id) = json_decode($json, true);
@@ -190,7 +190,7 @@  discard block
 block discarded – undo
190 190
         }
191 191
     }
192 192
 
193
-    public function receiveDueCommands(
193
+    public function receiveDueCommands (
194 194
         \DateTime $now,
195 195
         int $limit = SchedulerWorker::DEFAULT_THROTTLE,
196 196
         \DateTime $startTime = null
@@ -208,7 +208,7 @@  discard block
 block discarded – undo
208 208
         if ($result !== [ ]) {
209 209
             $queueNamesById = $idsByJson = [ ];
210 210
             $pipelineReturn = $this->client->pipeline(
211
-                function (ClientContextInterface $client) use ($result, &$queueNamesById, &$idsByJson) {
211
+                function(ClientContextInterface $client) use ($result, &$queueNamesById, &$idsByJson) {
212 212
                     $idsByQueueName = [ ];
213 213
                     foreach ($result as $json => $score) {
214 214
                         list($queueName, $id) = json_decode($json, true);
@@ -236,10 +236,10 @@  discard block
 block discarded – undo
236 236
         return $commands;
237 237
     }
238 238
 
239
-    private function storeCommandAndCheckIfIdReserved(string $queueName, string $id, string $serialized): bool
239
+    private function storeCommandAndCheckIfIdReserved (string $queueName, string $id, string $serialized): bool
240 240
     {
241 241
         list ($isReserved) = $this->client->pipeline(
242
-            function (ClientContextInterface $client) use ($queueName, $id, $serialized) {
242
+            function(ClientContextInterface $client) use ($queueName, $id, $serialized) {
243 243
                 $client->sismember(":{$queueName}:queue_ids", $id);
244 244
                 $client->hset(":{$queueName}:command_store", $id, $serialized);
245 245
                 self::cAddQueue($client, $queueName);
@@ -248,17 +248,17 @@  discard block
 block discarded – undo
248 248
         return $isReserved;
249 249
     }
250 250
 
251
-    private static function cRetrieveCommand($client, string $queueName, string $id)
251
+    private static function cRetrieveCommand ($client, string $queueName, string $id)
252 252
     {
253 253
         return $client->hget(":{$queueName}:command_store", $id);
254 254
     }
255 255
 
256
-    private static function cReserveCommandId($client, string $queueName, string $id)
256
+    private static function cReserveCommandId ($client, string $queueName, string $id)
257 257
     {
258 258
         $client->sadd(":{$queueName}:queue_ids", [ $id ]);
259 259
     }
260 260
 
261
-    private static function cEndCommand($client, string $queueName, string $id, string $status)
261
+    private static function cEndCommand ($client, string $queueName, string $id, string $status)
262 262
     {
263 263
         self::cUpdateCommandStatus($client, $queueName, $id, $status);
264 264
         self::cReleaseReservedCommandIds($client, $queueName, [ $id ]);
@@ -266,22 +266,22 @@  discard block
 block discarded – undo
266 266
         $client->lrem(":{$queueName}:consuming", 1, $id);
267 267
     }
268 268
 
269
-    private static function cReleaseReservedCommandIds($client, string $queueName, array $ids)
269
+    private static function cReleaseReservedCommandIds ($client, string $queueName, array $ids)
270 270
     {
271 271
         $client->srem(":{$queueName}:queue_ids", $ids);
272 272
     }
273 273
 
274
-    private static function cUpdateCommandStatus($client, string $queueName, string $id, string $status)
274
+    private static function cUpdateCommandStatus ($client, string $queueName, string $id, string $status)
275 275
     {
276 276
         $client->hset(":{$queueName}:command_status", $id, $status);
277 277
     }
278 278
 
279
-    private static function cAddQueue($client, string $queueName)
279
+    private static function cAddQueue ($client, string $queueName)
280 280
     {
281 281
         $client->sadd(':queues', [ $queueName ]);
282 282
     }
283 283
 
284
-    private static function cEmptyQueue($client, string $queueName)
284
+    private static function cEmptyQueue ($client, string $queueName)
285 285
     {
286 286
         $client->del([
287 287
             ":{$queueName}:queue",
@@ -292,7 +292,7 @@  discard block
 block discarded – undo
292 292
         ]);
293 293
     }
294 294
 
295
-    private static function cDeleteQueue($client, string $queueName)
295
+    private static function cDeleteQueue ($client, string $queueName)
296 296
     {
297 297
         self::cEmptyQueue($client, $queueName);
298 298
         $client->srem(':queues', $queueName);
Please login to merge, or discard this patch.
src/QueueResolver/SimpleQueueResolver.php 1 patch
Spacing   +2 added lines, -2 removed lines patch added patch discarded remove patch
@@ -9,12 +9,12 @@
 block discarded – undo
9 9
 
10 10
     private $queueName;
11 11
 
12
-    public function __construct(string $queueName)
12
+    public function __construct (string $queueName)
13 13
     {
14 14
         $this->queueName = $queueName;
15 15
     }
16 16
 
17
-    public function resolveQueueName($command): string
17
+    public function resolveQueueName ($command): string
18 18
     {
19 19
         return $this->queueName;
20 20
     }
Please login to merge, or discard this patch.
src/QueueResolver/QueueVoterInterface.php 1 patch
Spacing   +1 added lines, -1 removed lines patch added patch discarded remove patch
@@ -9,5 +9,5 @@
 block discarded – undo
9 9
      * @param mixed $command
10 10
      * @return QueueVote|null
11 11
      */
12
-    public function getVote($command);
12
+    public function getVote ($command);
13 13
 }
Please login to merge, or discard this patch.