Completed
Push — master ( 9044e9...3b071d )
by Mike
02:38
created

RedisDriver   A

Complexity

Total Complexity 32

Size/Duplication

Total Lines 180
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 6

Test Coverage

Coverage 89.36%

Importance

Changes 0
Metric Value
wmc 32
lcom 1
cbo 6
dl 0
loc 180
ccs 84
cts 94
cp 0.8936
rs 9.6
c 0
b 0
f 0

21 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 8 2
A queueCommand() 0 4 1
A awaitCommand() 0 16 3
A completeCommand() 0 4 1
A putQueue() 0 4 1
A getQueueNames() 0 4 1
A getQueuedCount() 0 4 1
A isIdQueued() 0 4 1
A getQueuedIds() 0 4 1
A isIdConsuming() 0 4 1
A getConsumingIds() 0 4 1
A readCommand() 0 8 2
A deleteQueue() 0 4 1
A purgeCommand() 0 4 1
A scheduleCommand() 0 7 1
A getScheduledTime() 0 7 2
A cancelScheduledCommand() 0 4 1
B clearSchedule() 0 14 5
B receiveDueCommands() 0 28 3
A purgeNamespace() 0 4 1
A evalScript() 0 6 1
1
<?php
2
3
namespace MGDigital\BusQue\Redis;
4
5
use MGDigital\BusQue\Exception\CommandNotFoundException;
6
use MGDigital\BusQue\Exception\DriverException;
7
use MGDigital\BusQue\Exception\TimeoutException;
8
use MGDigital\BusQue\QueueDriverInterface;
9
use MGDigital\BusQue\ReceivedCommand;
10
use MGDigital\BusQue\ReceivedScheduledCommand;
11
use MGDigital\BusQue\SchedulerDriverInterface;
12
use MGDigital\BusQue\SchedulerWorker;
13
14
final class RedisDriver implements QueueDriverInterface, SchedulerDriverInterface
15
{
16
17
    const LUA_PATH = __DIR__ . '/../../lua';
18
19
    private $adapter;
20
    private $namespace;
21
22 19
    public function __construct(RedisAdapterInterface $adapter, string $namespace = '')
23
    {
24 19
        $this->adapter = $adapter;
25 19
        if (!preg_match('/^[a-z0-9_\-]*$/i', $namespace)) {
26
            throw new \InvalidArgumentException('Invalid namespace.');
27
        }
28 19
        $this->namespace = $namespace;
29 19
    }
30
31 1
    public function queueCommand(string $queueName, string $commandId, string $serialized)
32
    {
33 1
        $this->evalScript('queue_message', [$this->namespace, $queueName, $commandId, $serialized]);
34 1
    }
35
36 1
    public function awaitCommand(string $queueName, int $time = null): ReceivedCommand
37
    {
38 1
        $id = $this->adapter->bRPopLPush(
39 1
            "{$this->namespace}:{$queueName}:queue",
40 1
            "{$this->namespace}:{$queueName}:receiving",
41 1
            $time ?? 0
42
        );
43 1
        if (empty($id)) {
44
            throw new TimeoutException;
45
        }
46 1
        $serialized = $this->evalScript('receive_message', [$this->namespace, $queueName, $id]);
47 1
        if (empty($serialized)) {
48
            throw new DriverException(sprintf('Error retrieving command %s.', $id));
49
        }
50 1
        return new ReceivedCommand($queueName, $id, $serialized);
51
    }
52
53 1
    public function completeCommand(string $queueName, string $id)
54
    {
55 1
        $this->adapter->sRem("{$this->namespace}:{$queueName}:consuming", [ $id ]);
56 1
    }
57
58 1
    public function putQueue(string $queueName)
59
    {
60 1
        $this->adapter->sAdd("{$this->namespace}:queues", [ $queueName ]);
61 1
    }
62
63 1
    public function getQueueNames(): array
64
    {
65 1
        return $this->adapter->sMembers("{$this->namespace}:queues");
66
    }
67
68 1
    public function getQueuedCount(string $queueName): int
69
    {
70 1
        return $this->adapter->lLen("{$this->namespace}:{$queueName}:queue");
71
    }
72
73 1
    public function isIdQueued(string $queueName, string $id): bool
74
    {
75 1
        return $this->adapter->sIsMember("{$this->namespace}:{$queueName}:queued_ids", $id);
76
    }
77
78 1
    public function getQueuedIds(string $queueName, int $offset = 0, int $limit = 10): array
79
    {
80 1
        return $this->adapter->lRange("{$this->namespace}:{$queueName}:queue", $offset, $limit);
81
    }
82
83 1
    public function isIdConsuming(string $queueName, string $id): bool
84
    {
85 1
        return $this->adapter->sIsMember("{$this->namespace}:{$queueName}:consuming", $id);
86
    }
87
88 1
    public function getConsumingIds(string $queueName): array
89
    {
90 1
        return $this->adapter->sMembers("{$this->namespace}:{$queueName}:consuming");
91
    }
92
93 1
    public function readCommand(string $queueName, string $id): string
94
    {
95 1
        $serialized = $this->adapter->hGet("{$this->namespace}:{$queueName}:messages", $id);
96 1
        if ($serialized === null) {
97
            throw new CommandNotFoundException();
98
        }
99 1
        return $serialized;
100
    }
101
102 1
    public function deleteQueue(string $queueName)
103
    {
104 1
        $this->evalScript('empty_queue', [$this->namespace, $queueName]);
105 1
    }
106
107 1
    public function purgeCommand(string $queueName, string $id)
108
    {
109 1
        $this->evalScript('purge_message', [$this->namespace, $queueName, $id]);
110 1
    }
111
112 1
    public function scheduleCommand(string $queueName, string $id, string $serialized, \DateTime $dateTime)
113
    {
114 1
        $this->evalScript(
115 1
            'schedule_message',
116 1
            [$this->namespace, $queueName, $id, $serialized, $dateTime->getTimestamp()]
117
        );
118 1
    }
119
120
    /**
121
     * @param string $queueName
122
     * @param string $id
123
     * @return \DateTime|null
124
     */
125 1
    public function getScheduledTime(string $queueName, string $id)
126
    {
127 1
        $score = $this->adapter->zScore("{$this->namespace}:schedule", "$queueName||$id");
128 1
        if ($score !== null) {
129 1
            return new \DateTime("@$score");
130
        }
131
    }
132
133
    public function cancelScheduledCommand(string $queueName, string $id)
134
    {
135
        $this->purgeCommand($queueName, $id);
136
    }
137
138 1
    public function clearSchedule(array $queueNames = null, \DateTime $start = null, \DateTime $end = null)
139
    {
140 1
        if ($queueNames === null) {
141
            $queueNames = [ null ];
142
        }
143 1
        foreach ($queueNames as $queueName) {
144 1
            $this->evalScript('clear_schedule', [
145 1
                $this->namespace,
146 1
                $queueName,
147 1
                $start ? $start->getTimestamp() : '-inf',
148 1
                $end ? $end->getTimestamp() : '+inf'
149
            ]);
150
        }
151 1
    }
152
153 1
    public function receiveDueCommands(
154
        \DateTime $now,
155
        int $limit = SchedulerWorker::DEFAULT_THROTTLE,
156
        \DateTime $startTime = null
157
    ): array {
158 1
        if ($startTime === null) {
159 1
            $start = 0;
160
        } else {
161
            $start = $startTime->getTimestamp();
162
        }
163 1
        $results = $this->evalScript('receive_due_messages', [
164 1
            $this->namespace,
165 1
            $start,
166 1
            $now->getTimestamp(),
167 1
            $limit
168
        ]);
169 1
        $commands = [ ];
170 1
        foreach ($results as $result) {
171 1
            list($queueName, $id, $message, $score) = $result;
172 1
            $commands[ ] = new ReceivedScheduledCommand(
173
                $queueName,
174
                $id,
175
                $message,
176 1
                new \DateTime('@'.$score)
177
            );
178
        }
179 1
        return $commands;
180
    }
181
182 1
    public function purgeNamespace()
183
    {
184 1
        $this->evalScript('purge_namespace', [$this->namespace]);
185 1
    }
186
187 8
    private function evalScript(string $script, array $args)
188
    {
189 8
        $path = self::LUA_PATH . '/' . $script . '.lua';
190 8
        $lua = file_get_contents($path);
191 8
        return $this->adapter->evalLua($lua, $args);
192
    }
193
}
194