RedisDriver   A
last analyzed

Complexity

Total Complexity 32

Size/Duplication

Total Lines 183
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 6

Test Coverage

Coverage 89.36%

Importance

Changes 0
Metric Value
wmc 32
lcom 1
cbo 6
dl 0
loc 183
ccs 84
cts 94
cp 0.8936
rs 9.6
c 0
b 0
f 0

21 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 8 2
A queueCommand() 0 4 1
A awaitCommand() 0 16 3
A completeCommand() 0 4 1
A putQueue() 0 4 1
A getQueueNames() 0 4 1
A getQueuedCount() 0 4 1
A isIdQueued() 0 4 1
A getQueuedIds() 0 4 1
A isIdConsuming() 0 4 1
A getConsumingIds() 0 4 1
A readCommand() 0 8 2
A deleteQueue() 0 4 1
A purgeCommand() 0 4 1
A scheduleCommand() 0 7 1
A getScheduledTime() 0 7 2
A cancelScheduledCommand() 0 4 1
B clearSchedule() 0 17 5
B receiveDueCommands() 0 28 3
A purgeNamespace() 0 4 1
A evalScript() 0 6 1
1
<?php
2
3
namespace MGDigital\BusQue\Redis;
4
5
use MGDigital\BusQue\Exception\CommandNotFoundException;
6
use MGDigital\BusQue\Exception\ConcurrencyException;
7
use MGDigital\BusQue\Exception\TimeoutException;
8
use MGDigital\BusQue\QueueDriverInterface;
9
use MGDigital\BusQue\ReceivedCommand;
10
use MGDigital\BusQue\ReceivedScheduledCommand;
11
use MGDigital\BusQue\SchedulerDriverInterface;
12
use MGDigital\BusQue\SchedulerWorker;
13
14
/**
 * Redis-backed implementation of the queue and scheduler driver interfaces.
 *
 * Simple reads and writes are issued directly through the injected adapter;
 * the remaining operations are delegated to Lua scripts loaded from LUA_PATH.
 * Every Redis key built by this class is prefixed with the configured namespace.
 */
final class RedisDriver implements QueueDriverInterface, SchedulerDriverInterface
{

    /** Directory holding the "<name>.lua" scripts executed by evalScript(). */
    const LUA_PATH = __DIR__ . '/../../lua';

    /** @var RedisAdapterInterface Redis client abstraction used for every call. */
    private $adapter;

    /** @var string Prefix placed in front of every key. */
    private $namespace;

    /**
     * @param RedisAdapterInterface $adapter
     * @param string $namespace Key prefix; only letters, digits, "_", "-" and "." are accepted.
     * @throws \InvalidArgumentException When the namespace contains any other character.
     */
    public function __construct(RedisAdapterInterface $adapter, string $namespace = '')
    {
        // The D modifier makes "$" match only at the very end of the subject.
        // Without it a namespace ending in a newline would pass validation.
        if (!preg_match('/^[a-z0-9_\-\.]*$/iD', $namespace)) {
            throw new \InvalidArgumentException('Invalid namespace.');
        }
        $this->adapter = $adapter;
        $this->namespace = $namespace;
    }

    /**
     * Runs the "queue_message" script with the namespace, queue name, command id and payload.
     */
    public function queueCommand(string $queueName, string $commandId, string $serialized)
    {
        $this->evalScript('queue_message', [$this->namespace, $queueName, $commandId, $serialized]);
    }

    /**
     * Waits for the next command id on a queue and returns the received command.
     *
     * The id is moved from the "<ns>:<queue>:queue" list to the
     * "<ns>:<queue>:receiving" list, then the "receive_message" script is run
     * to obtain the serialized payload.
     *
     * @param string $queueName
     * @param int|null $time Timeout handed to the adapter; null is sent as 0
     *                       (presumably "block indefinitely" - confirm against the adapter).
     * @return ReceivedCommand
     * @throws TimeoutException When no id was returned by the adapter.
     * @throws ConcurrencyException When the script returned no payload for the id.
     */
    public function awaitCommand(string $queueName, int $time = null): ReceivedCommand
    {
        $id = $this->adapter->bRPopLPush(
            "{$this->namespace}:{$queueName}:queue",
            "{$this->namespace}:{$queueName}:receiving",
            $time ?? 0
        );
        if ($this->isMissing($id)) {
            throw new TimeoutException;
        }
        $serialized = $this->evalScript('receive_message', [$this->namespace, $queueName, $id]);
        if ($this->isMissing($serialized)) {
            throw new ConcurrencyException(sprintf('Cannot currently receive command %s.', $id));
        }
        return new ReceivedCommand($queueName, $id, $serialized);
    }

