Completed
Push — master ( 4c16ee...e71fad )
by Mike
04:00
created

PredisAdapter   A

Complexity

Total Complexity 32

Size/Duplication

Total Lines 172
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 6

Test Coverage

Coverage 0%

Importance

Changes 0
Metric Value
wmc 32
c 0
b 0
f 0
lcom 1
cbo 6
dl 0
loc 172
ccs 0
cts 140
cp 0
rs 9.6

21 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 4 1
A queueCommand() 0 4 1
B awaitCommand() 0 22 5
A getCommandStatus() 0 4 1
A setCommandCompleted() 0 4 1
A setCommandFailed() 0 4 1
A putQueue() 0 4 1
A getQueueNames() 0 4 1
A getQueuedCount() 0 4 1
A getQueuedIds() 0 4 1
A getConsumingCount() 0 4 1
A getConsumingIds() 0 4 1
A readCommand() 0 8 2
A clearQueue() 0 4 1
A deleteQueue() 0 5 1
A purgeCommand() 0 4 1
A scheduleCommand() 0 4 1
A cancelScheduledCommand() 0 4 1
B clearSchedule() 0 13 5
B receiveDueCommands() 0 30 3
A executeLuaScript() 0 6 1
1
<?php
2
3
namespace MGDigital\BusQue\Predis;
4
5
use MGDigital\BusQue\Exception\CommandNotFoundException;
6
use MGDigital\BusQue\Exception\TimeoutException;
7
use MGDigital\BusQue\QueueAdapterInterface;
8
use MGDigital\BusQue\ReceivedCommand;
9
use MGDigital\BusQue\ReceivedScheduledCommand;
10
use MGDigital\BusQue\SchedulerAdapterInterface;
11
use MGDigital\BusQue\SchedulerWorker;
12
use Predis\Client;
13
use Predis\ClientContextInterface;
14
use Predis\Connection\ConnectionException;
15
16
class PredisAdapter implements QueueAdapterInterface, SchedulerAdapterInterface
17
{
18
19
    const LUA_PATH = __DIR__ . '/../../lua';
20
21
    private $client;
22
23
    public function __construct(Client $client)
24
    {
25
        $this->client = $client;
26
    }
27
28
    public function queueCommand(string $queueName, string $id, string $serialized)
29
    {
30
        $this->executeLuaScript('queue_message', [$queueName, $id, $serialized]);
31
    }
32
33
    public function awaitCommand(string $queueName, int $timeout = null): ReceivedCommand
34
    {
35
        $stopwatchStart = time();
36
        $this->client->ping();
37
        try {
38
            $id = $this->client->brpoplpush(":{$queueName}:queue", ":{$queueName}:consuming", $timeout ?? 0);
39
        } catch (ConnectionException $e) {
40
            $id = null;
41
        }
42
        if (!$id) {
43
            if ($timeout !== null) {
44
                $timeout = time() - $stopwatchStart - $timeout;
45
                if ($timeout <= 0) {
46
                    throw new TimeoutException;
47
                }
48
            }
49
            return $this->awaitCommand($queueName, $timeout);
50
        }
51
        /* @var $id string */
52
        $serialized = $this->executeLuaScript('receive_message', [$queueName, $id]);
53
        return new ReceivedCommand($queueName, $id, $serialized);
54
    }
55
56
    public function getCommandStatus(string $queueName, string $id): string
57
    {
58
        return $this->client->hget(":{$queueName}:statuses", $id) ?? self::STATUS_NOT_FOUND;
59
    }
60
61
    public function setCommandCompleted(string $queueName, string $id)
62
    {
63
        $this->executeLuaScript('acknowledge_message', [$queueName, $id]);
64
    }
65
66
    public function setCommandFailed(string $queueName, string $id)
67
    {
68
        $this->executeLuaScript('reject_message', [$queueName, $id]);
69
    }
70
71
    public function putQueue(string $queueName)
72
    {
73
        $this->client->sadd(':queues', [ $queueName ]);
74
    }
75
76
    public function getQueueNames(): array
77
    {
78
        return $this->client->smembers(':queues');
79
    }
80
81
    public function getQueuedCount(string $queueName): int
82
    {
83
        return $this->client->llen(":{$queueName}:queue");
84
    }
85
86
    public function getQueuedIds(string $queueName, int $offset = 0, int $limit = 10): array
87
    {
88
        return $this->client->lrange(":{$queueName}:queue", $offset, $limit);
89
    }
90
91
    public function getConsumingCount(string $queueName): int
92
    {
93
        return $this->client->llen(":{$queueName}:consuming");
94
    }
95
96
    public function getConsumingIds(string $queueName, int $offset = 0, int $limit = 10): array
97
    {
98
        return $this->client->lrange(":{$queueName}:consuming", $offset, $limit);
99
    }
100
101
    public function readCommand(string $queueName, string $id): string
102
    {
103
        $serialized = $this->client->hget(":{$queueName}:messages", $id);
104
        if ($serialized === null) {
105
            throw new CommandNotFoundException();
106
        }
107
        return $serialized;
108
    }
109
110
    public function clearQueue(string $queueName)
111
    {
112
        $this->executeLuaScript('empty_queue', [$queueName]);
113
    }
114
115
    public function deleteQueue(string $queueName)
116
    {
117
        $this->clearQueue($queueName);
118
        $this->clearSchedule([$queueName]);
119
    }
120
121
    public function purgeCommand(string $queueName, string $id)
122
    {
123
        $this->executeLuaScript('purge_message', [$queueName, $id]);
124
    }
125
126
    public function scheduleCommand(string $queueName, string $id, string $serialized, \DateTime $dateTime)
127
    {
128
        $this->executeLuaScript('schedule_message', [$queueName, $id, $serialized, $dateTime->getTimestamp()]);
129
    }
130
131
    public function cancelScheduledCommand(string $queueName, string $id)
132
    {
133
        $this->purgeCommand($queueName, $id);
134
    }
135
136
    public function clearSchedule(array $queueNames = null, \DateTime $start = null, \DateTime $end = null)
137
    {
138
        if ($queueNames === null) {
139
            $queueNames = [ null ];
140
        }
141
        foreach ($queueNames as $queueName) {
142
            $this->executeLuaScript('clear_schedule', [
143
                $queueName,
144
                $start ? $start->getTimestamp() : '-inf',
145
                $end ? $end->getTimestamp() : '+inf'
146
            ]);
147
        }
148
    }
149
150
    public function receiveDueCommands(
151
        \DateTime $now,
152
        int $limit = SchedulerWorker::DEFAULT_THROTTLE,
153
        \DateTime $startTime = null
154
    ): array {
155
        if ($startTime === null) {
156
            $start = 0;
157
        } else {
158
            $start = $startTime->getTimestamp();
159
        }
160
        
161
        $results = $this->executeLuaScript('receive_due_messages', [
162
            $start,
163
            $now->getTimestamp(),
164
            $limit
165
        ]);
166
167
        $commands = [ ];
168
169
        foreach ($results as $result) {
170
            list($queueName, $id, $message, $score) = $result;
171
            $commands[ ] = new ReceivedScheduledCommand(
172
                $queueName,
173
                $id,
174
                $message,
175
                new \DateTime('@'.$score)
176
            );
177
        }
178
        return $commands;
179
    }
180
181
    private function executeLuaScript(string $script, array $args)
182
    {
183
        $command = new LuaFileCommand(static::LUA_PATH . '/' . $script . '.lua');
184
        $command->setArguments($args);
185
        return $this->client->executeCommand($command);
186
    }
187
}
188