Completed
Push — master ( beda73...774078 )
by Mike
03:24
created

PredisAdapter::cReserveCommandId()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 0
cts 4
cp 0
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 3
crap 2
1
<?php
2
3
namespace MGDigital\BusQue\Predis;
4
5
use MGDigital\BusQue\Exception\CommandNotFoundException;
6
use MGDigital\BusQue\Exception\TimeoutException;
7
use MGDigital\BusQue\QueueAdapterInterface;
8
use MGDigital\BusQue\ReceivedCommand;
9
use MGDigital\BusQue\ReceivedScheduledCommand;
10
use MGDigital\BusQue\SchedulerAdapterInterface;
11
use MGDigital\BusQue\SchedulerWorker;
12
use Predis\Client;
13
use Predis\ClientContextInterface;
14
use Predis\Connection\ConnectionException;
15
16
/**
 * Queue and scheduler adapter that keeps its state in Redis through a Predis client.
 *
 * Redis keys touched by this class:
 *  - ":queues"                  set of known queue names
 *  - ":{queue}:queue"           list of command ids waiting to be consumed
 *  - ":{queue}:consuming"       list of command ids currently being consumed
 *  - ":{queue}:queue_ids"       set of reserved command ids (queued or scheduled)
 *  - ":{queue}:command_store"   hash of id => serialized command
 *  - ":{queue}:command_status"  hash of id => status
 *  - ":schedule"                sorted set of JSON "[queue, id]" members scored by due timestamp
 *
 * The private static "c*" helpers issue a single Redis command (or a few) on whatever
 * client-like object they are handed, which is either the real client or a pipeline context.
 */
class PredisAdapter implements QueueAdapterInterface, SchedulerAdapterInterface
{

    /**
     * @var Client
     */
    private $client;

    /**
     * @param Client $client Predis client used for every Redis call.
     */
    public function __construct(Client $client)
    {
        $this->client = $client;
    }

    /**
     * Stores the serialized command and, unless its id is already reserved,
     * reserves the id, marks it as queued and pushes it onto the queue list.
     *
     * @param string $queueName
     * @param string $id
     * @param string $serialized
     *
     * @return void
     */
    public function queueCommand(string $queueName, string $id, string $serialized)
    {
        if ($this->storeCommandAndCheckIfIdReserved($queueName, $id, $serialized)) {
            // The id is already queued or scheduled; only the stored payload was refreshed.
            return;
        }

        $this->client->pipeline(function (ClientContextInterface $pipe) use ($queueName, $id) {
            self::cReserveCommandId($pipe, $queueName, $id);
            self::cUpdateCommandStatus($pipe, $queueName, $id, self::STATUS_QUEUED);
            $pipe->lpush(":{$queueName}:queue", [ $id ]);
        });
    }

    /**
     * Blocks until a command id can be moved from the queue list to the consuming list,
     * then marks it in progress and returns it together with its serialized payload.
     *
     * A dropped connection during the blocking pop is treated like an empty result and
     * retried for as long as the overall timeout allows.
     *
     * @param string   $queueName
     * @param int|null $timeout Maximum number of seconds to wait; null waits indefinitely.
     *
     * @return ReceivedCommand
     *
     * @throws TimeoutException When no command arrived before the timeout elapsed.
     */
    public function awaitCommand(string $queueName, int $timeout = null): ReceivedCommand
    {
        // Fixed point in time after which we give up; null means "never".
        $deadline = $timeout === null ? null : time() + $timeout;
        $remaining = $timeout;

        while (true) {
            // Pinging outside the try block lets a genuinely dead connection surface to the caller.
            $this->client->ping();
            try {
                $id = $this->client->brpoplpush(
                    ":{$queueName}:queue",
                    ":{$queueName}:consuming",
                    $remaining ?? 0
                );
            } catch (ConnectionException $e) {
                $id = null;
            }

            // Compare against null explicitly: ids such as "0" are falsy but perfectly valid.
            if ($id !== null) {
                break;
            }

            if ($deadline !== null) {
                // Time left is the deadline minus now, never the other way round.
                $remaining = $deadline - time();
                if ($remaining <= 0) {
                    throw new TimeoutException;
                }
            }
        }

        /* @var $id string */
        // Pipeline responses arrive in call order; the payload is the second one.
        list(, $serialized) = $this->client->pipeline(
            function (ClientContextInterface $pipe) use ($queueName, $id) {
                self::cUpdateCommandStatus($pipe, $queueName, $id, self::STATUS_IN_PROGRESS);
                self::cRetrieveCommand($pipe, $queueName, $id);
                self::cReleaseReservedCommandIds($pipe, $queueName, [ $id ]);
            }
        );

        return new ReceivedCommand($queueName, $id, $serialized);
    }

