Completed
Push — master ( a01aa4...0d0e7a )
by Mike
02:47
created

PredisAdapter::setCommandCompleted()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 6
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 6
ccs 0
cts 5
cp 0
rs 9.4285
c 0
b 0
f 0
cc 1
eloc 3
nc 1
nop 2
crap 2
1
<?php
2
3
namespace MGDigital\BusQue\Predis;
4
5
use MGDigital\BusQue\Exception\CommandNotFoundException;
6
use MGDigital\BusQue\Exception\TimeoutException;
7
use MGDigital\BusQue\QueueAdapterInterface;
8
use MGDigital\BusQue\ReceivedCommand;
9
use MGDigital\BusQue\ReceivedScheduledCommand;
10
use MGDigital\BusQue\SchedulerAdapterInterface;
11
use MGDigital\BusQue\SchedulerWorker;
12
use Predis\Client;
13
use Predis\ClientContextInterface;
14
use Predis\Connection\ConnectionException;
15
16
class PredisAdapter implements QueueAdapterInterface, SchedulerAdapterInterface
17
{
18
19
    private $client;
20
21
    public function __construct(Client $client)
22
    {
23
        $this->client = $client;
24
    }
25
26
    public function queueCommand(string $queueName, string $id, string $serialized)
27
    {
28
        if (!$this->storeCommandAndCheckIfIdReserved($queueName, $id, $serialized)) {
29
            $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) {
30
                self::cReserveCommandId($client, $queueName, $id);
31
                self::cUpdateCommandStatus($client, $queueName, $id, self::STATUS_QUEUED);
32
                $client->lpush(":{$queueName}:queue", [ $id ]);
33
            });
34
        }
35
    }
36
37
    public function awaitCommand(string $queueName, int $timeout = null): ReceivedCommand
38
    {
39
        $stopwatchStart = time();
40
        $this->client->ping();
41
        try {
42
            $id = $this->client->brpoplpush(":{$queueName}:queue", ":{$queueName}:consuming", $timeout ?? 0);
43
        } catch (ConnectionException $e) {
44
            $id = null;
45
        }
46
        if (!$id) {
47
            if ($timeout !== null) {
48
                $timeout = time() - $stopwatchStart - $timeout;
49
                if ($timeout <= 0) {
50
                    throw new TimeoutException;
51
                }
52
            }
53
            return $this->awaitCommand($queueName, $timeout);
54
        }
55
        /* @var $id string */
56
        list(, $serialized) = $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) {
57
            self::cUpdateCommandStatus($client, $queueName, $id, self::STATUS_IN_PROGRESS);
58
            self::cRetrieveCommand($client, $queueName, $id);
59
            self::cReleaseReservedCommandIds($client, $queueName, [ $id ]);
60
        });
61
        return new ReceivedCommand($queueName, $id, $serialized);
62
    }
63
64
    public function getCommandStatus(string $queueName, string $id): string
65
    {
66
        return $this->client->hget(":{$queueName}:command_status", $id) ?? self::STATUS_NOT_FOUND;
67
    }
68
69
    public function setCommandCompleted(string $queueName, string $id)
70
    {
71
        $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) {
72
            self::cEndCommand($client, $queueName, $id, self::STATUS_COMPLETED);
73
        });
74
    }
75
76
    public function setCommandFailed(string $queueName, string $id)
77
    {
78
        $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) {
79
            self::cEndCommand($client, $queueName, $id, self::STATUS_FAILED);
80
        });
81
    }
82
83
    public function putQueue(string $queueName)
84
    {
85
        $this->client->pipeline(function (ClientContextInterface $client) use ($queueName) {
86
            self::cAddQueue($client, $queueName);
87
        });
88
    }
89
90
    public function getQueueNames(): array
91
    {
92
        return $this->client->smembers(':queues');
93
    }
94
95
    public function getQueuedCount(string $queueName): int
96
    {
97
        return $this->client->llen(":{$queueName}:queue");
98
    }
99
100
    public function getQueuedIds(string $queueName, int $offset = 0, int $limit = 10): array
101
    {
102
        return $this->client->lrange(":{$queueName}:queue", $offset, $limit);
103
    }
