Passed
Branch master (a01aa4)
by Mike
03:32
created

PredisAdapter::clearSchedule()   D

Complexity

Conditions 9
Paths 24

Size

Total Lines 26
Code Lines 17

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 90

Importance

Changes 0
Metric Value
dl 0
loc 26
ccs 0
cts 25
cp 0
rs 4.909
c 0
b 0
f 0
cc 9
eloc 17
nc 24
nop 3
crap 90
1
<?php
2
3
namespace MGDigital\BusQue\Predis;
4
5
use MGDigital\BusQue\Exception\CommandNotFoundException;
6
use MGDigital\BusQue\Exception\TimeoutException;
7
use MGDigital\BusQue\QueueAdapterInterface;
8
use MGDigital\BusQue\ReceivedCommand;
9
use MGDigital\BusQue\ReceivedScheduledCommand;
10
use MGDigital\BusQue\SchedulerAdapterInterface;
11
use MGDigital\BusQue\SchedulerWorker;
12
use Predis\Client;
13
use Predis\ClientContextInterface;
14
use Predis\Connection\ConnectionException;
15
16
class PredisAdapter implements QueueAdapterInterface, SchedulerAdapterInterface
17
{
18
19
    private $client;
20
21
    /**
     * @param Client $client Predis client through which this adapter issues all of its Redis commands.
     */
    public function __construct(Client $client)
    {
        $this->client = $client;
    }
25
26
    /**
     * Stores a command payload and, unless its id is already reserved, enqueues the id.
     *
     * The payload is written to the command store in either case (that happens inside
     * storeCommandAndCheckIfIdReserved()). Only the reservation, the status update
     * and the LPUSH onto the queue list are skipped when the id is already reserved.
     *
     * @param string $queueName  Name of the queue; used to build the Redis keys.
     * @param string $id         Command id.
     * @param string $serialized Serialized command payload.
     */
    public function queueCommand(string $queueName, string $id, string $serialized)
    {
        $alreadyReserved = $this->storeCommandAndCheckIfIdReserved($queueName, $id, $serialized);
        if ($alreadyReserved) {
            return;
        }

        $enqueue = function (ClientContextInterface $pipe) use ($queueName, $id) {
            self::cReserveCommandId($pipe, $queueName, $id);
            self::cUpdateCommandStatus($pipe, $queueName, $id, self::STATUS_QUEUED);
            $pipe->lpush(":{$queueName}:queue", [ $id ]);
        };

        $this->client->pipeline($enqueue);
    }
36
37
    /**
     * Waits for the next command id on a queue and returns the received command.
     *
     * The id is moved from the ":{queue}:queue" list to the ":{queue}:consuming" list
     * with BRPOPLPUSH. Once an id arrives, its status is set to in-progress, its payload
     * is read from the command store and its id reservation is released.
     *
     * @param string   $queueName Name of the queue to wait on.
     * @param int|null $timeout   Maximum number of seconds to wait. Null is sent to Redis
     *                            as 0, which BRPOPLPUSH treats as "block indefinitely".
     *
     * @return ReceivedCommand
     *
     * @throws TimeoutException When $timeout is given and no command arrived before it elapsed.
     */
    public function awaitCommand(string $queueName, int $timeout = null): ReceivedCommand
    {
        // A fixed deadline keeps the total wait bounded by $timeout across retries.
        $deadline = $timeout === null ? null : time() + $timeout;
        $wait = $timeout ?? 0;

        while (true) {
            $this->client->ping();
            try {
                $id = $this->client->brpoplpush(":{$queueName}:queue", ":{$queueName}:consuming", $wait);
            } catch (ConnectionException $e) {
                // Treated as "nothing received yet" and retried
                // (presumably a socket read timeout; confirm against the connection settings).
                $id = null;
            }

            // Strict null check: a legitimate id such as "0" is falsy and must not be discarded.
            if ($id !== null) {
                break;
            }

            if ($deadline !== null) {
                $wait = $deadline - time();
                // Never retry with 0 or less: Redis would interpret 0 as "block forever".
                if ($wait <= 0) {
                    throw new TimeoutException;
                }
            }
        }

        /* @var $id string */
        // Pipeline replies arrive in call order, so index 1 is the HGET of the payload.
        list(, $serialized) = $this->client->pipeline(function (ClientContextInterface $client) use ($queueName, $id) {
            self::cUpdateCommandStatus($client, $queueName, $id, self::STATUS_IN_PROGRESS);
            self::cRetrieveCommand($client, $queueName, $id);
            self::cReleaseReservedCommandIds($client, $queueName, [ $id ]);
        });

        return new ReceivedCommand($queueName, $id, $serialized);
    }
63
64
    /**
     * Looks up the status recorded for a command id.
     *
     * @param string $queueName Name of the queue the command belongs to.
     * @param string $id        Command id.
     *
     * @return string The stored status, or self::STATUS_NOT_FOUND when the status hash has no entry for the id.
     */
    public function getCommandStatus(string $queueName, string $id): string
    {
        $status = $this->client->hget(":{$queueName}:command_status", $id);

        return $status !== null ? $status : self::STATUS_NOT_FOUND;
    }
68
69
    /**
     * Finishes a command with the completed status.
     *
     * @param string $queueName Name of the queue the command belongs to.
     * @param string $id        Command id.
     */
    public function setCommandCompleted(string $queueName, string $id)
    {
        $finish = function (ClientContextInterface $pipe) use ($queueName, $id) {
            self::cEndCommand($pipe, $queueName, $id, self::STATUS_COMPLETED);
        };

        $this->client->pipeline($finish);
    }
75
76
    /**
     * Finishes a command with the failed status.
     *
     * @param string $queueName Name of the queue the command belongs to.
     * @param string $id        Command id.
     */
    public function setCommandFailed(string $queueName, string $id)
    {
        $finish = function (ClientContextInterface $pipe) use ($queueName, $id) {
            self::cEndCommand($pipe, $queueName, $id, self::STATUS_FAILED);
        };

        $this->client->pipeline($finish);
    }
82
83
    /**
     * Registers a queue name in the ":queues" set.
     *
     * @param string $queueName Name of the queue to register.
     */
    public function putQueue(string $queueName)
    {
        $register = function (ClientContextInterface $pipe) use ($queueName) {
            self::cAddQueue($pipe, $queueName);
        };

        $this->client->pipeline($register);
    }
89
90
    /**
     * Returns the members of the ":queues" set.
     *
     * @return array Registered queue names.
     */
    public function getQueueNames(): array
    {
        $names = $this->client->smembers(':queues');

        return $names;
    }
94
95
    /**
     * Returns the length of the queue list for the given queue.
     *
     * @param string $queueName Name of the queue.
     *
     * @return int Number of ids currently in ":{queue}:queue".
     */
    public function getQueuedCount(string $queueName): int
    {
        $key = ":{$queueName}:queue";

        return $this->client->llen($key);
    }
99
100
    /**
     * Returns a page of ids from the queue list.
     *
     * LRANGE takes inclusive start/stop indexes, not an offset and a count, so the
     * stop index is derived from $offset and $limit.
     *
     * @param string $queueName Name of the queue.
     * @param int    $offset    Index of the first id to return; negative values count from the end of the list.
     * @param int    $limit     Maximum number of ids to return.
     *
     * @return array At most $limit ids; empty when $limit is not positive.
     */
    public function getQueuedIds(string $queueName, int $offset = 0, int $limit = 10): array
    {
        if ($limit <= 0) {
            // A stop index of $offset - 1 could become -1, which LRANGE reads as "to the end".
            return [ ];
        }

        $stop = $offset + $limit - 1;
        if ($offset < 0 && $stop >= 0) {
            // The page starting from the tail runs past the end of the list: clamp to the last element.
            $stop = -1;
        }

        return $this->client->lrange(":{$queueName}:queue", $offset, $stop);
    }
104
105
    /**
     * Returns the length of the consuming list for the given queue.
     *
     * @param string $queueName Name of the queue.
     *
     * @return int Number of ids currently in ":{queue}:consuming".
     */
    public function getConsumingCount(string $queueName): int
    {
        $key = ":{$queueName}:consuming";

        return $this->client->llen($key);
    }
109
110
    /**
     * Returns a page of ids from the consuming list.
     *
     * LRANGE takes inclusive start/stop indexes, not an offset and a count, so the
     * stop index is derived from $offset and $limit.
     *
     * @param string $queueName Name of the queue.
     * @param int    $offset    Index of the first id to return; negative values count from the end of the list.
     * @param int    $limit     Maximum number of ids to return.
     *
     * @return array At most $limit ids; empty when $limit is not positive.
     */
    public function getConsumingIds(string $queueName, int $offset = 0, int $limit = 10): array
    {
        if ($limit <= 0) {
            // A stop index of $offset - 1 could become -1, which LRANGE reads as "to the end".
            return [ ];
        }

        $stop = $offset + $limit - 1;
        if ($offset < 0 && $stop >= 0) {
            // The page starting from the tail runs past the end of the list: clamp to the last element.
            $stop = -1;
        }

        return $this->client->lrange(":{$queueName}:consuming", $offset, $stop);
    }
114
115
    /**
     * Reads the serialized payload stored for a command id.
     *
     * @param string $queueName Name of the queue the command belongs to.
     * @param string $id        Command id.
     *
     * @return string The serialized command.
     *
     * @throws CommandNotFoundException When the command store holds no payload for the id.
     */
    public function readCommand(string $queueName, string $id): string
    {
        $payload = self::cRetrieveCommand($this->client, $queueName, $id);
        if ($payload !== null) {
            return $payload;
        }

        throw new CommandNotFoundException();
    }
123
124
    /**
     * Empties a queue by delegating to cEmptyQueue() on the client directly (no pipeline).
     *
     * @param string $queueName Name of the queue to empty.
     */
    public function clearQueue(string $queueName)
    {
        $redis = $this->client;

        self::cEmptyQueue($redis, $queueName);
    }
128
129
    /**
     * Deletes a queue and then clears the schedule entries that belong to it.
     *
     * @param string $queueName Name of the queue to delete.
     */
    public function deleteQueue(string $queueName)
    {
        $dropQueue = function (ClientContextInterface $pipe) use ($queueName) {
            self::cDeleteQueue($pipe, $queueName);
        };
        $this->client->pipeline($dropQueue);

        $onlyThisQueue = [ $queueName ];
        $this->clearSchedule($onlyThisQueue);
    }
136
137
    /**
     * Removes every trace of a command: payload, status, id reservation, its entries
     * in the queue and consuming lists, and its schedule member.
     *
     * @param string $queueName Name of the queue the command belongs to.
     * @param string $id        Command id.
     */
    public function purgeCommand(string $queueName, string $id)
    {
        // Schedule members are the JSON encoding of the [queueName, id] pair.
        $scheduleMember = json_encode([ $queueName, $id ]);

        $purge = function (ClientContextInterface $pipe) use ($queueName, $id, $scheduleMember) {
            $pipe->hdel(":{$queueName}:command_store", [ $id ]);
            $pipe->hdel(":{$queueName}:command_status", [ $id ]);
            self::cReleaseReservedCommandIds($pipe, $queueName, [ $id ]);
            $pipe->lrem(":{$queueName}:queue", 1, $id);
            $pipe->lrem(":{$queueName}:consuming", 1, $id);
            $pipe->zrem(':schedule', $scheduleMember);
        };

        $this->client->pipeline($purge);
    }
149
150
    /**
     * Stores a command payload and, unless its id is already reserved, adds it to the schedule.
     *
     * The schedule is the ":schedule" sorted set; the member is the JSON encoding of
     * [queueName, id] and the score is the timestamp of $dateTime.
     *
     * @param string    $queueName  Name of the queue the command should later be queued on.
     * @param string    $id         Command id.
     * @param string    $serialized Serialized command payload.
     * @param \DateTime $dateTime   When the command becomes due.
     */
    public function scheduleCommand(string $queueName, string $id, string $serialized, \DateTime $dateTime)
    {
        $alreadyReserved = $this->storeCommandAndCheckIfIdReserved($queueName, $id, $serialized);
        if ($alreadyReserved) {
            return;
        }

        $schedule = function (ClientContextInterface $pipe) use ($queueName, $id, $dateTime) {
            self::cReserveCommandId($pipe, $queueName, $id);
            self::cUpdateCommandStatus($pipe, $queueName, $id, self::STATUS_SCHEDULED);
            $member = json_encode([ $queueName, $id ]);
            $pipe->zadd(':schedule', [ $member => $dateTime->getTimestamp() ]);
        };

        $this->client->pipeline($schedule);
    }
163
164
    /**
     * Cancels a scheduled command; this is a straight delegation to purgeCommand().
     *
     * @param string $queueName Name of the queue the command belongs to.
     * @param string $id        Command id.
     */
    public function cancelScheduledCommand(string $queueName, string $id)
    {
        $this->purgeCommand($queueName, $id);
    }
168
169
    /**
     * Removes scheduled commands of the given queues from the ":schedule" sorted set
     * and releases their id reservations.
     *
     * The schedule is read once and scanned once, and all removals are sent in a single
     * pipeline, instead of re-reading and re-scanning the schedule for every queue.
     *
     * @param array|null     $queueNames Queues whose entries are cleared; null means every name returned by getQueueNames().
     * @param \DateTime|null $start      Lower score bound (inclusive timestamp); null means '-inf'.
     * @param \DateTime|null $end        Upper score bound (inclusive timestamp); null means '+inf'.
     */
    public function clearSchedule(array $queueNames = null, \DateTime $start = null, \DateTime $end = null)
    {
        $lowScore = $start ? $start->getTimestamp() : '-inf';
        $highScore = $end ? $end->getTimestamp() : '+inf';
        if ($queueNames === null) {
            $queueNames = $this->getQueueNames();
        }
        if ($queueNames === [ ]) {
            return;
        }

        $members = $this->client->zrangebyscore(':schedule', $lowScore, $highScore);
        if (empty($members)) {
            return;
        }

        // Lookup table so each schedule member is matched against the requested queues in O(1).
        $wanted = array_fill_keys($queueNames, true);
        $membersToRemove = [ ];
        $idsByQueueName = [ ];
        foreach ($members as $json) {
            $decoded = json_decode($json, true);
            // Skip members that are not a [queueName, id] pair rather than raising notices.
            if (!is_array($decoded) || !isset($decoded[ 0 ], $decoded[ 1 ])) {
                continue;
            }
            list($thisQueueName, $id) = $decoded;
            if (!is_string($thisQueueName) || !isset($wanted[ $thisQueueName ])) {
                continue;
            }
            $membersToRemove[ ] = $json;
            $idsByQueueName[ $thisQueueName ][ ] = $id;
        }
        if ($membersToRemove === [ ]) {
            return;
        }

        $this->client->pipeline(
            function (ClientContextInterface $client) use ($membersToRemove, $idsByQueueName) {
                $client->zrem(':schedule', $membersToRemove);
                foreach ($idsByQueueName as $queueName => $ids) {
                    // PHP turns numeric-looking array keys into ints; cast back for the string-typed helper.
                    self::cReleaseReservedCommandIds($client, (string) $queueName, $ids);
                }
            }
        );
    }
195
196
    /**
     * Takes due commands off the schedule and returns them with their payloads.
     *
     * Reads up to $limit members of ":schedule" whose score lies between the start
     * timestamp and $now, then in one pipeline fetches each payload, removes the members
     * from the schedule and releases the id reservations per queue.
     *
     * Each entry keeps its own (queue, id) pair, so the same id being due in two
     * different queues is attributed to the right queue.
     *
     * @param \DateTime      $now       Upper score bound (timestamp, inclusive).
     * @param int            $limit     Maximum number of schedule members to read.
     * @param \DateTime|null $startTime Lower score bound; null means 0.
     *
     * @return array List of ReceivedScheduledCommand, in the order returned by ZRANGEBYSCORE.
     */
    public function receiveDueCommands(
        \DateTime $now,
        int $limit = SchedulerWorker::DEFAULT_THROTTLE,
        \DateTime $startTime = null
    ): array {
        $start = $startTime === null ? 0 : $startTime->getTimestamp();
        $result = $this->client->zrangebyscore(':schedule', $start, $now->getTimestamp(), [
            'limit' => [ 0, $limit ],
            'withscores' => true,
        ]);
        if (empty($result)) {
            return [ ];
        }

        // One [queueName, id, score] triple per schedule member, in result order.
        $due = [ ];
        foreach ($result as $json => $score) {
            list($queueName, $id) = json_decode($json, true);
            $due[ ] = [ $queueName, $id, $score ];
        }

        $pipelineReturn = $this->client->pipeline(
            function (ClientContextInterface $client) use ($due, $result) {
                $idsByQueueName = [ ];
                // The HGETs are issued first so that reply $index belongs to $due[$index].
                foreach ($due as list($queueName, $id)) {
                    self::cRetrieveCommand($client, $queueName, $id);
                    $idsByQueueName[ $queueName ][ ] = $id;
                }
                $client->zrem(':schedule', array_keys($result));
                foreach ($idsByQueueName as $queueName => $ids) {
                    // PHP turns numeric-looking array keys into ints; cast back for the string-typed helper.
                    self::cReleaseReservedCommandIds($client, (string) $queueName, $ids);
                }
            }
        );

        $commands = [ ];
        foreach ($due as $index => list($queueName, $id, $score)) {
            $commands[ ] = new ReceivedScheduledCommand(
                $queueName,
                $id,
                $pipelineReturn[ $index ],
                new \DateTime('@'.$score)
            );
        }

        return $commands;
    }
241
242
    /**
     * Writes the payload to the command store, registers the queue, and reports whether
     * the id was a member of the queue's reserved-id set.
     *
     * All three commands go out in one pipeline; the membership check is issued first,
     * so its reply is the first pipeline response.
     *
     * @param string $queueName  Name of the queue.
     * @param string $id         Command id.
     * @param string $serialized Serialized command payload.
     *
     * @return bool Reply of the SISMEMBER check on ":{queue}:queue_ids".
     */
    private function storeCommandAndCheckIfIdReserved(string $queueName, string $id, string $serialized): bool
    {
        $store = function (ClientContextInterface $pipe) use ($queueName, $id, $serialized) {
            $pipe->sismember(":{$queueName}:queue_ids", $id);
            $pipe->hset(":{$queueName}:command_store", $id, $serialized);
            self::cAddQueue($pipe, $queueName);
        };

        $replies = $this->client->pipeline($store);

        return $replies[ 0 ];
    }
253
254
    /**
     * Issues an HGET for a command's payload in the ":{queue}:command_store" hash.
     *
     * @param mixed  $client    Object the HGET is called on; callers pass either the Predis client or a pipeline context.
     * @param string $queueName Name of the queue.
     * @param string $id        Command id.
     *
     * @return mixed Whatever the client's hget() returns.
     */
    private static function cRetrieveCommand($client, string $queueName, string $id)
    {
        $storeKey = ":{$queueName}:command_store";

        return $client->hget($storeKey, $id);
    }
258
259
    /**
     * Adds a command id to the ":{queue}:queue_ids" set, marking it as reserved.
     *
     * @param mixed  $client    Object the SADD is called on.
     * @param string $queueName Name of the queue.
     * @param string $id        Command id to reserve.
     */
    private static function cReserveCommandId($client, string $queueName, string $id)
    {
        $reservedIdsKey = ":{$queueName}:queue_ids";

        $client->sadd($reservedIdsKey, [ $id ]);
    }
263
264
    /**
     * Records a command's final status, releases its id reservation and removes it
     * from the consuming list.
     *
     * @param mixed  $client    Object the Redis commands are called on.
     * @param string $queueName Name of the queue.
     * @param string $id        Command id.
     * @param string $status    Final status to store for the command.
     */
    private static function cEndCommand($client, string $queueName, string $id, string $status)
    {
        self::cUpdateCommandStatus($client, $queueName, $id, $status);
        // cReleaseReservedCommandIds() already issues the SREM on ":{queue}:queue_ids",
        // so no second, identical SREM is sent here.
        self::cReleaseReservedCommandIds($client, $queueName, [ $id ]);
        $client->lrem(":{$queueName}:consuming", 1, $id);
    }
271
272
    /**
     * Removes the given ids from the ":{queue}:queue_ids" set.
     *
     * @param mixed  $client    Object the SREM is called on.
     * @param string $queueName Name of the queue.
     * @param array  $ids       Command ids to release.
     */
    private static function cReleaseReservedCommandIds($client, string $queueName, array $ids)
    {
        $reservedIdsKey = ":{$queueName}:queue_ids";

        $client->srem($reservedIdsKey, $ids);
    }
276
277
    /**
     * Sets a command's entry in the ":{queue}:command_status" hash.
     *
     * @param mixed  $client    Object the HSET is called on.
     * @param string $queueName Name of the queue.
     * @param string $id        Command id.
     * @param string $status    Status value to store.
     */
    private static function cUpdateCommandStatus($client, string $queueName, string $id, string $status)
    {
        $statusKey = ":{$queueName}:command_status";

        $client->hset($statusKey, $id, $status);
    }
281
282
    /**
     * Adds a queue name to the ":queues" set.
     *
     * @param mixed  $client    Object the SADD is called on.
     * @param string $queueName Name of the queue to register.
     */
    private static function cAddQueue($client, string $queueName)
    {
        $members = [ $queueName ];

        $client->sadd(':queues', $members);
    }
286
287
    /**
     * Deletes a queue's queue list, consuming list, status hash and reserved-id set
     * with a single DEL.
     *
     * The ":{queue}:command_store" hash is not among the deleted keys.
     *
     * @param mixed  $client    Object the DEL is called on.
     * @param string $queueName Name of the queue to empty.
     */
    private static function cEmptyQueue($client, string $queueName)
    {
        $keys = [
            ":{$queueName}:queue",
            ":{$queueName}:consuming",
            ":{$queueName}:command_status",
            ":{$queueName}:queue_ids"
        ];

        $client->del($keys);
    }
296
297
    /**
     * Empties a queue via cEmptyQueue() and removes its name from the ":queues" set.
     *
     * @param mixed  $client    Object the Redis commands are called on.
     * @param string $queueName Name of the queue to delete.
     */
    private static function cDeleteQueue($client, string $queueName)
    {
        self::cEmptyQueue($client, $queueName);

        $members = [ $queueName ];
        $client->srem(':queues', $members);
    }
302
}
303