Passed
Push — master ( d4f709...8319cd )
by Mike
07:40 queued 01:16
created

RedisDriver::getConsumingIds()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
dl 0
loc 4
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
eloc 2
nc 1
nop 1
crap 1
1
<?php
2
3
namespace MGDigital\BusQue\Redis;
4
5
use MGDigital\BusQue\Exception\CommandNotFoundException;
6
use MGDigital\BusQue\Exception\RedisException;
7
use MGDigital\BusQue\Exception\TimeoutException;
8
use MGDigital\BusQue\QueueDriverInterface;
9
use MGDigital\BusQue\ReceivedCommand;
10
use MGDigital\BusQue\ReceivedScheduledCommand;
11
use MGDigital\BusQue\SchedulerDriverInterface;
12
use MGDigital\BusQue\SchedulerWorker;
13
14
final class RedisDriver implements QueueDriverInterface, SchedulerDriverInterface
15
{
16
17
    const LUA_PATH = __DIR__ . '/../../lua';
18
19
    private $adapter;
20
    private $namespace;
21
22 19
    public function __construct(RedisAdapterInterface $adapter, string $namespace = '')
23
    {
24 19
        $this->adapter = $adapter;
25 19
        if (!preg_match('/^[a-z0-9_\-]*$/i', $namespace)) {
26
            throw new \InvalidArgumentException('Invalid namespace.');
27
        }
28 19
        $this->namespace = $namespace;
29 19
    }
30
31 1
    public function queueCommand(string $queueName, string $commandId, string $serialized)
32
    {
33 1
        $this->evalScript('queue_message', [$this->namespace, $queueName, $commandId, $serialized]);
34 1
    }
35
36 1
    public function awaitCommand(string $queueName, int $timeout = null): ReceivedCommand
37
    {
38 1
        $stopwatchStart = time();
39 1
        $this->adapter->ping();
40
        try {
41 1
            $id = $this->adapter->bRPopLPush(
42 1
                "{$this->namespace}:{$queueName}:queue",
43 1
                "{$this->namespace}:{$queueName}:receiving",
44 1
                $timeout ?? 0
45
            );
46
        } catch (RedisException $e) {
47
            $id = null;
48
        }
49 1
        if (!empty($id)) {
50 1
            $serialized = $this->evalScript('receive_message', [$this->namespace, $queueName, $id]);
51 1
            if (!empty($serialized)) {
52 1
                return new ReceivedCommand($queueName, $id, $serialized);
53
            }
54
        }
55
        if ($timeout !== null) {
56
            $timeout = time() - $stopwatchStart - $timeout;
57
            if ($timeout <= 0) {
58
                throw new TimeoutException;
59
            }
60
        }
61
        return $this->awaitCommand($queueName, $timeout);
62
    }
63
64 1
    public function completeCommand(string $queueName, string $id)
65
    {
66 1
        $this->evalScript('acknowledge_message', [$this->namespace, $queueName, $id]);
67 1
    }
68
69 1
    public function putQueue(string $queueName)
70
    {
71 1
        $this->adapter->sAdd("{$this->namespace}:queues", [ $queueName ]);
72 1
    }
73
74 1
    public function getQueueNames(): array
75
    {
76 1
        return $this->adapter->sMembers("{$this->namespace}:queues");
77
    }
78
79 1
    public function getQueuedCount(string $queueName): int
80
    {
81 1
        return $this->adapter->lLen("{$this->namespace}:{$queueName}:queue");
82
    }
83
84 1
    public function isIdQueued(string $queueName, string $id): bool
85
    {
86 1
        return $this->adapter->sIsMember("{$this->namespace}:{$queueName}:queued_ids", $id);
87
    }
88
89 1
    public function getQueuedIds(string $queueName, int $offset = 0, int $limit = 10): array
90
    {
91 1
        return $this->adapter->lRange("{$this->namespace}:{$queueName}:queue", $offset, $limit);
92
    }
93
94 1
    public function isIdConsuming(string $queueName, string $id): bool
95
    {
96 1
        return $this->adapter->sIsMember("{$this->namespace}:{$queueName}:consuming", $id);
97
    }
98
99 1
    public function getConsumingIds(string $queueName): array
100
    {
101 1
        return $this->adapter->sMembers("{$this->namespace}:{$queueName}:consuming");
102
    }
103
104
    public function isIdRejected(string $queueName, string $id): bool
105
    {
106
        return $this->adapter->sIsMember("{$this->namespace}:{$queueName}:rejected", $id);
107
    }
108
109
    public function clearRejections(string $queueName)
110
    {
111
        return $this->adapter->del("{$this->namespace}:{$queueName}:rejected");
112
    }
113
114 1
    public function readCommand(string $queueName, string $id): string
115
    {
116 1
        $serialized = $this->adapter->hGet("{$this->namespace}:{$queueName}:messages", $id);
117 1
        if ($serialized === null) {
118
            throw new CommandNotFoundException();
119
        }
120 1
        return $serialized;
121
    }
122
123 1
    public function deleteQueue(string $queueName)
124
    {
125 1
        $this->evalScript('empty_queue', [$this->namespace, $queueName]);
126 1
    }
127
128 1
    public function purgeCommand(string $queueName, string $id)
129
    {
130 1
        $this->evalScript('purge_message', [$this->namespace, $queueName, $id]);
131 1
    }
132
133 1
    public function scheduleCommand(string $queueName, string $id, string $serialized, \DateTime $dateTime)
134
    {
135 1
        $this->evalScript(
136 1
            'schedule_message',
137 1
            [$this->namespace, $queueName, $id, $serialized, $dateTime->getTimestamp()]
138
        );
139 1
    }
140
141
    /**
142
     * @param string $queueName
143
     * @param string $id
144
     * @return \DateTime|null
145
     */
146 1
    public function getScheduledTime(string $queueName, string $id)
147
    {
148 1
        $score = $this->adapter->zScore("{$this->namespace}:schedule", "$queueName||$id");
149 1
        if ($score !== null) {
150 1
            return new \DateTime("@$score");
151
        }
152
    }
153
154
    public function cancelScheduledCommand(string $queueName, string $id)
155
    {
156
        $this->purgeCommand($queueName, $id);
157
    }
158
159 1
    public function clearSchedule(array $queueNames = null, \DateTime $start = null, \DateTime $end = null)
160
    {
161 1
        if ($queueNames === null) {
162
            $queueNames = [ null ];
163
        }
164 1
        foreach ($queueNames as $queueName) {
165 1
            $this->evalScript('clear_schedule', [
166 1
                $this->namespace,
167 1
                $queueName,
168 1
                $start ? $start->getTimestamp() : '-inf',
169 1
                $end ? $end->getTimestamp() : '+inf'
170
            ]);
171
        }
172 1
    }
173
174 1
    public function receiveDueCommands(
175
        \DateTime $now,
176
        int $limit = SchedulerWorker::DEFAULT_THROTTLE,
177
        \DateTime $startTime = null
178
    ): array {
179 1
        if ($startTime === null) {
180 1
            $start = 0;
181
        } else {
182
            $start = $startTime->getTimestamp();
183
        }
184 1
        $results = $this->evalScript('receive_due_messages', [
185 1
            $this->namespace,
186 1
            $start,
187 1
            $now->getTimestamp(),
188 1
            $limit
189
        ]);
190 1
        $commands = [ ];
191 1
        foreach ($results as $result) {
192 1
            list($queueName, $id, $message, $score) = $result;
193 1
            $commands[ ] = new ReceivedScheduledCommand(
194
                $queueName,
195
                $id,
196
                $message,
197 1
                new \DateTime('@'.$score)
198
            );
199
        }
200 1
        return $commands;
201
    }
202
203 1
    public function purgeNamespace()
204
    {
205 1
        $this->evalScript('purge_namespace', [$this->namespace]);
206 1
    }
207
208 9
    private function evalScript(string $script, array $args)
209
    {
210 9
        return $this->adapter->evalScript($this->getScriptPath($script), $args);
211
    }
212
213 9
    private function getScriptPath(string $script): string
214
    {
215 9
        return self::LUA_PATH . '/' . $script . '.lua';
216
    }
217
}
218