104
105
    public function getConsumingCount(string $queueName): int
106
    {
107
        return $this->client->llen(":{$queueName}:consuming");
108
    }
109
110
    public function getConsumingIds(string $queueName, int $offset = 0, int $limit = 10): array
111
    {
112
        return $this->client->lrange(":{$queueName}:consuming", $offset, $limit);
113
    }
114
115
    public function readCommand(string $queueName, string $id): string
116
    {
117
        $serialized = self::cRetrieveCommand($this->client, $queueName, $id);
118
        if ($serialized === null) {
119
            throw new CommandNotFoundException();
120
        }
121
        return $serialized;
122
    }
123
124
    public function clearQueue(string $queueName)
125
    {
126
        self::cEmptyQueue($this->client, $queueName);
127
    }
128
129
    public function deleteQueue(string $queueName)
130
    {
131
        $this->client->pipeline(function (ClientContextInterface $client) use ($queueName) {
132
            self::cDeleteQueue($client, $queueName);
133
        });
134
        $this->clearSchedule([$queueName]);
135
    }
136
137
    public function purgeCommand(string $queueName, string $id)
138
    {
139
        $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) {
140
            $client->hdel(":{$queueName}:command_store", [ $id ]);
141
            $client->hdel(":{$queueName}:command_status", [ $id ]);
142
            self::cReleaseReservedCommandIds($client, $queueName, [ $id ]);
143
            $json = json_encode([ $queueName, $id ]);
144
            $client->lrem(":{$queueName}:queue", 1, $id);
145
            $client->lrem(":{$queueName}:consuming", 1, $id);
146
            $client->zrem(':schedule', $json);
147
        });
148
    }
149
150
    public function scheduleCommand(string $queueName, string $id, string $serialized, \DateTime $dateTime)
151
    {
152
        if (!$this->storeCommandAndCheckIfIdReserved($queueName, $id, $serialized)) {
153
            $this->client->pipeline(
154
                function (ClientContextInterface $client) use ($queueName, $id, $dateTime) {
155
                    self::cReserveCommandId($client, $queueName, $id);
156
                    self::cUpdateCommandStatus($client, $queueName, $id, self::STATUS_SCHEDULED);
157
                    $json = json_encode([ $queueName, $id ]);
158
                    $client->zadd(':schedule', [ $json => $dateTime->getTimestamp() ]);
159
                }
160
            );
161
        }
162
    }
163
164
    public function cancelScheduledCommand(string $queueName, string $id)
165
    {
166
        $this->purgeCommand($queueName, $id);
167
    }
168
169
    public function clearSchedule(array $queueNames = null, \DateTime $start = null, \DateTime $end = null)
170
    {
171
        $lowScore = $start ? $start->getTimestamp() : '-inf';
172
        $highScore = $end ? $end->getTimestamp() : '+inf';
173
        if ($queueNames === null) {
174
            $queueNames = $this->getQueueNames();
175
        }
176
        foreach ($queueNames as $queueName) {
177
            $this->clearScheduleForQueue($queueName, $lowScore, $highScore);
178
        }
179
    }
180
181
    private function clearScheduleForQueue(string $queueName, $lowScore, $highScore)
182
    {
183
        $result = $this->client->zrangebyscore(':schedule', $lowScore, $highScore);
184
        if (!empty($result)) {
185
            $this->client->pipeline(function (ClientContextInterface $client) use ($result, $queueName) {
186
                $idsToRelease = [ ];
187
                foreach ($result as $json) {
188
                    list($thisQueueName, $id) = json_decode($json, true);
189
                    if ($thisQueueName === $queueName) {
190
                        $client->zrem(':schedule', [ $json ]);
191
                        $idsToRelease[ ] = $id;
192
                    }
193
                }
194
                if ($idsToRelease !== [ ]) {
195
                    self::cReleaseReservedCommandIds($client, $queueName, $idsToRelease);
196
                }
197
            });
198
        }
199
    }
