Completed
Push — master ( 3717b7...9044e9 )
by Mike
02:46
created

RedisDriver::awaitCommand()   A

Complexity

Conditions 3
Paths 3

Size

Total Lines 17
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 10
CRAP Score 3.0416

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 17
ccs 10
cts 12
cp 0.8333
rs 9.4285
cc 3
eloc 12
nc 3
nop 2
crap 3.0416
1
<?php
2
3
namespace MGDigital\BusQue\Redis;
4
5
use MGDigital\BusQue\Exception\CommandNotFoundException;
6
use MGDigital\BusQue\Exception\DriverException;
7
use MGDigital\BusQue\Exception\TimeoutException;
8
use MGDigital\BusQue\QueueDriverInterface;
9
use MGDigital\BusQue\ReceivedCommand;
10
use MGDigital\BusQue\ReceivedScheduledCommand;
11
use MGDigital\BusQue\SchedulerDriverInterface;
12
use MGDigital\BusQue\SchedulerWorker;
13
14
final class RedisDriver implements QueueDriverInterface, SchedulerDriverInterface
15
{
16
17
    const LUA_PATH = __DIR__ . '/../../lua';
18
19
    private $adapter;
20
    private $namespace;
21
22 19
    public function __construct(RedisAdapterInterface $adapter, string $namespace = '')
23
    {
24 19
        $this->adapter = $adapter;
25 19
        if (!preg_match('/^[a-z0-9_\-]*$/i', $namespace)) {
26
            throw new \InvalidArgumentException('Invalid namespace.');
27
        }
28 19
        $this->namespace = $namespace;
29 19
    }
30
31 1
    public function queueCommand(string $queueName, string $commandId, string $serialized)
32
    {
33 1
        $this->evalScript('queue_message', [$this->namespace, $queueName, $commandId, $serialized]);
34 1
    }
35
36 1
    public function awaitCommand(string $queueName, int $time = null): ReceivedCommand
37
    {
38 1
        $this->adapter->ping();
39 1
        $id = $this->adapter->bRPopLPush(
40 1
            "{$this->namespace}:{$queueName}:queue",
41 1
            "{$this->namespace}:{$queueName}:receiving",
42 1
            $time ?? 0
43
        );
44 1
        if (!empty($id)) {
45 1
            $serialized = $this->evalScript('receive_message', [$this->namespace, $queueName, $id]);
46 1
            if (empty($serialized)) {
47
                throw new DriverException(sprintf('Error deserializing command %s.', $id));
48
            }
49 1
            return new ReceivedCommand($queueName, $id, $serialized);
50
        }
51
        throw new TimeoutException;
52
    }
53
54 1
    public function completeCommand(string $queueName, string $id)
55
    {
56 1
        $this->adapter->sRem("{$this->namespace}:{$queueName}:consuming", [ $id ]);
57 1
    }
58
59 1
    public function putQueue(string $queueName)
60
    {
61 1
        $this->adapter->sAdd("{$this->namespace}:queues", [ $queueName ]);
62 1
    }
63
64 1
    public function getQueueNames(): array
65
    {
66 1
        return $this->adapter->sMembers("{$this->namespace}:queues");
67
    }
68
69 1
    public function getQueuedCount(string $queueName): int
70
    {
71 1
        return $this->adapter->lLen("{$this->namespace}:{$queueName}:queue");
72
    }
73
74 1
    public function isIdQueued(string $queueName, string $id): bool
75
    {
76 1
        return $this->adapter->sIsMember("{$this->namespace}:{$queueName}:queued_ids", $id);
77
    }
78
79 1
    public function getQueuedIds(string $queueName, int $offset = 0, int $limit = 10): array
80
    {
81 1
        return $this->adapter->lRange("{$this->namespace}:{$queueName}:queue", $offset, $limit);
82
    }
83
84 1
    public function isIdConsuming(string $queueName, string $id): bool
85
    {
86 1
        return $this->adapter->sIsMember("{$this->namespace}:{$queueName}:consuming", $id);
87
    }
88
89 1
    public function getConsumingIds(string $queueName): array
90
    {
91 1
        return $this->adapter->sMembers("{$this->namespace}:{$queueName}:consuming");
92
    }
93
94 1
    public function readCommand(string $queueName, string $id): string
95
    {
96 1
        $serialized = $this->adapter->hGet("{$this->namespace}:{$queueName}:messages", $id);
97 1
        if ($serialized === null) {
98
            throw new CommandNotFoundException();
99
        }
100 1
        return $serialized;
101
    }
102
103 1
    public function deleteQueue(string $queueName)
104
    {
105 1
        $this->evalScript('empty_queue', [$this->namespace, $queueName]);
106 1
    }
107
108 1
    public function purgeCommand(string $queueName, string $id)
109
    {
110 1
        $this->evalScript('purge_message', [$this->namespace, $queueName, $id]);
111 1
    }
112
113 1
    public function scheduleCommand(string $queueName, string $id, string $serialized, \DateTime $dateTime)
114
    {
115 1
        $this->evalScript(
116 1
            'schedule_message',
117 1
            [$this->namespace, $queueName, $id, $serialized, $dateTime->getTimestamp()]
118
        );
119 1
    }
120
121
    /**
122
     * @param string $queueName
123
     * @param string $id
124
     * @return \DateTime|null
125
     */
126 1
    public function getScheduledTime(string $queueName, string $id)
127
    {
128 1
        $score = $this->adapter->zScore("{$this->namespace}:schedule", "$queueName||$id");
129 1
        if ($score !== null) {
130 1
            return new \DateTime("@$score");
131
        }
132
    }
133
134
    public function cancelScheduledCommand(string $queueName, string $id)
135
    {
136
        $this->purgeCommand($queueName, $id);
137
    }
138
139 1
    public function clearSchedule(array $queueNames = null, \DateTime $start = null, \DateTime $end = null)
140
    {
141 1
        if ($queueNames === null) {
142
            $queueNames = [ null ];
143
        }
144 1
        foreach ($queueNames as $queueName) {
145 1
            $this->evalScript('clear_schedule', [
146 1
                $this->namespace,
147 1
                $queueName,
148 1
                $start ? $start->getTimestamp() : '-inf',
149 1
                $end ? $end->getTimestamp() : '+inf'
150
            ]);
151
        }
152 1
    }
153
154 1
    public function receiveDueCommands(
155
        \DateTime $now,
156
        int $limit = SchedulerWorker::DEFAULT_THROTTLE,
157
        \DateTime $startTime = null
158
    ): array {
159 1
        if ($startTime === null) {
160 1
            $start = 0;
161
        } else {
162
            $start = $startTime->getTimestamp();
163
        }
164 1
        $results = $this->evalScript('receive_due_messages', [
165 1
            $this->namespace,
166 1
            $start,
167 1
            $now->getTimestamp(),
168 1
            $limit
169
        ]);
170 1
        $commands = [ ];
171 1
        foreach ($results as $result) {
172 1
            list($queueName, $id, $message, $score) = $result;
173 1
            $commands[ ] = new ReceivedScheduledCommand(
174
                $queueName,
175
                $id,
176
                $message,
177 1
                new \DateTime('@'.$score)
178
            );
179
        }
180 1
        return $commands;
181
    }
182
183 1
    public function purgeNamespace()
184
    {
185 1
        $this->evalScript('purge_namespace', [$this->namespace]);
186 1
    }
187
188 8
    private function evalScript(string $script, array $args)
189
    {
190 8
        $path = self::LUA_PATH . '/' . $script . '.lua';
191 8
        $lua = file_get_contents($path);
192 8
        return $this->adapter->evalLua($lua, $args);
193
    }
194
}
195