1
|
|
|
<?php |
2
|
|
|
|
3
|
|
|
namespace MGDigital\BusQue\Predis; |
4
|
|
|
|
5
|
|
|
use MGDigital\BusQue\Exception\CommandNotFoundException; |
6
|
|
|
use MGDigital\BusQue\Exception\TimeoutException; |
7
|
|
|
use MGDigital\BusQue\QueueAdapterInterface; |
8
|
|
|
use MGDigital\BusQue\ReceivedCommand; |
9
|
|
|
use MGDigital\BusQue\ReceivedScheduledCommand; |
10
|
|
|
use MGDigital\BusQue\SchedulerAdapterInterface; |
11
|
|
|
use MGDigital\BusQue\SchedulerWorker; |
12
|
|
|
use Predis\Client; |
13
|
|
|
use Predis\ClientContextInterface; |
14
|
|
|
use Predis\Connection\ConnectionException; |
15
|
|
|
|
16
|
|
|
/**
 * Redis-backed queue and scheduler adapter built on a Predis client.
 *
 * Redis keys used by this class (every key starts with ':'; presumably a key
 * prefix is configured on the Predis client -- TODO confirm):
 *
 *  - ":queues"                  set of known queue names
 *  - ":schedule"                sorted set whose members are JSON "[queueName, id]"
 *                               pairs scored by their due UNIX timestamp
 *  - ":{queue}:queue"           list of command ids waiting to be consumed
 *  - ":{queue}:consuming"       list of command ids handed over to a consumer
 *  - ":{queue}:queue_ids"       set of reserved (queued or scheduled) command ids
 *  - ":{queue}:command_store"   hash of id => serialized command
 *  - ":{queue}:command_status"  hash of id => status
 *
 * The private static "c*" helpers issue a single Redis command (or a few) on
 * whatever client object they are given, so they can be used both directly
 * and inside a pipeline callback.
 */
class PredisAdapter implements QueueAdapterInterface, SchedulerAdapterInterface
{

    private $client;

    /**
     * @param Client $client Predis client used for every Redis call.
     */
    public function __construct(Client $client)
    {
        $this->client = $client;
    }