    /**
     * Looks up the stored status of a command.
     *
     * @param string $queueName
     * @param string $id
     *
     * @return string The stored status, or self::STATUS_NOT_FOUND when none is stored.
     */
    public function getCommandStatus(string $queueName, string $id): string
    {
        $status = $this->client->hget(":{$queueName}:command_status", $id);

        return $status ?? self::STATUS_NOT_FOUND;
    }

    /**
     * Marks a command as completed and removes it from the consuming list.
     *
     * @param string $queueName
     * @param string $id
     *
     * @return void
     */
    public function setCommandCompleted(string $queueName, string $id)
    {
        $this->client->pipeline(function (ClientContextInterface $pipe) use ($queueName, $id) {
            self::cEndCommand($pipe, $queueName, $id, self::STATUS_COMPLETED);
        });
    }

    /**
     * Marks a command as failed and removes it from the consuming list.
     *
     * @param string $queueName
     * @param string $id
     *
     * @return void
     */
    public function setCommandFailed(string $queueName, string $id)
    {
        $this->client->pipeline(function (ClientContextInterface $pipe) use ($queueName, $id) {
            self::cEndCommand($pipe, $queueName, $id, self::STATUS_FAILED);
        });
    }

    /**
     * Registers a queue name in the set of known queues.
     *
     * @param string $queueName
     *
     * @return void
     */
    public function putQueue(string $queueName)
    {
        $this->client->pipeline(function (ClientContextInterface $pipe) use ($queueName) {
            self::cAddQueue($pipe, $queueName);
        });
    }

    /**
     * @return array Names of all registered queues.
     */
    public function getQueueNames(): array
    {
        return $this->client->smembers(':queues');
    }

    /**
     * @param string $queueName
     *
     * @return int Number of ids waiting in the queue list.
     */
    public function getQueuedCount(string $queueName): int
    {
        return $this->client->llen(":{$queueName}:queue");
    }

    /**
     * Returns a page of ids waiting in the queue list.
     *
     * @param string $queueName
     * @param int    $offset Index of the first id to return.
     * @param int    $limit  Maximum number of ids to return.
     *
     * @return array
     */
    public function getQueuedIds(string $queueName, int $offset = 0, int $limit = 10): array
    {
        return $this->readListPage(":{$queueName}:queue", $offset, $limit);
    }

    /**
     * @param string $queueName
     *
     * @return int Number of ids in the consuming list.
     */
    public function getConsumingCount(string $queueName): int
    {
        return $this->client->llen(":{$queueName}:consuming");
    }

    /**
     * Returns a page of ids from the consuming list.
     *
     * @param string $queueName
     * @param int    $offset Index of the first id to return.
     * @param int    $limit  Maximum number of ids to return.
     *
     * @return array
     */
    public function getConsumingIds(string $queueName, int $offset = 0, int $limit = 10): array
    {
        return $this->readListPage(":{$queueName}:consuming", $offset, $limit);
    }