    /**
     * Removes the id from the "<ns>:<queue>:consuming" set.
     */
    public function completeCommand(string $queueName, string $id)
    {
        $this->adapter->sRem("{$this->namespace}:{$queueName}:consuming", [ $id ]);
    }

    /**
     * Adds the queue name to the "<ns>:queues" set.
     */
    public function putQueue(string $queueName)
    {
        $this->adapter->sAdd("{$this->namespace}:queues", [ $queueName ]);
    }

    /**
     * @return array Members of the "<ns>:queues" set.
     */
    public function getQueueNames(): array
    {
        return $this->adapter->sMembers("{$this->namespace}:queues");
    }

    /**
     * @return int Length of the "<ns>:<queue>:queue" list.
     */
    public function getQueuedCount(string $queueName): int
    {
        return $this->adapter->lLen("{$this->namespace}:{$queueName}:queue");
    }

    /**
     * @return bool Whether the id is a member of the "<ns>:<queue>:queued_ids" set.
     */
    public function isIdQueued(string $queueName, string $id): bool
    {
        return $this->adapter->sIsMember("{$this->namespace}:{$queueName}:queued_ids", $id);
    }

    /**
     * Returns a slice of the "<ns>:<queue>:queue" list.
     *
     * NOTE(review): $offset and $limit are forwarded unchanged to the adapter's
     * lRange(). Redis LRANGE itself takes inclusive start/stop indexes, so
     * confirm that the adapter converts offset/limit before relying on paging.
     *
     * @return array
     */
    public function getQueuedIds(string $queueName, int $offset = 0, int $limit = 10): array
    {
        return $this->adapter->lRange("{$this->namespace}:{$queueName}:queue", $offset, $limit);
    }

    /**
     * @return bool Whether the id is a member of the "<ns>:<queue>:consuming" set.
     */
    public function isIdConsuming(string $queueName, string $id): bool
    {
        return $this->adapter->sIsMember("{$this->namespace}:{$queueName}:consuming", $id);
    }

    /**
     * @return array Members of the "<ns>:<queue>:consuming" set.
     */
    public function getConsumingIds(string $queueName): array
    {
        return $this->adapter->sMembers("{$this->namespace}:{$queueName}:consuming");
    }

    /**
     * Reads the serialized command stored under the id in the "<ns>:<queue>:messages" hash.
     *
     * @return string
     * @throws CommandNotFoundException When the hash has no value for the id.
     */
    public function readCommand(string $queueName, string $id): string
    {
        $serialized = $this->adapter->hGet("{$this->namespace}:{$queueName}:messages", $id);
        // Some Redis clients report a missing field as false rather than null;
        // false can never be a valid payload, so both are treated as "not found".
        if ($serialized === null || $serialized === false) {
            throw new CommandNotFoundException();
        }
        return $serialized;
    }

    /**
     * Runs the "empty_queue" script for the given queue.
     */
    public function deleteQueue(string $queueName)
    {
        $this->evalScript('empty_queue', [$this->namespace, $queueName]);
    }

    /**
     * Runs the "purge_message" script for the given queue and command id.
     */
    public function purgeCommand(string $queueName, string $id)
    {
        $this->evalScript('purge_message', [$this->namespace, $queueName, $id]);
    }

    /**
     * Runs the "schedule_message" script, passing the due time as a Unix timestamp.
     */
    public function scheduleCommand(string $queueName, string $id, string $serialized, \DateTimeInterface $dateTime)
    {
        $this->evalScript(
            'schedule_message',
            [$this->namespace, $queueName, $id, $serialized, $dateTime->getTimestamp()]
        );
    }

