Completed
Push — master ( 104680...55a46d )
by Mike
03:18
created

RedisDriver   A

Complexity

Total Complexity 35

Size/Duplication

Total Lines 191
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 5

Test Coverage

Coverage 85.15%

Importance

Changes 0
Metric Value
wmc 35
lcom 1
cbo 5
dl 0
loc 191
ccs 86
cts 101
cp 0.8515
rs 9
c 0
b 0
f 0

21 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 8 2
A queueCommand() 0 4 1
B awaitCommand() 0 27 6
A completeCommand() 0 4 1
A putQueue() 0 4 1
A getQueueNames() 0 4 1
A getQueuedCount() 0 4 1
A isIdQueued() 0 4 1
A getQueuedIds() 0 4 1
A isIdConsuming() 0 4 1
A getConsumingIds() 0 4 1
A readCommand() 0 8 2
A deleteQueue() 0 4 1
A purgeCommand() 0 4 1
A scheduleCommand() 0 7 1
A getScheduledTime() 0 7 2
A cancelScheduledCommand() 0 4 1
B clearSchedule() 0 14 5
B receiveDueCommands() 0 28 3
A purgeNamespace() 0 4 1
A evalScript() 0 6 1
1
<?php
2
3
namespace MGDigital\BusQue\Redis;
4
5
use MGDigital\BusQue\Exception\CommandNotFoundException;
6
use MGDigital\BusQue\Exception\RedisException;
7
use MGDigital\BusQue\Exception\TimeoutException;
8
use MGDigital\BusQue\QueueDriverInterface;
9
use MGDigital\BusQue\ReceivedCommand;
10
use MGDigital\BusQue\ReceivedScheduledCommand;
11
use MGDigital\BusQue\SchedulerDriverInterface;
12
use MGDigital\BusQue\SchedulerWorker;
13
14
final class RedisDriver implements QueueDriverInterface, SchedulerDriverInterface
15
{
16
17
    /** Directory holding the Lua scripts that evalScript() loads by name. */
    const LUA_PATH = __DIR__ . '/../../lua';
18
19
    /** @var RedisAdapterInterface Adapter through which every Redis call in this class is made. */
    private $adapter;
20
    /** @var string Key namespace, validated in the constructor; used as the prefix of every key built here. */
    private $namespace;
21
22 19
    /**
     * @param RedisAdapterInterface $adapter   Adapter through which every Redis call is made.
     * @param string                $namespace Prefix for the keys built by this driver; may be empty,
     *                                         otherwise only letters, digits, "_" and "-" are allowed.
     *
     * @throws \InvalidArgumentException If the namespace contains any other character.
     */
    public function __construct(RedisAdapterInterface $adapter, string $namespace = '')
    {
        // "\z" anchors at the absolute end of the subject. A plain "$" also
        // matches just before a trailing newline, which would let "name\n"
        // through and produce keys containing a line break.
        if (!preg_match('/^[a-z0-9_\-]*\z/i', $namespace)) {
            throw new \InvalidArgumentException('Invalid namespace.');
        }
        $this->adapter = $adapter;
        $this->namespace = $namespace;
    }
30
31 1
    /**
     * Runs the queue_message script for the given command.
     *
     * @param string $queueName  Name of the target queue.
     * @param string $commandId  Identifier of the command.
     * @param string $serialized Serialized command payload.
     */
    public function queueCommand(string $queueName, string $commandId, string $serialized)
    {
        $scriptArgs = [$this->namespace, $queueName, $commandId, $serialized];
        $this->evalScript('queue_message', $scriptArgs);
    }
35
36 1
    /**
     * Blocks until a command id can be moved from the queue list to the
     * "receiving" list, then fetches its payload via the receive_message script.
     *
     * The wait is retried when the adapter yields no id, when it throws a
     * RedisException, or when the script returns an empty payload.
     *
     * @param string   $queueName Name of the queue to wait on.
     * @param int|null $timeout   Maximum time to wait in seconds (compared against time());
     *                            null waits without a deadline.
     *
     * @return ReceivedCommand
     *
     * @throws TimeoutException If no command was received before the deadline.
     */
    public function awaitCommand(string $queueName, int $timeout = null): ReceivedCommand
    {
        // Fix the deadline once so that retries only ever shrink the wait.
        $deadline = $timeout === null ? null : time() + $timeout;
        $remaining = $timeout;

        // A loop instead of self-recursion: retries no longer grow the call stack.
        while (true) {
            $this->adapter->ping();
            try {
                $id = $this->adapter->bRPopLPush(
                    "{$this->namespace}:{$queueName}:queue",
                    "{$this->namespace}:{$queueName}:receiving",
                    $remaining ?? 0
                );
            } catch (RedisException $e) {
                // Treated the same as "nothing received"; the wait is retried below.
                $id = null;
            }

            if (!empty($id)) {
                $serialized = $this->evalScript('receive_message', [$this->namespace, $queueName, $id]);
                if (!empty($serialized)) {
                    return new ReceivedCommand($queueName, $id, $serialized);
                }
            }

            if ($deadline !== null) {
                // Time left is deadline minus now. The previous code computed
                // elapsed minus timeout, which timed out early and kept
                // waiting once the deadline had actually passed.
                $remaining = $deadline - time();
                if ($remaining <= 0) {
                    throw new TimeoutException;
                }
            }
        }
    }
63
64 1
    /**
     * Removes the id from the queue's "consuming" key via the adapter's sRem().
     *
     * @param string $queueName Name of the queue.
     * @param string $id        Identifier of the command.
     */
    public function completeCommand(string $queueName, string $id)
    {
        $consumingKey = "{$this->namespace}:{$queueName}:consuming";
        $this->adapter->sRem($consumingKey, [ $id ]);
    }
68
69 1
    /**
     * Adds the queue name to the namespace's "queues" key via the adapter's sAdd().
     *
     * @param string $queueName Name of the queue to register.
     */
    public function putQueue(string $queueName)
    {
        $queuesKey = "{$this->namespace}:queues";
        $this->adapter->sAdd($queuesKey, [ $queueName ]);
    }
73
74 1
    /**
     * Returns the members of the namespace's "queues" key.
     *
     * @return array Whatever the adapter's sMembers() returns for that key.
     */
    public function getQueueNames(): array
    {
        $queuesKey = "{$this->namespace}:queues";

        return $this->adapter->sMembers($queuesKey);
    }
78
79 1
    /**
     * Returns the length of the queue's list as reported by the adapter's lLen().
     *
     * @param string $queueName Name of the queue.
     *
     * @return int
     */
    public function getQueuedCount(string $queueName): int
    {
        $queueKey = "{$this->namespace}:{$queueName}:queue";

        return $this->adapter->lLen($queueKey);
    }
83
84 1
    /**
     * Tells whether the id is a member of the queue's "queued_ids" key.
     *
     * @param string $queueName Name of the queue.
     * @param string $id        Identifier of the command.
     *
     * @return bool
     */
    public function isIdQueued(string $queueName, string $id): bool
    {
        $queuedIdsKey = "{$this->namespace}:{$queueName}:queued_ids";

        return $this->adapter->sIsMember($queuedIdsKey, $id);
    }
88
89 1
    /**
     * Returns a slice of the ids held in the queue's list.
     *
     * NOTE(review): $offset and $limit are forwarded unchanged as the second
     * and third lRange() arguments. If the adapter hands them straight to
     * Redis LRANGE they act as inclusive start/stop indexes rather than
     * offset/count — confirm against the adapter implementation.
     *
     * @param string $queueName Name of the queue.
     * @param int    $offset    Second argument given to lRange().
     * @param int    $limit     Third argument given to lRange().
     *
     * @return array
     */
    public function getQueuedIds(string $queueName, int $offset = 0, int $limit = 10): array
    {
        $queueKey = "{$this->namespace}:{$queueName}:queue";

        return $this->adapter->lRange($queueKey, $offset, $limit);
    }
93
94 1
    /**
     * Tells whether the id is a member of the queue's "consuming" key.
     *
     * @param string $queueName Name of the queue.
     * @param string $id        Identifier of the command.
     *
     * @return bool
     */
    public function isIdConsuming(string $queueName, string $id): bool
    {
        $consumingKey = "{$this->namespace}:{$queueName}:consuming";

        return $this->adapter->sIsMember($consumingKey, $id);
    }
98
99 1
    /**
     * Returns the members of the queue's "consuming" key.
     *
     * @param string $queueName Name of the queue.
     *
     * @return array Whatever the adapter's sMembers() returns for that key.
     */
    public function getConsumingIds(string $queueName): array
    {
        $consumingKey = "{$this->namespace}:{$queueName}:consuming";

        return $this->adapter->sMembers($consumingKey);
    }
103
104 1
    /**
     * Reads the serialized payload stored for an id in the queue's "messages" key.
     *
     * @param string $queueName Name of the queue.
     * @param string $id        Identifier of the command.
     *
     * @return string The stored payload.
     *
     * @throws CommandNotFoundException If the adapter's hGet() returns null.
     */
    public function readCommand(string $queueName, string $id): string
    {
        $messagesKey = "{$this->namespace}:{$queueName}:messages";
        $payload = $this->adapter->hGet($messagesKey, $id);

        if ($payload !== null) {
            return $payload;
        }

        throw new CommandNotFoundException();
    }
112
113 1
    /**
     * Runs the empty_queue script for the given queue.
     *
     * @param string $queueName Name of the queue.
     */
    public function deleteQueue(string $queueName)
    {
        $scriptArgs = [$this->namespace, $queueName];
        $this->evalScript('empty_queue', $scriptArgs);
    }
117
118 1
    /**
     * Runs the purge_message script for the given command.
     *
     * @param string $queueName Name of the queue.
     * @param string $id        Identifier of the command.
     */
    public function purgeCommand(string $queueName, string $id)
    {
        $scriptArgs = [$this->namespace, $queueName, $id];
        $this->evalScript('purge_message', $scriptArgs);
    }
122
123 1
    /**
     * Runs the schedule_message script, passing the Unix timestamp of $dateTime.
     *
     * @param string    $queueName  Name of the target queue.
     * @param string    $id         Identifier of the command.
     * @param string    $serialized Serialized command payload.
     * @param \DateTime $dateTime   Moment whose timestamp is handed to the script.
     */
    public function scheduleCommand(string $queueName, string $id, string $serialized, \DateTime $dateTime)
    {
        $timestamp = $dateTime->getTimestamp();
        $scriptArgs = [$this->namespace, $queueName, $id, $serialized, $timestamp];

        $this->evalScript('schedule_message', $scriptArgs);
    }
130
131
    /**
     * Looks up the score stored for "<queueName>||<id>" in the namespace's
     * "schedule" key and converts it to a DateTime.
     *
     * @param string $queueName
     * @param string $id
     * @return \DateTime|null Null when the adapter's zScore() returns null.
     */
    public function getScheduledTime(string $queueName, string $id)
    {
        $scheduleKey = "{$this->namespace}:schedule";
        $score = $this->adapter->zScore($scheduleKey, "$queueName||$id");

        if ($score === null) {
            return null;
        }

        // The "@" prefix makes DateTime read the score as a Unix timestamp.
        return new \DateTime("@$score");
    }
143
144
    /**
     * Cancels a scheduled command by delegating to purgeCommand().
     *
     * @param string $queueName Name of the queue.
     * @param string $id        Identifier of the command.
     */
    public function cancelScheduledCommand(string $queueName, string $id)
    {
        $this->purgeCommand($queueName, $id);
    }
148
149 1
    /**
     * Runs the clear_schedule script once per queue name for a time window.
     *
     * @param array|null     $queueNames Queue names; when null the script runs once with a null queue name.
     * @param \DateTime|null $start      Lower bound; '-inf' is sent when omitted.
     * @param \DateTime|null $end        Upper bound; '+inf' is sent when omitted.
     */
    public function clearSchedule(array $queueNames = null, \DateTime $start = null, \DateTime $end = null)
    {
        $targets = $queueNames === null ? [ null ] : $queueNames;

        // The window does not depend on the queue, so resolve it once up front.
        $from = '-inf';
        if ($start) {
            $from = $start->getTimestamp();
        }
        $until = '+inf';
        if ($end) {
            $until = $end->getTimestamp();
        }

        foreach ($targets as $target) {
            $this->evalScript('clear_schedule', [$this->namespace, $target, $from, $until]);
        }
    }
163
164 1
    /**
     * Runs the receive_due_messages script and wraps every returned row in a
     * ReceivedScheduledCommand.
     *
     * @param \DateTime      $now       Its timestamp is sent as the upper bound.
     * @param int            $limit     Passed to the script as the last argument.
     * @param \DateTime|null $startTime Its timestamp is sent as the lower bound; 0 when omitted.
     *
     * @return ReceivedScheduledCommand[]
     */
    public function receiveDueCommands(
        \DateTime $now,
        int $limit = SchedulerWorker::DEFAULT_THROTTLE,
        \DateTime $startTime = null
    ): array {
        $from = $startTime === null ? 0 : $startTime->getTimestamp();
        $scriptArgs = [$this->namespace, $from, $now->getTimestamp(), $limit];
        $rows = $this->evalScript('receive_due_messages', $scriptArgs);

        $received = [ ];
        foreach ($rows as $row) {
            // Each row is unpacked as: queue name, command id, payload, score.
            list($queueName, $id, $message, $score) = $row;
            $dueAt = new \DateTime('@'.$score);
            $received[ ] = new ReceivedScheduledCommand($queueName, $id, $message, $dueAt);
        }

        return $received;
    }
192
193 1
    /**
     * Runs the purge_namespace script for this driver's namespace.
     */
    public function purgeNamespace()
    {
        $scriptArgs = [$this->namespace];
        $this->evalScript('purge_namespace', $scriptArgs);
    }
197
198 8
    /**
     * Loads a Lua script from LUA_PATH and runs it through the adapter.
     *
     * @param string $script Script file name without the ".lua" extension.
     * @param array  $args   Arguments handed unchanged to the adapter's evalLua().
     *
     * @return mixed Whatever the adapter's evalLua() returns.
     *
     * @throws \RuntimeException If the script file is missing or cannot be read.
     */
    private function evalScript(string $script, array $args)
    {
        $path = self::LUA_PATH . '/' . $script . '.lua';

        // Fail loudly instead of passing `false` to the adapter when the
        // script cannot be loaded.
        $lua = (is_file($path) && is_readable($path)) ? file_get_contents($path) : false;
        if ($lua === false) {
            throw new \RuntimeException("Unable to read Lua script: {$path}");
        }

        return $this->adapter->evalLua($lua, $args);
    }
204
}
205