    /**
     * Returns the serialized payload stored for a command.
     *
     * @param string $queueName
     * @param string $id
     *
     * @return string
     *
     * @throws CommandNotFoundException When nothing is stored under that id.
     */
    public function readCommand(string $queueName, string $id): string
    {
        $serialized = self::cRetrieveCommand($this->client, $queueName, $id);
        if ($serialized === null) {
            throw new CommandNotFoundException();
        }

        return $serialized;
    }

    /**
     * Deletes the queue, consuming, status and reserved-id keys of a queue.
     *
     * @param string $queueName
     *
     * @return void
     */
    public function clearQueue(string $queueName)
    {
        self::cEmptyQueue($this->client, $queueName);
    }

    /**
     * Empties a queue, unregisters its name and removes its entries from the schedule.
     *
     * @param string $queueName
     *
     * @return void
     */
    public function deleteQueue(string $queueName)
    {
        $this->client->pipeline(function (ClientContextInterface $pipe) use ($queueName) {
            self::cDeleteQueue($pipe, $queueName);
        });
        $this->clearSchedule([ $queueName ]);
    }

    /**
     * Removes every trace of a single command: payload, status, reservation,
     * its entries in the queue and consuming lists, and its schedule entry.
     *
     * @param string $queueName
     * @param string $id
     *
     * @return void
     */
    public function purgeCommand(string $queueName, string $id)
    {
        $this->client->pipeline(function (ClientContextInterface $pipe) use ($queueName, $id) {
            $pipe->hdel(":{$queueName}:command_store", [ $id ]);
            $pipe->hdel(":{$queueName}:command_status", [ $id ]);
            self::cReleaseReservedCommandIds($pipe, $queueName, [ $id ]);
            $pipe->lrem(":{$queueName}:queue", 1, $id);
            $pipe->lrem(":{$queueName}:consuming", 1, $id);
            // Schedule members are the JSON encoding of [queue, id].
            $pipe->zrem(':schedule', json_encode([ $queueName, $id ]));
        });
    }

    /**
     * Stores the serialized command and, unless its id is already reserved,
     * reserves the id, marks it as scheduled and adds it to the schedule.
     *
     * @param string    $queueName
     * @param string    $id
     * @param string    $serialized
     * @param \DateTime $dateTime   Moment at which the command becomes due.
     *
     * @return void
     */
    public function scheduleCommand(string $queueName, string $id, string $serialized, \DateTime $dateTime)
    {
        if ($this->storeCommandAndCheckIfIdReserved($queueName, $id, $serialized)) {
            // The id is already queued or scheduled; only the stored payload was refreshed.
            return;
        }

        $this->client->pipeline(
            function (ClientContextInterface $pipe) use ($queueName, $id, $dateTime) {
                self::cReserveCommandId($pipe, $queueName, $id);
                self::cUpdateCommandStatus($pipe, $queueName, $id, self::STATUS_SCHEDULED);
                $member = json_encode([ $queueName, $id ]);
                $pipe->zadd(':schedule', [ $member => $dateTime->getTimestamp() ]);
            }
        );
    }

    /**
     * Cancels a scheduled command by purging it entirely.
     *
     * @param string $queueName
     * @param string $id
     *
     * @return void
     */
    public function cancelScheduledCommand(string $queueName, string $id)
    {
        $this->purgeCommand($queueName, $id);
    }