    /**
     * Looks up the score of "<queue>||<id>" in the "<ns>:schedule" sorted set.
     *
     * @param string $queueName
     * @param string $id
     * @return \DateTimeInterface|null The score interpreted as a Unix timestamp,
     *                                 or null when the adapter reports no score.
     */
    public function getScheduledTime(string $queueName, string $id)
    {
        $score = $this->adapter->zScore("{$this->namespace}:schedule", "$queueName||$id");
        // A false score would otherwise build the unparsable time string "@".
        if ($score === null || $score === false) {
            return null;
        }
        return new \DateTimeImmutable("@$score");
    }

    /**
     * Cancels a scheduled command by purging it (same call as purgeCommand()).
     */
    public function cancelScheduledCommand(string $queueName, string $id)
    {
        $this->purgeCommand($queueName, $id);
    }

    /**
     * Runs the "clear_schedule" script once per queue name.
     *
     * @param array|null $queueNames When null, the script is run a single time with a
     *                               null queue name (presumably "all queues" - confirm
     *                               against clear_schedule.lua).
     * @param \DateTimeInterface|null $start Lower bound as a timestamp; '-inf' when omitted.
     * @param \DateTimeInterface|null $end Upper bound as a timestamp; '+inf' when omitted.
     */
    public function clearSchedule(
        array $queueNames = null,
        \DateTimeInterface $start = null,
        \DateTimeInterface $end = null
    ) {
        if ($queueNames === null) {
            $queueNames = [ null ];
        }
        // The bounds do not depend on the queue, so compute them once.
        $min = $start !== null ? $start->getTimestamp() : '-inf';
        $max = $end !== null ? $end->getTimestamp() : '+inf';
        foreach ($queueNames as $queueName) {
            $this->evalScript('clear_schedule', [$this->namespace, $queueName, $min, $max]);
        }
    }

    /**
     * Runs the "receive_due_messages" script and wraps each returned row.
     *
     * Each row is expected to be [queueName, id, message, score]; the score is
     * interpreted as a Unix timestamp.
     *
     * @param \DateTimeInterface $now Upper timestamp bound passed to the script.
     * @param int $limit Maximum count passed to the script.
     * @param \DateTimeInterface|null $startTime Lower timestamp bound; 0 when omitted.
     * @return ReceivedScheduledCommand[]
     */
    public function receiveDueCommands(
        \DateTimeInterface $now,
        int $limit = SchedulerWorker::DEFAULT_THROTTLE,
        \DateTimeInterface $startTime = null
    ): array {
        $start = $startTime === null ? 0 : $startTime->getTimestamp();
        $results = $this->evalScript('receive_due_messages', [
            $this->namespace,
            $start,
            $now->getTimestamp(),
            $limit
        ]);
        $commands = [];
        foreach ($results as $result) {
            list($queueName, $id, $message, $score) = $result;
            $commands[] = new ReceivedScheduledCommand(
                $queueName,
                $id,
                $message,
                new \DateTimeImmutable('@' . $score)
            );
        }
        return $commands;
    }

    /**
     * Runs the "purge_namespace" script for the configured namespace.
     */
    public function purgeNamespace()
    {
        $this->evalScript('purge_namespace', [$this->namespace]);
    }

    /**
     * Loads "<LUA_PATH>/<script>.lua" and evaluates it through the adapter.
     *
     * @param string $script Script file name without the ".lua" extension.
     * @param array $args Arguments forwarded to the adapter's evalLua().
     * @return mixed Whatever the adapter returns for the script.
     * @throws \RuntimeException When the script file cannot be read.
     */
    private function evalScript(string $script, array $args)
    {
        $path = self::LUA_PATH . '/' . $script . '.lua';
        $lua = file_get_contents($path);
        if ($lua === false) {
            // Fail loudly instead of sending a non-script value to Redis.
            throw new \RuntimeException(sprintf('Unable to read Lua script "%s".', $path));
        }
        return $this->adapter->evalLua($lua, $args);
    }

    /**
     * Tells whether a value returned by Redis stands for "nothing there".
     *
     * empty() alone also reports the string "0" as empty, which would turn a
     * command whose id or payload is literally "0" into a false failure.
     *
     * @param mixed $value
     * @return bool
     */
    private function isMissing($value): bool
    {
        return empty($value) && $value !== '0';
    }
}
197