Passed
Branch master (5eac71)
by Mike
03:51 queued 45s
created

storeCommandAndCheckReservationStatus()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 10
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 10
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 6
nc 1
nop 3
1
<?php
2
3
namespace MGDigital\BusQue\Predis;
4
5
use MGDigital\BusQue\Exception\CommandNotFoundException;
6
use MGDigital\BusQue\Exception\TimeoutException;
7
use MGDigital\BusQue\QueueAdapterInterface;
8
use MGDigital\BusQue\ReceivedCommand;
9
use MGDigital\BusQue\ReceivedScheduledCommand;
10
use MGDigital\BusQue\SchedulerAdapterInterface;
11
use MGDigital\BusQue\SchedulerWorker;
12
use Predis\Client;
13
use Predis\ClientContextInterface;
14
use Predis\Connection\ConnectionException;
15
16
class PredisAdapter implements QueueAdapterInterface, SchedulerAdapterInterface
17
{
18
19
    private $client;
20
21
    /**
     * @param Client $client Predis client that every method of this adapter talks to.
     */
    public function __construct(Client $client)
    {
        $this->client = $client;
    }
25
26
    /**
     * Stores the serialized command and, unless its ID is already reserved on the queue,
     * reserves the ID, marks the command as queued and pushes the ID onto the queue list.
     *
     * @param string $queueName
     * @param string $id
     * @param string $serialized
     */
    public function queueCommand(string $queueName, string $id, string $serialized)
    {
        $alreadyReserved = $this->storeCommandAndCheckReservationStatus($queueName, $id, $serialized);
        if ($alreadyReserved) {
            // The ID is already reserved on this queue: only the stored payload was refreshed.
            return;
        }

        $this->client->pipeline(static function (ClientContextInterface $pipeline) use ($queueName, $id) {
            self::cReserveCommandId($pipeline, $queueName, $id);
            self::cUpdateCommandStatus($pipeline, $queueName, $id, self::STATUS_QUEUED);
            $pipeline->lpush(":{$queueName}:queue", [$id]);
        });
    }
36
37
    /**
     * Blocks until a command ID can be moved from the queue list to the consuming list,
     * then marks the command as in progress, loads its payload and releases its reserved ID.
     *
     * A ConnectionException raised by the blocking pop is swallowed and the pop is retried
     * for whatever is left of the timeout.
     *
     * @param string   $queueName
     * @param int|null $timeout Maximum number of seconds to wait; null waits without a deadline.
     *
     * @return ReceivedCommand
     *
     * @throws TimeoutException When $timeout seconds have elapsed without receiving a command.
     */
    public function awaitCommand(string $queueName, int $timeout = null): ReceivedCommand
    {
        // An absolute deadline keeps the remaining time correct across retries.
        $deadline = $timeout === null ? null : time() + $timeout;
        $remaining = $timeout;

        while (true) {
            // NOTE(review): presumably issued to surface a stale connection before blocking - confirm.
            $this->client->ping();
            try {
                $id = $this->client->brpoplpush(
                    ":{$queueName}:queue",
                    ":{$queueName}:consuming",
                    $remaining ?? 0
                );
            } catch (ConnectionException $e) {
                $id = null;
            }

            // Compare against null explicitly: an ID such as "0" is falsy but perfectly valid.
            if ($id !== null) {
                break;
            }

            if ($deadline !== null) {
                $remaining = $deadline - time();
                if ($remaining <= 0) {
                    throw new TimeoutException;
                }
            }
        }

        /* @var $id string */
        // Only the second pipeline reply (the stored payload) is needed.
        list(, $serialized) = $this->client->pipeline(
            function (ClientContextInterface $client) use ($queueName, $id) {
                self::cUpdateCommandStatus($client, $queueName, $id, self::STATUS_IN_PROGRESS);
                self::cRetrieveCommand($client, $queueName, $id);
                self::cReleaseReservedCommandIds($client, $queueName, [$id]);
            }
        );

        return new ReceivedCommand($queueName, $id, $serialized);
    }
63
64
    /**
     * Looks up the status recorded for a command in the queue's command_status hash.
     *
     * @param string $queueName
     * @param string $id
     *
     * @return string The stored status, or self::STATUS_NOT_FOUND when the lookup yields null.
     */
    public function getCommandStatus(string $queueName, string $id): string
    {
        $status = $this->client->hget(":{$queueName}:command_status", $id);

        return $status ?? self::STATUS_NOT_FOUND;
    }
68
69
    /**
     * Ends the command with the completed status inside a single pipeline.
     *
     * @param string $queueName
     * @param string $id
     */
    public function setCommandCompleted(string $queueName, string $id)
    {
        $finish = static function (ClientContextInterface $pipeline) use ($queueName, $id) {
            self::cEndCommand($pipeline, $queueName, $id, self::STATUS_COMPLETED);
        };

        $this->client->pipeline($finish);
    }
75
76
    /**
     * Ends the command with the failed status inside a single pipeline.
     *
     * @param string $queueName
     * @param string $id
     */
    public function setCommandFailed(string $queueName, string $id)
    {
        $finish = static function (ClientContextInterface $pipeline) use ($queueName, $id) {
            self::cEndCommand($pipeline, $queueName, $id, self::STATUS_FAILED);
        };

        $this->client->pipeline($finish);
    }
82
83
    /**
     * Registers the queue name in the set of known queues.
     *
     * @param string $queueName
     */
    public function putQueue(string $queueName)
    {
        $register = static function (ClientContextInterface $pipeline) use ($queueName) {
            self::cAddQueue($pipeline, $queueName);
        };

        $this->client->pipeline($register);
    }
89
90
    /**
     * Returns the members of the ':queues' set.
     *
     * @return array
     */
    public function getQueueNames(): array
    {
        $names = $this->client->smembers(':queues');

        return $names;
    }
94
95
    /**
     * Returns the length of the queue list for the given queue.
     *
     * @param string $queueName
     *
     * @return int
     */
    public function getQueuedCount(string $queueName): int
    {
        $queueKey = ":{$queueName}:queue";

        return $this->client->llen($queueKey);
    }
99
100
    /**
     * Returns a page of IDs from the queue list.
     *
     * LRANGE takes inclusive start/stop indexes rather than an offset and a count, so
     * the stop index is derived from $offset and $limit.
     *
     * @param string $queueName
     * @param int    $offset Index of the first ID to return (negative values count from the tail).
     * @param int    $limit  Maximum number of IDs to return.
     *
     * @return array
     */
    public function getQueuedIds(string $queueName, int $offset = 0, int $limit = 10): array
    {
        if ($limit <= 0) {
            // A stop index of $offset - 1 could become -1, which LRANGE reads as "to the end".
            return [];
        }

        $stop = $offset + $limit - 1;
        if ($offset < 0 && $stop >= 0) {
            // The window runs past the tail of the list: clamp it to the last element.
            $stop = -1;
        }

        return $this->client->lrange(":{$queueName}:queue", $offset, $stop);
    }
104
105
    /**
     * Returns the length of the consuming list for the given queue.
     *
     * @param string $queueName
     *
     * @return int
     */
    public function getConsumingCount(string $queueName): int
    {
        $consumingKey = ":{$queueName}:consuming";

        return $this->client->llen($consumingKey);
    }
109
110
    /**
     * Returns a page of IDs from the consuming list.
     *
     * LRANGE takes inclusive start/stop indexes rather than an offset and a count, so
     * the stop index is derived from $offset and $limit.
     *
     * @param string $queueName
     * @param int    $offset Index of the first ID to return (negative values count from the tail).
     * @param int    $limit  Maximum number of IDs to return.
     *
     * @return array
     */
    public function getConsumingIds(string $queueName, int $offset = 0, int $limit = 10): array
    {
        if ($limit <= 0) {
            // A stop index of $offset - 1 could become -1, which LRANGE reads as "to the end".
            return [];
        }

        $stop = $offset + $limit - 1;
        if ($offset < 0 && $stop >= 0) {
            // The window runs past the tail of the list: clamp it to the last element.
            $stop = -1;
        }

        return $this->client->lrange(":{$queueName}:consuming", $offset, $stop);
    }
114
115
    /**
     * Fetches the serialized payload stored for a command.
     *
     * @param string $queueName
     * @param string $id
     *
     * @return string
     *
     * @throws CommandNotFoundException When the lookup yields null.
     */
    public function readCommand(string $queueName, string $id): string
    {
        $payload = self::cRetrieveCommand($this->client, $queueName, $id);

        if (null !== $payload) {
            return $payload;
        }

        throw new CommandNotFoundException();
    }
123
124
    /**
     * Deletes every per-queue key of the given queue, issued directly on the client
     * rather than through a pipeline.
     *
     * @param string $queueName
     */
    public function emptyQueue(string $queueName)
    {
        $directClient = $this->client;
        self::cEmptyQueue($directClient, $queueName);
    }
128
129
    /**
     * Deletes the queue's keys and removes its name from the set of known queues,
     * all within one pipeline.
     *
     * @param string $queueName
     */
    public function deleteQueue(string $queueName)
    {
        $remove = static function (ClientContextInterface $pipeline) use ($queueName) {
            self::cDeleteQueue($pipeline, $queueName);
        };

        $this->client->pipeline($remove);
    }
135
136
    /**
     * Removes every trace of a command: its stored payload, its status, its reserved ID,
     * one occurrence in each of the queue and consuming lists, and its schedule entry.
     *
     * @param string $queueName
     * @param string $id
     */
    public function purgeCommand(string $queueName, string $id)
    {
        // Schedule members are the JSON-encoded [queueName, id] pair.
        $scheduleMember = json_encode([$queueName, $id]);

        $this->client->pipeline(
            static function (ClientContextInterface $pipeline) use ($queueName, $id, $scheduleMember) {
                $pipeline->hdel(":{$queueName}:command_store", [$id]);
                $pipeline->hdel(":{$queueName}:command_status", [$id]);
                self::cReleaseReservedCommandIds($pipeline, $queueName, [$id]);
                $pipeline->lrem(":{$queueName}:queue", 1, $id);
                $pipeline->lrem(":{$queueName}:consuming", 1, $id);
                $pipeline->zrem(':schedule', $scheduleMember);
            }
        );
    }
148
149
    /**
     * Stores the serialized command and, unless its ID is already reserved on the queue,
     * reserves the ID, marks the command as scheduled and adds it to the ':schedule'
     * sorted set scored by the timestamp of $dateTime.
     *
     * @param string    $queueName
     * @param string    $id
     * @param string    $serialized
     * @param \DateTime $dateTime
     */
    public function scheduleCommand(string $queueName, string $id, string $serialized, \DateTime $dateTime)
    {
        if ($this->storeCommandAndCheckReservationStatus($queueName, $id, $serialized)) {
            // The ID is already reserved on this queue: only the stored payload was refreshed.
            return;
        }

        $dueAt = $dateTime;
        $this->client->pipeline(
            static function (ClientContextInterface $pipeline) use ($queueName, $id, $dueAt) {
                self::cReserveCommandId($pipeline, $queueName, $id);
                self::cUpdateCommandStatus($pipeline, $queueName, $id, self::STATUS_SCHEDULED);
                $member = json_encode([$queueName, $id]);
                $pipeline->zadd(':schedule', [$member => $dueAt->getTimestamp()]);
            }
        );
    }
162
163
    /**
     * Cancels a scheduled command by delegating to purgeCommand().
     *
     * @param string $queueName
     * @param string $id
     */
    public function cancelScheduledCommand(string $queueName, string $id)
    {
        // Cancelling is implemented as a full purge of the command.
        $this->purgeCommand($queueName, $id);
    }
167
168
    /**
     * Removes scheduled commands from the ':schedule' sorted set and releases their
     * reserved IDs.
     *
     * @param array|null     $queueNames Queues to clear; null means every queue returned by getQueueNames().
     * @param \DateTime|null $start      Only clear entries scheduled at or after this time; null starts at score 0.
     * @param \DateTime|null $end        Only clear entries scheduled at or before this time; null means no upper bound.
     */
    public function clearSchedule(array $queueNames = null, \DateTime $start = null, \DateTime $end = null)
    {
        $lowScore = $start ? $start->getTimestamp() : 0;
        // An open upper bound must be '+inf'; -1 is an index convention and matches no score here.
        $highScore = $end ? $end->getTimestamp() : '+inf';

        if ($queueNames === null) {
            $queueNames = $this->getQueueNames();
        }
        if (empty($queueNames)) {
            return;
        }

        // Without WITHSCORES the reply is a plain list of members, so iterate over values.
        // The query does not depend on the queue name, so it is issued once.
        $members = $this->client->zrangebyscore(':schedule', $lowScore, $highScore);
        if (empty($members)) {
            return;
        }

        $membersToRemove = [];
        $idsByQueueName = [];
        foreach ($members as $json) {
            // Each member is the JSON-encoded [queueName, id] pair.
            list($thisQueueName, $id) = json_decode($json, true);
            if (in_array($thisQueueName, $queueNames, true)) {
                $membersToRemove[] = $json;
                $idsByQueueName[$thisQueueName][] = $id;
            }
        }

        // Issuing ZREM/SREM with no members is an error, so stop when nothing matched.
        if ($membersToRemove === []) {
            return;
        }

        $this->client->pipeline(
            function (ClientContextInterface $client) use ($membersToRemove, $idsByQueueName) {
                $client->zrem(':schedule', $membersToRemove);
                foreach ($idsByQueueName as $queueName => $ids) {
                    self::cReleaseReservedCommandIds($client, $queueName, $ids);
                }
            }
        );
    }
192
193
    /**
     * Takes the scheduled commands that are due at $now out of the ':schedule' sorted set,
     * loads their payloads, releases their reserved IDs and returns them.
     *
     * @param \DateTime      $now       Upper bound (inclusive) of the due window.
     * @param int            $limit     Maximum number of commands to receive.
     * @param \DateTime|null $startTime Lower bound of the due window; null starts at score 0.
     *
     * @return ReceivedScheduledCommand[]
     */
    public function receiveDueCommands(
        \DateTime $now,
        int $limit = SchedulerWorker::DEFAULT_THROTTLE,
        \DateTime $startTime = null
    ): array {
        $start = $startTime === null ? 0 : $startTime->getTimestamp();

        // With WITHSCORES the reply maps each member to its score.
        $due = $this->client->zrangebyscore(':schedule', $start, $now->getTimestamp(), [
            'limit' => [0, $limit],
            'withscores' => true,
        ]);
        if (empty($due)) {
            return [];
        }

        // Decode every member once, keeping queue name and ID together. Tracking the queue
        // by ID alone would mix up two queues that have the same ID due at the same time.
        $entries = [];
        $idsByQueueName = [];
        foreach ($due as $json => $score) {
            list($queueName, $id) = json_decode($json, true);
            $entries[] = [$queueName, $id, $score];
            $idsByQueueName[$queueName][] = $id;
        }

        $pipelineReturn = $this->client->pipeline(
            function (ClientContextInterface $client) use ($entries, $idsByQueueName, $due) {
                // The payload lookups go first so reply N belongs to entry N.
                foreach ($entries as $entry) {
                    self::cRetrieveCommand($client, $entry[0], $entry[1]);
                }
                $client->zrem(':schedule', array_keys($due));
                foreach ($idsByQueueName as $queueName => $ids) {
                    self::cReleaseReservedCommandIds($client, $queueName, $ids);
                }
            }
        );

        $commands = [];
        foreach ($entries as $index => $entry) {
            list($queueName, $id, $score) = $entry;
            $commands[] = new ReceivedScheduledCommand(
                $queueName,
                $id,
                $pipelineReturn[$index],
                new \DateTime('@' . $score)
            );
        }

        return $commands;
    }
238
239
    /**
     * Checks whether the command ID is reserved on the queue and stores the serialized
     * command, both in one pipeline.
     *
     * @param string $queueName
     * @param string $id
     * @param string $serialized
     *
     * @return bool The first pipeline reply, i.e. the result of the reservation check.
     */
    private function storeCommandAndCheckReservationStatus(string $queueName, string $id, string $serialized): bool
    {
        $replies = $this->client->pipeline(
            static function (ClientContextInterface $pipeline) use ($queueName, $id, $serialized) {
                // The reservation check is queued first so its reply is at index 0.
                self::cIsCommandIdReserved($pipeline, $queueName, $id);
                self::cStoreCommand($pipeline, $queueName, $id, $serialized);
            }
        );
        list ($reserved) = $replies;

        return $reserved;
    }
249
250
    /**
     * Registers the queue and writes the serialized command into the queue's
     * command_store hash under its ID.
     *
     * @param mixed  $client     Client or pipeline context the commands are issued on.
     * @param string $queueName
     * @param string $id
     * @param string $serialized
     */
    private static function cStoreCommand($client, string $queueName, string $id, string $serialized)
    {
        self::cAddQueue($client, $queueName);

        $storeKey = ":{$queueName}:command_store";
        $client->hset($storeKey, $id, $serialized);
    }
255
256
    /**
     * Issues an HGET for the command's entry in the queue's command_store hash.
     *
     * @param mixed  $client    Client or pipeline context the command is issued on.
     * @param string $queueName
     * @param string $id
     *
     * @return mixed Whatever the given client returns from hget().
     */
    private static function cRetrieveCommand($client, string $queueName, string $id)
    {
        $storeKey = ":{$queueName}:command_store";

        return $client->hget($storeKey, $id);
    }
260
261
    /**
     * Adds the command ID to the queue's queue_ids set, which marks it as reserved.
     *
     * @param mixed  $client    Client or pipeline context the command is issued on.
     * @param string $queueName
     * @param string $id
     */
    private static function cReserveCommandId($client, string $queueName, string $id)
    {
        $reservedIdsKey = ":{$queueName}:queue_ids";
        $client->sadd($reservedIdsKey, [$id]);
    }
265
266
    /**
     * Finishes a command: records its final status, releases its reserved ID and removes
     * one occurrence of the ID from the queue's consuming list.
     *
     * @param mixed  $client    Client or pipeline context the commands are issued on.
     * @param string $queueName
     * @param string $id
     * @param string $status    Final status to record for the command.
     */
    private static function cEndCommand($client, string $queueName, string $id, string $status)
    {
        self::cUpdateCommandStatus($client, $queueName, $id, $status);
        // Releasing the reserved ID already issues the SREM on queue_ids; no second one is needed.
        self::cReleaseReservedCommandIds($client, $queueName, [$id]);
        $client->lrem(":{$queueName}:consuming", 1, $id);
    }
273
274
    /**
     * Removes the given command IDs from the queue's queue_ids set.
     *
     * @param mixed  $client    Client or pipeline context the command is issued on.
     * @param string $queueName
     * @param array  $ids       Command IDs to release.
     */
    private static function cReleaseReservedCommandIds($client, string $queueName, array $ids)
    {
        $reservedIdsKey = ":{$queueName}:queue_ids";
        $client->srem($reservedIdsKey, $ids);
    }
278
279
    /**
     * Issues an SISMEMBER for the command ID against the queue's queue_ids set.
     *
     * @param mixed  $client    Client or pipeline context the command is issued on.
     * @param string $queueName
     * @param string $id
     *
     * @return mixed Whatever the given client returns from sismember().
     */
    private static function cIsCommandIdReserved($client, string $queueName, string $id)
    {
        $reservedIdsKey = ":{$queueName}:queue_ids";

        return $client->sismember($reservedIdsKey, $id);
    }
283
284
    /**
     * Writes the command's status into the queue's command_status hash.
     *
     * @param mixed  $client    Client or pipeline context the command is issued on.
     * @param string $queueName
     * @param string $id
     * @param string $status
     */
    private static function cUpdateCommandStatus($client, string $queueName, string $id, string $status)
    {
        $statusKey = ":{$queueName}:command_status";
        $client->hset($statusKey, $id, $status);
    }
288
289
    /**
     * Adds the queue name to the ':queues' set.
     *
     * @param mixed  $client    Client or pipeline context the command is issued on.
     * @param string $queueName
     */
    private static function cAddQueue($client, string $queueName)
    {
        $members = [$queueName];
        $client->sadd(':queues', $members);
    }
293
294
    /**
     * Deletes all per-queue keys in one DEL: the queue and consuming lists, the
     * command_store and command_status hashes, and the queue_ids set.
     *
     * @param mixed  $client    Client or pipeline context the command is issued on.
     * @param string $queueName
     */
    private static function cEmptyQueue($client, string $queueName)
    {
        $keys = [
            ":{$queueName}:queue",
            ":{$queueName}:consuming",
            ":{$queueName}:command_store",
            ":{$queueName}:command_status",
            ":{$queueName}:queue_ids"
        ];

        $client->del($keys);
    }
304
305
    /**
     * Deletes the queue's keys, then removes the queue name from the ':queues' set.
     *
     * @param mixed  $client    Client or pipeline context the commands are issued on.
     * @param string $queueName
     */
    private static function cDeleteQueue($client, string $queueName)
    {
        // Clear the per-queue keys first, then unregister the queue name.
        self::cEmptyQueue($client, $queueName);

        $registryKey = ':queues';
        $client->srem($registryKey, $queueName);
    }
310
}
311