    /**
     * Removes schedule entries, optionally restricted to some queues and a time window,
     * and releases the ids those entries had reserved.
     *
     * @param array|null     $queueNames Only entries of these queues are removed; null removes all.
     * @param \DateTime|null $start      Lower bound of the due time; null means unbounded.
     * @param \DateTime|null $end        Upper bound of the due time; null means unbounded.
     *
     * @return void
     */
    public function clearSchedule(array $queueNames = null, \DateTime $start = null, \DateTime $end = null)
    {
        $min = $start ? $start->getTimestamp() : '-inf';
        $max = $end ? $end->getTimestamp() : '+inf';
        $members = $this->client->zrangebyscore(':schedule', $min, $max);
        if (empty($members)) {
            return;
        }

        $this->client->pipeline(function (ClientContextInterface $pipe) use ($members, $queueNames) {
            $idsByQueue = [ ];
            foreach ($members as $json) {
                list($thisQueueName, $id) = json_decode($json, true);
                if ($queueNames !== null && !in_array($thisQueueName, $queueNames, true)) {
                    continue;
                }
                $pipe->zrem(':schedule', [ $json ]);
                $idsByQueue[ $thisQueueName ][ ] = $id;
            }
            foreach ($idsByQueue as $queueName => $ids) {
                self::cReleaseReservedCommandIds($pipe, $queueName, $ids);
            }
        });
    }

    /**
     * Takes the commands that are due out of the schedule and returns them with their payloads.
     *
     * The range query and the removal are two separate round trips.
     *
     * @param \DateTime      $now       Upper bound of the due time.
     * @param int            $limit     Maximum number of commands to take.
     * @param \DateTime|null $startTime Lower bound of the due time; null starts at timestamp 0.
     *
     * @return ReceivedScheduledCommand[]
     */
    public function receiveDueCommands(
        \DateTime $now,
        int $limit = SchedulerWorker::DEFAULT_THROTTLE,
        \DateTime $startTime = null
    ): array {
        $start = $startTime === null ? 0 : $startTime->getTimestamp();
        $due = $this->client->zrangebyscore(':schedule', $start, $now->getTimestamp(), [
            'limit' => [ 0, $limit ],
            'withscores' => true,
        ]);
        if ($due === [ ]) {
            return [ ];
        }

        // Keep queue name and id together per schedule entry. Looking the queue up by id
        // alone would mix entries up when two queues hold a command with the same id.
        $entries = [ ];
        foreach ($due as $json => $score) {
            list($queueName, $id) = json_decode($json, true);
            $entries[ ] = [ $queueName, $id, $score ];
        }

        $responses = $this->client->pipeline(
            function (ClientContextInterface $pipe) use ($due, $entries) {
                $idsByQueueName = [ ];
                // The payload lookups go first so that response N belongs to entry N.
                foreach ($entries as list($queueName, $id)) {
                    self::cRetrieveCommand($pipe, $queueName, $id);
                    $idsByQueueName[ $queueName ][ ] = $id;
                }
                $pipe->zrem(':schedule', array_keys($due));
                foreach ($idsByQueueName as $queueName => $ids) {
                    self::cReleaseReservedCommandIds($pipe, $queueName, $ids);
                }
            }
        );

        $commands = [ ];
        foreach ($entries as $index => list($queueName, $id, $score)) {
            $commands[ ] = new ReceivedScheduledCommand(
                $queueName,
                $id,
                $responses[ $index ],
                new \DateTime('@'.$score)
            );
        }

        return $commands;
    }

    /**
     * Reads up to $limit items of a Redis list, starting at index $offset.
     *
     * LRANGE takes an inclusive stop index rather than a count, so the page size
     * has to be converted before it is sent.
     *
     * @param string $key
     * @param int    $offset
     * @param int    $limit
     *
     * @return array
     */
    private function readListPage(string $key, int $offset, int $limit): array
    {
        if ($limit <= 0) {
            // A stop index of $offset - 1 could wrap to -1 and return the whole list.
            return [ ];
        }

        $stop = $offset + $limit - 1;
        if ($offset < 0 && $stop >= 0) {
            // A tail-relative page must not wrap around to the head of the list.
            $stop = -1;
        }

        return $this->client->lrange($key, $offset, $stop);
    }

