Passed
Push — master ( bc2961...480562 )
by Dirk
02:35
created

DataStore::removeWorkerHeartbeat()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 1
dl 0
loc 3
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Resque;
6
7
use Predis\Client;
8
use Resque\Dispatchers\Noop;
9
use Resque\Interfaces\DispatcherInterface;
10
use Resque\Tasks\BeforeJobPop;
11
use Resque\Tasks\BeforeJobPush;
12
13
class DataStore
14
{
15
    public const REDIS_DATE_FORMAT = 'Y-m-d H:i:s O';
16
    public const REDIS_KEY_FOR_WORKER_PRUNING = "pruning_dead_workers_in_progress";
17
    public const REDIS_HEARTBEAT_KEY = "workers:heartbeat";
18
19
    private $redis;
20
    private $dispatcher;
21
22 6
    public function __construct(Client $client)
23
    {
24 6
        $this->redis = $client;
25 6
        $this->dispatcher = new Noop();
26 6
    }
27
28 6
    public function setDispatcher(DispatcherInterface $dispatcher): void
29
    {
30 6
        $this->dispatcher = $dispatcher;
31 6
    }
32
33
    /*
34
    public function allResqueKeys(): array
35
    {
36
        return array_map(
37
            $this->redis->keys('*'),
38
            function ($key) {
39
                return str_replace("{$this->namespace}:", '');
40
            }
41
        );
42
    }
43
    */
44
45 2
    public function pushToQueue(string $queueName, string $json): void
46
    {
47 2
        $queueKey = $this->redisKeyForQueue($queueName);
48
        $payload = [
49 2
            'queue_key' => $queueKey,
50 2
            'queue_name' => $queueName,
51 2
            'json' => $json,
52 2
            'command' => 'rpush',
53
        ];
54 2
        $payload = $this->dispatcher->dispatch(BeforeJobPush::class, $payload);
55 2
        $command = $payload['command'];
56 2
        $this->redis->sadd('queues', $payload['queue_name']);
57 2
        $this->redis->$command($payload['queue_key'], $payload['json']);
58 2
    }
59
60 1
    public function popFromQueue(string $queueName): string
61
    {
62 1
        $payload = ['command' => 'lpop', 'queue_name' => $queueName];
63 1
        $payload = $this->dispatcher->dispatch(BeforeJobPop::class, $payload);
64 1
        $command = $payload['command'];
65 1
        return $this->redis->$command($this->redisKeyForQueue($payload['queue_name']));
66
    }
67
68
    /*
69
    public function queueSize(string $queueName): int
70
    {
71
        return intval($this->redis->llen($this->redisKeyForQueue($queueName)));
72
    }
73
    */
74
75 3
    public function redisKeyForQueue(string $queueName): string
76
    {
77 3
        return "queue:{$queueName}";
78
    }
79
80 1
    public function pushToFailedQueue(string $json): void
81
    {
82 1
        $this->pushToQueue('failed', $json);
83 1
    }
84
85
    /*
86
    public function getWorkerPayload(string $workerId): string
87
    {
88
        return $this->redis->get($this->redisKeyForWorker($workerId));
89
    }
90
    */
91
92
    /*
93
    public function workerExists(string $workerId): bool
94
    {
95
        return $this->redis->sismember("workers", $workerId);
96
    }
97
    */
98
99 1
    public function registerWorker(string $workerId): void
100
    {
101 1
        $this->redis->sadd("workers", $workerId);
102 1
        $this->workerStarted($workerId);
103 1
    }
104
105 1
    public function unregisterWorker(string $workerId): void
106
    {
107 1
        $this->redis->srem("workers", $workerId);
108 1
        $this->redis->del($this->redisKeyForWorker($workerId));
109 1
        $this->redis->del($this->redisKeyForWorkerStartTime($workerId));
110 1
        $this->removeWorkerHeartbeat($workerId);
111 1
    }
112
113 1
    public function removeWorkerHeartbeat(string $workerId): void
114
    {
115 1
        $this->redis->hdel(DataStore::REDIS_HEARTBEAT_KEY, $workerId);
116 1
    }
117
118
    /*
119
    public function hasWorkerHeartbeat(string $workerId): bool
120
    {
121
        $heartbeat = $this->redis->hget(DataStore::REDIS_HEARTBEAT_KEY, $workerId);
122
        return !empty($heartbeat) && DateTime::createFromFormat(DataStore::REDIS_DATE_FORMAT, $heartbeat);
123
    }
124
    */
125
126
    /*
127
    public function getWorkerHeartbeat(string $workerId): DateTime
128
    {
129
        $heartbeat = $this->redis->hget(DataStore::REDIS_HEARTBEAT_KEY, $workerId);
130
        return $this->extractHeartbeatDateTime($heartbeat);
131
    }
132
    */
133
134
    /*
135
    public function getWorkerHeartbeats(): array
136
    {
137
        $heartbeats = $this->redis->hgetall(DataStore::REDIS_HEARTBEAT_KEY);
138
        return array_map($heartbeats, [$this, 'extractHeartbeatDateTime']);
139
    }
140
    */
141
142
    /*
143
    public function extractHeartbeatDateTime($heartbeat): DateTime
144
    {
145
        if (!empty($heartbeat)) {
146
            if (false !== ($time = DateTime::createFromFormat(DataStore::REDIS_DATE_FORMAT, $heartbeat))) {
147
                return $time;
148
            }
149
        }
150
        throw new RuntimeException("Invalid Worker Heartbeat");
151
    }
152
    */
153
154
    /*
155
    public function acquirePruningDeadWorkerLock(string $workerId, int $expiry): void
156
    {
157
        $this->redis->set(self::REDIS_KEY_FOR_WORKER_PRUNING, 'EX', $expiry, 'NX');
158
    }
159
    */
160
161 1
    public function workerStarted(string $workerId): void
162
    {
163 1
        $startTime = (new \DateTime())->format(self::REDIS_DATE_FORMAT);
164 1
        $this->redis->set($this->redisKeyForWorkerStartTime($workerId), $startTime);
165 1
    }
166
167 2
    public function redisKeyForWorker(string $workerId): string
168
    {
169 2
        return "worker:{$workerId}";
170
    }
171
172 2
    public function redisKeyForWorkerStartTime(string $workerId): string
173
    {
174 2
        return "{$this->redisKeyForWorker($workerId)}:started";
175
    }
176
177
    public function setWorkerPayload(string $workerId, string $data): void
178
    {
179
        $this->redis->set($this->redisKeyForWorker($workerId), $data);
180
    }
181
182
    public function getWorkerStartTime(string $workerId): DateTime
183
    {
184
        $workerStartRedisTime = $this->redis->get($this->redisKeyForWorkerStartTime($workerId));
185
        if (!empty($workerStartRedisTime)) {
186
            if (false !== ($workerStartTime = DateTime::createFromFormat(DataStore::REDIS_DATE_FORMAT, $workerStartRedisTime))) {
187
                return $workerStartTime;
188
            }
189
        }
190
        throw new RuntimeException("Invalid Worker Start Time");
191
    }
192
193
    public function workerDoneWorking(string $workerId, callable $block): void
194
    {
195
        $this->redis->del($this->redisKeyForWorker($workerId));
196
        $block();
197
    }
198
199
    /*
200
    public function getStat(string $statName): int
201
    {
202
        return $this->redis($this->redisKeyForStats($statName));
203
    }
204
    */
205
206
    /*
207
    public function incrementStat(string $statName, int $by): void
208
    {
209
        $this->redis->incrby($this->redisKeyForStats($statName), $by);
210
    }
211
    */
212
213
    /*
214
    public function decrementStat(string $statName, int $by): void
215
    {
216
        $this->redis->decrby($this->redisKeyForStats($statName), $by);
217
    }
218
    */
219
220
    /*
221
    public function crearStat(string $statName): void
222
    {
223
        $this->redis->del($this->redisKeyForStats($statName));
224
    }
225
    */
226
227
    /*
228
    public function redisKeyForStats(string $statName): string
229
    {
230
        return "stats:{$statName}";
231
    }
232
    */
233
234 1
    public function reconnect(): void
235
    {
236 1
        $this->redis->disconnect();
237 1
        $this->redis->connect();
238 1
    }
239
}
240