    /**
     * Stores the serialized command and, unless its id is already reserved,
     * reserves the id, marks it as queued and pushes it onto the queue list.
     *
     * NOTE(review): the reservation check and the reservation itself happen in
     * two separate pipelines, so two concurrent callers could both enqueue the
     * same id.
     */
    public function queueCommand(string $queueName, string $id, string $serialized)
    {
        if ($this->storeCommandAndCheckIfIdReserved($queueName, $id, $serialized)) {
            return;
        }

        $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) {
            self::cReserveCommandId($client, $queueName, $id);
            self::cUpdateCommandStatus($client, $queueName, $id, self::STATUS_QUEUED);
            $client->lpush(":{$queueName}:queue", [ $id ]);
        });
    }

    /**
     * Blocks until a command id can be moved from the queue list to the
     * consuming list, then marks it as in progress and returns it.
     *
     * A ConnectionException raised while blocking is treated like "nothing
     * received yet" and the wait is retried with whatever time is left.
     *
     * @param string   $queueName
     * @param int|null $timeout Maximum number of seconds to wait. Null is sent
     *                          to Redis as 0, i.e. wait without a time limit.
     *
     * @throws TimeoutException When no command arrived before the timeout elapsed.
     */
    public function awaitCommand(string $queueName, int $timeout = null): ReceivedCommand
    {
        // A fixed deadline keeps the total wait bounded across retries.
        $deadline = $timeout === null ? null : time() + $timeout;
        $blockFor = $timeout ?? 0;

        while (true) {
            $this->client->ping();
            try {
                $id = $this->client->brpoplpush(":{$queueName}:queue", ":{$queueName}:consuming", $blockFor);
            } catch (ConnectionException $e) {
                $id = null;
            }

            // Compare against null explicitly: "0" is a valid command id but is falsy.
            if ($id !== null) {
                break;
            }

            if ($deadline !== null) {
                $blockFor = $deadline - time();
                if ($blockFor <= 0) {
                    throw new TimeoutException;
                }
            }
        }

        /* @var $id string */
        // Only the second pipeline reply (the stored command) is needed.
        list(, $serialized) = $this->client->pipeline(
            function (ClientContextInterface $client) use ($queueName, $id) {
                self::cUpdateCommandStatus($client, $queueName, $id, self::STATUS_IN_PROGRESS);
                self::cRetrieveCommand($client, $queueName, $id);
                self::cReleaseReservedCommandIds($client, $queueName, [ $id ]);
            }
        );

        return new ReceivedCommand($queueName, $id, $serialized);
    }

    /**
     * Returns the stored status of a command, or STATUS_NOT_FOUND when the
     * status hash has no entry for the id.
     */
    public function getCommandStatus(string $queueName, string $id): string
    {
        return $this->client->hget(":{$queueName}:command_status", $id) ?? self::STATUS_NOT_FOUND;
    }

    /**
     * Marks the command as completed and removes it from the consuming list.
     */
    public function setCommandCompleted(string $queueName, string $id)
    {
        $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) {
            self::cEndCommand($client, $queueName, $id, self::STATUS_COMPLETED);
        });
    }

    /**
     * Marks the command as failed and removes it from the consuming list.
     */
    public function setCommandFailed(string $queueName, string $id)
    {
        $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) {
            self::cEndCommand($client, $queueName, $id, self::STATUS_FAILED);
        });
    }

    /**
     * Registers the queue name in the set of known queues.
     */
    public function putQueue(string $queueName)
    {
        $this->client->pipeline(function (ClientContextInterface $client) use ($queueName) {
            self::cAddQueue($client, $queueName);
        });
    }

    /**
     * Returns the members of the set of known queue names.
     */
    public function getQueueNames(): array
    {
        return $this->client->smembers(':queues');
    }

    /**
     * Returns the length of the queue list.
     */
    public function getQueuedCount(string $queueName): int
    {
        return (int) $this->client->llen(":{$queueName}:queue");
    }

    /**
     * Returns at most $limit ids from the queue list, starting at $offset.
     *
     * @param string $queueName
     * @param int    $offset Zero-based index of the first id to return.
     * @param int    $limit  Maximum number of ids to return.
     */
    public function getQueuedIds(string $queueName, int $offset = 0, int $limit = 10): array
    {
        return $this->fetchListSlice(":{$queueName}:queue", $offset, $limit);
    }

    /**
     * Returns the length of the consuming list.
     */
    public function getConsumingCount(string $queueName): int
    {
        return (int) $this->client->llen(":{$queueName}:consuming");
    }

    /**
     * Returns at most $limit ids from the consuming list, starting at $offset.
     *
     * @param string $queueName
     * @param int    $offset Zero-based index of the first id to return.
     * @param int    $limit  Maximum number of ids to return.
     */
    public function getConsumingIds(string $queueName, int $offset = 0, int $limit = 10): array
    {
        return $this->fetchListSlice(":{$queueName}:consuming", $offset, $limit);
    }

    /**
     * Returns the serialized command stored under the given id.
     *
     * @throws CommandNotFoundException When the command store has no entry for the id.
     */
    public function readCommand(string $queueName, string $id): string
    {
        $serialized = self::cRetrieveCommand($this->client, $queueName, $id);
        if ($serialized === null) {
            throw new CommandNotFoundException();
        }

        return $serialized;
    }

    /**
     * Deletes the queue, consuming, status and reserved-id keys of the queue.
     */
    public function clearQueue(string $queueName)
    {
        self::cEmptyQueue($this->client, $queueName);
    }

    /**
     * Empties the queue, unregisters its name and clears its scheduled commands.
     */
    public function deleteQueue(string $queueName)
    {
        $this->client->pipeline(function (ClientContextInterface $client) use ($queueName) {
            self::cDeleteQueue($client, $queueName);
        });
        $this->clearSchedule([$queueName]);
    }

    /**
     * Removes every trace of a single command: stored payload, status,
     * reservation, queue/consuming list entries and schedule entry.
     */
    public function purgeCommand(string $queueName, string $id)
    {
        $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) {
            $client->hdel(":{$queueName}:command_store", [ $id ]);
            $client->hdel(":{$queueName}:command_status", [ $id ]);
            self::cReleaseReservedCommandIds($client, $queueName, [ $id ]);
            $client->lrem(":{$queueName}:queue", 1, $id);
            $client->lrem(":{$queueName}:consuming", 1, $id);
            // Schedule members are the JSON encoding of [queueName, id].
            $client->zrem(':schedule', json_encode([ $queueName, $id ]));
        });
    }

    /**
     * Stores the serialized command and, unless its id is already reserved,
     * reserves the id, marks it as scheduled and adds it to the schedule with
     * the timestamp of $dateTime as score.
     */
    public function scheduleCommand(string $queueName, string $id, string $serialized, \DateTime $dateTime)
    {
        if ($this->storeCommandAndCheckIfIdReserved($queueName, $id, $serialized)) {
            return;
        }

        $this->client->pipeline(
            function (ClientContextInterface $client) use ($queueName, $id, $dateTime) {
                self::cReserveCommandId($client, $queueName, $id);
                self::cUpdateCommandStatus($client, $queueName, $id, self::STATUS_SCHEDULED);
                $json = json_encode([ $queueName, $id ]);
                $client->zadd(':schedule', [ $json => $dateTime->getTimestamp() ]);
            }
        );
    }

    /**
     * Cancels a scheduled command by purging it entirely.
     */
    public function cancelScheduledCommand(string $queueName, string $id)
    {
        $this->purgeCommand($queueName, $id);
    }

    /**
     * Removes scheduled commands of the given queues whose due time lies in
     * the given range, and releases their reserved ids.
     *
     * @param array|null     $queueNames Queues to clear; null means every known queue.
     * @param \DateTime|null $start      Lower bound of the due time; null means no lower bound.
     * @param \DateTime|null $end        Upper bound of the due time; null means no upper bound.
     */
    public function clearSchedule(array $queueNames = null, \DateTime $start = null, \DateTime $end = null)
    {
        $lowScore = $start ? $start->getTimestamp() : '-inf';
        $highScore = $end ? $end->getTimestamp() : '+inf';
        if ($queueNames === null) {
            $queueNames = $this->getQueueNames();
        }
        if (empty($queueNames)) {
            return;
        }

        // The schedule is shared by all queues, so one range query serves every queue name.
        $scheduled = $this->client->zrangebyscore(':schedule', $lowScore, $highScore);
        if (empty($scheduled)) {
            return;
        }

        $membersToRemove = [ ];
        $idsByQueueName = [ ];
        foreach ($scheduled as $json) {
            list($thisQueueName, $id) = json_decode($json, true);
            if (in_array($thisQueueName, $queueNames, true)) {
                $membersToRemove[ ] = $json;
                $idsByQueueName[ $thisQueueName ][ ] = $id;
            }
        }
        if ($membersToRemove === [ ]) {
            return;
        }

        $this->client->pipeline(
            function (ClientContextInterface $client) use ($membersToRemove, $idsByQueueName) {
                $client->zrem(':schedule', $membersToRemove);
                foreach ($idsByQueueName as $queueName => $ids) {
                    // Numeric queue names become integer array keys; cast them back.
                    self::cReleaseReservedCommandIds($client, (string) $queueName, $ids);
                }
            }
        );
    }

    /**
     * Fetches scheduled commands that are due, removes them from the schedule
     * and releases their reserved ids.
     *
     * @param \DateTime      $now       Upper bound of the due time.
     * @param int            $limit     Maximum number of commands to receive.
     * @param \DateTime|null $startTime Lower bound of the due time; null means timestamp 0.
     *
     * @return ReceivedScheduledCommand[]
     */
    public function receiveDueCommands(
        \DateTime $now,
        int $limit = SchedulerWorker::DEFAULT_THROTTLE,
        \DateTime $startTime = null
    ): array {
        $start = $startTime === null ? 0 : $startTime->getTimestamp();
        $result = $this->client->zrangebyscore(':schedule', $start, $now->getTimestamp(), [
            'limit' => [ 0, $limit ],
            'withscores' => true,
        ]);

        $commands = [ ];
        if ($result === [ ]) {
            return $commands;
        }

        // Keep one positional entry per schedule member. Ids are only unique
        // within a queue, so nothing here may be keyed by id alone.
        $entries = [ ];
        foreach ($result as $json => $score) {
            list($queueName, $id) = json_decode($json, true);
            $entries[ ] = [ (string) $queueName, (string) $id, $score ];
        }

        $pipelineReturn = $this->client->pipeline(
            function (ClientContextInterface $client) use ($result, $entries) {
                $idsByQueueName = [ ];
                // The retrievals go first so that reply N belongs to entry N.
                foreach ($entries as list($queueName, $id)) {
                    self::cRetrieveCommand($client, $queueName, $id);
                    $idsByQueueName[ $queueName ][ ] = $id;
                }
                $client->zrem(':schedule', array_keys($result));
                foreach ($idsByQueueName as $queueName => $ids) {
                    self::cReleaseReservedCommandIds($client, (string) $queueName, $ids);
                }
            }
        );

        foreach ($entries as $index => list($queueName, $id, $score)) {
            $commands[ ] = new ReceivedScheduledCommand(
                $queueName,
                $id,
                $pipelineReturn[ $index ],
                new \DateTime('@'.$score)
            );
        }

        return $commands;
    }

    /**
     * Returns up to $limit elements of a Redis list starting at $offset.
     *
     * LRANGE takes an inclusive stop index rather than a count, so the stop
     * index is derived from the offset and the limit. A non-positive limit
     * yields an empty array (a stop index of -1 would otherwise select the
     * whole list). Offsets are expected to be non-negative.
     */
    private function fetchListSlice(string $key, int $offset, int $limit): array
    {
        if ($limit <= 0) {
            return [ ];
        }

        return $this->client->lrange($key, $offset, $offset + $limit - 1);
    }

    /**
     * Stores the serialized command, registers the queue name and reports
     * whether the id was already reserved before this call.
     */
    private function storeCommandAndCheckIfIdReserved(string $queueName, string $id, string $serialized): bool
    {
        // Only the first pipeline reply (the membership check) is needed.
        list ($isReserved) = $this->client->pipeline(
            function (ClientContextInterface $client) use ($queueName, $id, $serialized) {
                $client->sismember(":{$queueName}:queue_ids", $id);
                $client->hset(":{$queueName}:command_store", $id, $serialized);
                self::cAddQueue($client, $queueName);
            }
        );

        return (bool) $isReserved;
    }

    /**
     * Reads the serialized command from the command store hash.
     */
    private static function cRetrieveCommand($client, string $queueName, string $id)
    {
        return $client->hget(":{$queueName}:command_store", $id);
    }

    /**
     * Adds the id to the set of reserved ids of the queue.
     */
    private static function cReserveCommandId($client, string $queueName, string $id)
    {
        $client->sadd(":{$queueName}:queue_ids", [ $id ]);
    }

    /**
     * Records the final status of a command, releases its reserved id and
     * removes it from the consuming list.
     */
    private static function cEndCommand($client, string $queueName, string $id, string $status)
    {
        self::cUpdateCommandStatus($client, $queueName, $id, $status);
        self::cReleaseReservedCommandIds($client, $queueName, [ $id ]);
        $client->lrem(":{$queueName}:consuming", 1, $id);
    }

    /**
     * Removes the ids from the set of reserved ids of the queue.
     */
    private static function cReleaseReservedCommandIds($client, string $queueName, array $ids)
    {
        $client->srem(":{$queueName}:queue_ids", $ids);
    }

    /**
     * Writes the status of a command into the status hash.
     */
    private static function cUpdateCommandStatus($client, string $queueName, string $id, string $status)
    {
        $client->hset(":{$queueName}:command_status", $id, $status);
    }

    /**
     * Adds the queue name to the set of known queues.
     */
    private static function cAddQueue($client, string $queueName)
    {
        $client->sadd(':queues', [ $queueName ]);
    }

    /**
     * Deletes the queue, consuming, status and reserved-id keys of the queue.
     *
     * NOTE(review): ":{queue}:command_store" is not deleted here, so stored
     * payloads outlive clearQueue() and deleteQueue() -- confirm this is intended.
     */
    private static function cEmptyQueue($client, string $queueName)
    {
        $client->del([
            ":{$queueName}:queue",
            ":{$queueName}:consuming",
            ":{$queueName}:command_status",
            ":{$queueName}:queue_ids"
        ]);
    }

    /**
     * Empties the queue and removes its name from the set of known queues.
     */
    private static function cDeleteQueue($client, string $queueName)
    {
        self::cEmptyQueue($client, $queueName);
        $client->srem(':queues', [ $queueName ]);
    }
}
303
|
|
|
|
Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.
You can also find more detailed suggestions in the “Code” section of your repository.