200
201
    public function receiveDueCommands(
202
        \DateTime $now,
203
        int $limit = SchedulerWorker::DEFAULT_THROTTLE,
204
        \DateTime $startTime = null
205
    ): array {
206
        if ($startTime === null) {
207
            $start = 0;
208
        } else {
209
            $start = $startTime->getTimestamp();
210
        }
211
        $result = $this->client->zrangebyscore(':schedule', $start, $now->getTimestamp(), [
212
            'limit' => [ 0, $limit ],
213
            'withscores' => true,
214
        ]);
215
        $commands = [ ];
216
        if ($result !== [ ]) {
217
            $queueNamesById = $idsByJson = [ ];
218
            $pipelineReturn = $this->client->pipeline(
219
                function (ClientContextInterface $client) use ($result, &$queueNamesById, &$idsByJson) {
220
                    $idsByQueueName = [ ];
221
                    foreach ($result as $json => $score) {
222
                        list($queueName, $id) = json_decode($json, true);
223
                        self::cRetrieveCommand($client, $queueName, $id);
224
                        $idsByQueueName[ $queueName ][ ] = $id;
225
                        $queueNamesById[ $id ] = $queueName;
226
                        $idsByJson[ $json ] = $id;
227
                    }
228
                    $client->zrem(':schedule', array_keys($result));
229
                    foreach ($idsByQueueName as $queueName => $ids) {
230
                        self::cReleaseReservedCommandIds($client, $queueName, $ids);
231
                    }
232
                }
233
            );
234
            foreach (array_keys($result) as $index => $json) {
235
                $id = $idsByJson[ $json ];
236
                $commands[ ] = new ReceivedScheduledCommand(
237
                    $queueNamesById[ $id ],
238
                    $id,
239
                    $pipelineReturn[ $index ],
240
                    new \DateTime('@'.$result[ $json ])
241
                );
242
            }
243
        }
244
        return $commands;
245
    }
246
247
    private function storeCommandAndCheckIfIdReserved(string $queueName, string $id, string $serialized): bool
248
    {
249
        list ($isReserved) = $this->client->pipeline(
250
            function (ClientContextInterface $client) use ($queueName, $id, $serialized) {
251
                $client->sismember(":{$queueName}:queue_ids", $id);
252
                $client->hset(":{$queueName}:command_store", $id, $serialized);
253
                self::cAddQueue($client, $queueName);
254
            }
255
        );
256
        return $isReserved;
257
    }
258
259
    private static function cRetrieveCommand($client, string $queueName, string $id)
260
    {
261
        return $client->hget(":{$queueName}:command_store", $id);
262
    }
263
264
    private static function cReserveCommandId($client, string $queueName, string $id)
265
    {
266
        $client->sadd(":{$queueName}:queue_ids", [ $id ]);
267
    }
268
269
    private static function cEndCommand($client, string $queueName, string $id, string $status)
270
    {
271
        self::cUpdateCommandStatus($client, $queueName, $id, $status);
272
        self::cReleaseReservedCommandIds($client, $queueName, [ $id ]);
273
        $client->srem(":{$queueName}:queue_ids", [ $id ]);
274
        $client->lrem(":{$queueName}:consuming", 1, $id);
275
    }
276
277
    private static function cReleaseReservedCommandIds($client, string $queueName, array $ids)
278
    {
279
        $client->srem(":{$queueName}:queue_ids", $ids);
280
    }
281
282
    private static function cUpdateCommandStatus($client, string $queueName, string $id, string $status)
283
    {
284
        $client->hset(":{$queueName}:command_status", $id, $status);
285
    }
286
287
    private static function cAddQueue($client, string $queueName)
288
    {
289
        $client->sadd(':queues', [ $queueName ]);
290
    }
291
292
    private static function cEmptyQueue($client, string $queueName)
293
    {
294
        $client->del([
295
            ":{$queueName}:queue",
296
            ":{$queueName}:consuming",
297
            ":{$queueName}:command_status",
298
            ":{$queueName}:queue_ids"
299
        ]);
300
    }
301
302
    private static function cDeleteQueue($client, string $queueName)
303
    {
304
        self::cEmptyQueue($client, $queueName);
305
        $client->srem(':queues', [ $queueName ]);
306
    }
307
}
308