Passed
Push — master ( 35afea...a9381d )
by Dirk
02:07
created

DataStore   A

Complexity

Total Complexity 35

Size/Duplication

Total Lines 211
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 35
eloc 80
dl 0
loc 211
c 0
b 0
f 0
rs 9.6

30 Methods

Rating   Name   Duplication   Size   Complexity  
A registerWorker() 0 7 1
A extractHeartbeatDateTime() 0 8 3
A redisKeyForWorkerStartTime() 0 3 1
A redisKeyForQueue() 0 3 1
A incrementStat() 0 3 1
A allResqueKeys() 0 6 1
A getWorkerStartTime() 0 9 3
A pushToFailedQueue() 0 3 1
A setWorkerPayload() 0 3 1
A unregisterWorker() 0 9 1
A redisKeyForWorker() 0 3 1
A getWorkerHeartbeat() 0 4 1
A workerStarted() 0 4 1
A setDispatcher() 0 3 1
A workerExists() 0 3 1
A getWorkerHeartbeats() 0 4 1
A reconnect() 0 4 1
A popFromQueue() 0 6 1
A decrementStat() 0 3 1
A crearStat() 0 3 1
A workerDoneWorking() 0 7 1
A getStat() 0 3 1
A hasWorkerHeartbeat() 0 4 2
A pushToQueue() 0 13 1
A queueSize() 0 3 1
A getWorkerPayload() 0 3 1
A acquirePruningDeadWorkerLock() 0 3 1
A redisKeyForStats() 0 3 1
A __construct() 0 5 1
A removeWorkerHeartbeat() 0 3 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Resque;
6
7
use Predis\Client;
8
use Resque\Dispatchers\Noop;
9
use Resque\Interfaces\DispatcherInterface;
10
11
class DataStore
12
{
13
    public const REDIS_DATE_FORMAT = 'Y-m-d H:i:s O';
14
    public const REDIS_KEY_FOR_WORKER_PRUNING = "pruning_dead_workers_in_progress";
15
    public const REDIS_HEARTBEAT_KEY = "workers:heartbeat";
16
17
    private $redis;
18
    private $namespace;
19
    private $dispatcher;
20
21
    public function __construct(Client $client, string $resqueNamespace)
22
    {
23
        $this->redis = $client;
24
        $this->namespace = $resqueNamespace;
25
        $this->dispatcher = new Noop();
26
    }
27
28
    public function setDispatcher(DispatcherInterface $dispatcher): void
29
    {
30
        $this->dispatcher = $dispatcher;
31
    }
32
33
    public function allResqueKeys(): array
34
    {
35
        return array_map(
36
            $this->redis->keys('*'),
37
            function ($key) {
38
                return str_replace("{$this->namespace}:", '');
39
            }
40
        );
41
    }
42
43
    public function pushToQueue(string $queueName, string $json): void
44
    {
45
        $queueKey = $this->redisKeyForQueue($queueName);
46
        $payload = [
47
            'queue_key' => $queueKey,
48
            'queue_name' => $queueName,
49
            'json' => $json,
50
            'command' => 'rpush',
51
        ];
52
        $payload = $this->dispatcher->dispatch(BeforeJobPush::class, $payload);
53
        $command = $payload['command'];
54
        $this->redis->sadd('queues', $payload['queue_name']);
55
        $this->redis->$command($payload['queue_key'], $payload['json']);
56
    }
57
58
    public function popFromQueue(string $queueName): string
59
    {
60
        $payload = ['command' => 'lpop', 'queue_name' => $queueName];
61
        $payload = $this->dispatcher->dispatch(BeforeJobPop::class, $payload);
62
        $command = $payload['command'];
63
        return $this->redis->$command($this->redisKeyForQueue($payload['queue_name']));
64
    }
65
66
    public function queueSize(string $queueName): integer
0 ignored issues
show
introduced by
The return type was hinted with object(integer). Did you maybe mean int?
Loading history...
67
    {
68
        return intval($this->redis->llen($this->redisKeyForQueue($queueName)));
69
    }
70
71
    public function redisKeyForQueue(string $queueName): string
72
    {
73
        return "queue:{$queueName}";
74
    }
75
76
    public function pushToFailedQueue(string $json): void
77
    {
78
        $this->pushToQueue($json, 'failed');
79
    }
80
81
    public function getWorkerPayload(string $workerId): string
82
    {
83
        return $this->redis->get($this->redisKeyForWorker($workerId));
84
    }
85
86
    public function workerExists(string $workerId): bool
87
    {
88
        return $this->redis->sismember("workers", $workerId);
89
    }
90
91
    public function registerWorker(string $workerId): void
92
    {
93
        $redis = $this->redis;
94
        $that = $this;
95
        $this->redis->pipeline(function () use ($redis, $workerId, $that) {
96
            $redis->sadd("workers", $workerId);
97
            $that->workerStarted($workerId);
98
        });
99
    }
100
101
    public function unregisterWorker(string $workerId): void
102
    {
103
        $redis = $this->redis;
104
        $that = $this;
105
        $redis->pipeline(function () use ($redis, $workerId, $that) {
106
            $redis->srem("workers", $workerId);
107
            $redis->del($that->redisKeyForWorker($workerId));
108
            $redis->del($that->redisKeyForWorkerStartTime($workerId));
109
            $that->removeWorkerHeartbeat($workerId);
110
        });
111
    }
112
113
    public function removeWorkerHeartbeat(string $workerId): void
114
    {
115
        $redis->hdel(DataStore::HEARTBEAT_KEY, $workerId);
116
    }
117
118
    public function hasWorkerHeartbeat(string $workerId): bool
119
    {
120
        $heartbeat = $this->redis->hget(DataStore::HEARTBEAT_KEY, $workerId);
121
        return !empty($heartbeat) && DateTime::createFromFormat(DataStore::REDIS_DATE_FORMAT, $heartbeat);
122
    }
123
124
    public function getWorkerHeartbeat(string $workerId): DateTime
125
    {
126
        $heartbeat = $this->redis->hget(DataStore::HEARTBEAT_KEY, $workerId);
127
        return $this->extractHeartbeatDateTime($heartbeat);
128
    }
129
130
    public function getWorkerHeartbeats(): array
131
    {
132
        $heartbeats = $this->redis->hgetall(DataStore::HEARTBEAT_KEY);
133
        return array_map($heartbeats, [$this, 'extractHeartbeatDateTime']);
134
    }
135
136
    public function extractHeartbeatDateTime($heartbeat): DateTime
137
    {
138
        if (!empty($heartbeat)) {
139
            if (false !== ($time = DateTime::createFromFormat(DataStore::REDIS_DATE_FORMAT, $heartbeat))) {
140
                return $time;
141
            }
142
        }
143
        throw new RuntimeException("Invalid Worker Heartbeat");
144
    }
145
146
    public function acquirePruningDeadWorkerLock(string $workerId, int $expiry): void
147
    {
148
        $this->redis->set(self::REDIS_KEY_FOR_WORKER_PRUNING, 'EX', $expiry, 'NX');
149
    }
150
151
    public function workerStarted(string $workerId): void
152
    {
153
        $startTime = (new DateTime())->format(self::REDIS_DATE_FORMAT);
154
        $this->redis->set($this->redisKeyForWorkerStartTime($workerId), $startTime);
155
    }
156
157
    public function redisKeyForWorker(string $workerId): string
158
    {
159
        return "worker:{$workerId}";
160
    }
161
162
    public function redisKeyForWorkerStartTime(string $workerId): string
163
    {
164
        return "{$this->redisKeyForWorker($workerId)}:started";
165
    }
166
167
    public function setWorkerPayload(string $workerId, string $data): void
168
    {
169
        $this->redis->set($this->redisKeyForWorker($workerId), $data);
170
    }
171
172
    public function getWorkerStartTime(string $workerId): DateTime
173
    {
174
        $workerStartRedisTime = $this->redis->get($this->redisKeyForWorkerStartTime($workerId));
175
        if (!empty($workerStartRedisTime)) {
176
            if (false !== ($workerStartTime = DateTime::createFromFormat(DataStore::REDIS_DATE_FORMAT, $workerStartRedisTime))) {
177
                return $workerStartTime;
178
            }
179
        }
180
        throw new RuntimeException("Invalid Worker Start Time");
181
    }
182
183
    public function workerDoneWorking(string $workerId, callable $block): void
184
    {
185
        $redis = $this->redis;
186
        $workerKey = $this->redisKeyForWorker($workerId);
187
        $this->redis->pipeline(function () use ($redis, $workerKey, $block) {
188
            $redis->del($workerKey);
189
            $block();
190
        });
191
    }
192
193
    public function getStat(string $statName): int
194
    {
195
        return $this->redis($this->redisKeyForStats($statName));
196
    }
197
198
    public function incrementStat(string $statName, int $by): void
199
    {
200
        $this->redis->incrby($this->redisKeyForStats($statName), $by);
201
    }
202
203
    public function decrementStat(string $statName, int $by): void
204
    {
205
        $this->redis->decrby($this->redisKeyForStats($statName), $by);
206
    }
207
208
    public function crearStat(string $statName): void
209
    {
210
        $this->redis->del($this->redisKeyForStats($statName));
211
    }
212
213
    public function redisKeyForStats(string $statName): string
214
    {
215
        return "stats:{$statName}";
216
    }
217
218
    public function reconnect(): void
219
    {
220
        $this->redis->disconnect();
221
        $this->redis->connect();
222
    }
223
}
224