    /**
     * Stores the serialized command, registers the queue, and reports whether
     * the id was already in the reserved-id set before this call.
     *
     * NOTE(review): the membership check and the later reservation are separate
     * round trips, so two concurrent callers may both see "not reserved".
     *
     * @param string $queueName
     * @param string $id
     * @param string $serialized
     *
     * @return bool
     */
    private function storeCommandAndCheckIfIdReserved(string $queueName, string $id, string $serialized): bool
    {
        // Only the first pipeline response (SISMEMBER) is of interest.
        list($isReserved) = $this->client->pipeline(
            function (ClientContextInterface $pipe) use ($queueName, $id, $serialized) {
                $pipe->sismember(":{$queueName}:queue_ids", $id);
                $pipe->hset(":{$queueName}:command_store", $id, $serialized);
                self::cAddQueue($pipe, $queueName);
            }
        );

        return (bool) $isReserved;
    }

    /**
     * Issues HGET for the serialized command and returns whatever the given object returns.
     *
     * @param mixed  $client    Predis client or pipeline context.
     * @param string $queueName
     * @param string $id
     *
     * @return mixed
     */
    private static function cRetrieveCommand($client, string $queueName, string $id)
    {
        return $client->hget(":{$queueName}:command_store", $id);
    }

    /**
     * Adds an id to the reserved-id set of a queue.
     *
     * @param mixed  $client    Predis client or pipeline context.
     * @param string $queueName
     * @param string $id
     *
     * @return void
     */
    private static function cReserveCommandId($client, string $queueName, string $id)
    {
        $client->sadd(":{$queueName}:queue_ids", [ $id ]);
    }

    /**
     * Records a final status, releases the id and drops it from the consuming list.
     *
     * @param mixed  $client    Predis client or pipeline context.
     * @param string $queueName
     * @param string $id
     * @param string $status
     *
     * @return void
     */
    private static function cEndCommand($client, string $queueName, string $id, string $status)
    {
        self::cUpdateCommandStatus($client, $queueName, $id, $status);
        // Releasing the id already removes it from queue_ids; no second SREM is needed.
        self::cReleaseReservedCommandIds($client, $queueName, [ $id ]);
        $client->lrem(":{$queueName}:consuming", 1, $id);
    }

    /**
     * Removes ids from the reserved-id set of a queue.
     *
     * @param mixed  $client    Predis client or pipeline context.
     * @param string $queueName
     * @param array  $ids
     *
     * @return void
     */
    private static function cReleaseReservedCommandIds($client, string $queueName, array $ids)
    {
        $client->srem(":{$queueName}:queue_ids", $ids);
    }

    /**
     * Writes the status of a command.
     *
     * @param mixed  $client    Predis client or pipeline context.
     * @param string $queueName
     * @param string $id
     * @param string $status
     *
     * @return void
     */
    private static function cUpdateCommandStatus($client, string $queueName, string $id, string $status)
    {
        $client->hset(":{$queueName}:command_status", $id, $status);
    }

    /**
     * Adds a queue name to the set of known queues.
     *
     * @param mixed  $client    Predis client or pipeline context.
     * @param string $queueName
     *
     * @return void
     */
    private static function cAddQueue($client, string $queueName)
    {
        $client->sadd(':queues', [ $queueName ]);
    }

    /**
     * Deletes the queue, consuming, status and reserved-id keys of a queue.
     *
     * NOTE(review): the command_store hash is not deleted here; confirm whether
     * that is intentional.
     *
     * @param mixed  $client    Predis client or pipeline context.
     * @param string $queueName
     *
     * @return void
     */
    private static function cEmptyQueue($client, string $queueName)
    {
        $client->del([
            ":{$queueName}:queue",
            ":{$queueName}:consuming",
            ":{$queueName}:command_status",
            ":{$queueName}:queue_ids"
        ]);
    }

    /**
     * Empties a queue and removes its name from the set of known queues.
     *
     * @param mixed  $client    Predis client or pipeline context.
     * @param string $queueName
     *
     * @return void
     */
    private static function cDeleteQueue($client, string $queueName)
    {
        self::cEmptyQueue($client, $queueName);
        $client->srem(':queues', [ $queueName ]);
    }
}
300