Passed
Push — master ( 712a25...0e9cbb )
by Dirk
03:10
created

DataStore::workerStarted()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 4
ccs 0
cts 3
cp 0
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
crap 2
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Resque;
6
7
use Predis\Client;
8
use Resque\Dispatchers\Noop;
9
use Resque\Interfaces\DispatcherInterface;
10
use Resque\Tasks\BeforeJobPop;
11
use Resque\Tasks\BeforeJobPush;
12
13
/**
 * Thin wrapper around a Predis client that stores queues, worker
 * registrations, worker heartbeats and statistics in Redis.
 *
 * Queue push/pop operations are routed through a dispatcher so that
 * listeners can adjust the payload (queue name, key, JSON, Redis command)
 * before the Redis call is made.
 */
class DataStore
{
    public const REDIS_DATE_FORMAT = 'Y-m-d H:i:s O';
    public const REDIS_KEY_FOR_WORKER_PRUNING = "pruning_dead_workers_in_progress";
    public const REDIS_HEARTBEAT_KEY = "workers:heartbeat";

    /** @var Client Redis connection used for every command. */
    private $redis;

    /** @var string Namespace prefix stripped from keys by allResqueKeys(). */
    private $namespace;

    /** @var DispatcherInterface Dispatcher consulted before push/pop; a Noop instance by default. */
    private $dispatcher;

    /**
     * @param Client $client          Redis client to operate on.
     * @param string $resqueNamespace Namespace prefix of the keys managed by this store.
     */
    public function __construct(Client $client, string $resqueNamespace)
    {
        $this->redis = $client;
        $this->namespace = $resqueNamespace;
        $this->dispatcher = new Noop();
    }

    /**
     * Replaces the dispatcher used for the BeforeJobPush / BeforeJobPop tasks.
     */
    public function setDispatcher(DispatcherInterface $dispatcher): void
    {
        $this->dispatcher = $dispatcher;
    }

    /**
     * Returns all keys reported by Redis with the "<namespace>:" part removed.
     *
     * @return string[]
     */
    public function allResqueKeys(): array
    {
        $prefix = "{$this->namespace}:";

        // array_map() takes the callback first and the array second.
        return array_map(
            static function ($key) use ($prefix) {
                return str_replace($prefix, '', $key);
            },
            $this->redis->keys('*')
        );
    }

    /**
     * Pushes a JSON encoded job onto a queue.
     *
     * The payload is handed to the dispatcher (BeforeJobPush) first; the
     * returned payload decides which queue name is registered in the
     * "queues" set and which Redis command/key/JSON is used for the push.
     */
    public function pushToQueue(string $queueName, string $json): void
    {
        $payload = $this->dispatcher->dispatch(BeforeJobPush::class, [
            'queue_key' => $this->redisKeyForQueue($queueName),
            'queue_name' => $queueName,
            'json' => $json,
            'command' => 'rpush',
        ]);

        $command = $payload['command'];
        $this->redis->sadd('queues', $payload['queue_name']);
        $this->redis->$command($payload['queue_key'], $payload['json']);
    }

    /**
     * Pops the next job from a queue.
     *
     * The payload is handed to the dispatcher (BeforeJobPop) first; the
     * returned payload decides the Redis command and queue name.
     *
     * @return string|null The raw job JSON, or null when Redis returns nothing
     *                     (e.g. the list is empty).
     */
    public function popFromQueue(string $queueName): ?string
    {
        $payload = $this->dispatcher->dispatch(
            BeforeJobPop::class,
            ['command' => 'lpop', 'queue_name' => $queueName]
        );
        $command = $payload['command'];

        return $this->redis->$command($this->redisKeyForQueue($payload['queue_name']));
    }

    /**
     * Returns the length of the list backing the given queue.
     */
    public function queueSize(string $queueName): int
    {
        return (int) $this->redis->llen($this->redisKeyForQueue($queueName));
    }

    /**
     * Builds the Redis key of the list backing the given queue.
     */
    public function redisKeyForQueue(string $queueName): string
    {
        return "queue:{$queueName}";
    }

    /**
     * Pushes a JSON encoded job onto the "failed" queue.
     */
    public function pushToFailedQueue(string $json): void
    {
        // pushToQueue() expects the queue name first, then the JSON.
        $this->pushToQueue('failed', $json);
    }

    /**
     * Returns the payload stored under the worker's key.
     *
     * @return string|null The stored payload, or null when the key does not exist.
     */
    public function getWorkerPayload(string $workerId): ?string
    {
        return $this->redis->get($this->redisKeyForWorker($workerId));
    }

    /**
     * Tells whether the worker id is a member of the "workers" set.
     */
    public function workerExists(string $workerId): bool
    {
        // The reply is cast explicitly: under strict_types a non-bool reply
        // (SISMEMBER answers with an integer) would violate the return type.
        return (bool) $this->redis->sismember("workers", $workerId);
    }

    /**
     * Adds the worker to the "workers" set and records its start time.
     */
    public function registerWorker(string $workerId): void
    {
        // NOTE(review): the commands are issued on the client, not on the
        // pipeline object handed to the callback — confirm they are actually
        // batched by Predis.
        $this->redis->pipeline(function () use ($workerId) {
            $this->redis->sadd("workers", $workerId);
            $this->workerStarted($workerId);
        });
    }

    /**
     * Removes the worker from the "workers" set and deletes its payload,
     * start time and heartbeat.
     */
    public function unregisterWorker(string $workerId): void
    {
        // NOTE(review): same pipeline caveat as in registerWorker().
        $this->redis->pipeline(function () use ($workerId) {
            $this->redis->srem("workers", $workerId);
            $this->redis->del($this->redisKeyForWorker($workerId));
            $this->redis->del($this->redisKeyForWorkerStartTime($workerId));
            $this->removeWorkerHeartbeat($workerId);
        });
    }

    /**
     * Deletes the worker's field from the heartbeat hash.
     */
    public function removeWorkerHeartbeat(string $workerId): void
    {
        $this->redis->hdel(self::REDIS_HEARTBEAT_KEY, $workerId);
    }

    /**
     * Tells whether a heartbeat in REDIS_DATE_FORMAT is stored for the worker.
     */
    public function hasWorkerHeartbeat(string $workerId): bool
    {
        $heartbeat = $this->redis->hget(self::REDIS_HEARTBEAT_KEY, $workerId);

        return !empty($heartbeat)
            && false !== \DateTime::createFromFormat(self::REDIS_DATE_FORMAT, $heartbeat);
    }

    /**
     * Returns the worker's heartbeat timestamp.
     *
     * @throws \RuntimeException When no heartbeat is stored or it cannot be parsed.
     */
    public function getWorkerHeartbeat(string $workerId): \DateTime
    {
        return $this->extractHeartbeatDateTime(
            $this->redis->hget(self::REDIS_HEARTBEAT_KEY, $workerId)
        );
    }

    /**
     * Returns every stored heartbeat, parsed, keyed as in the heartbeat hash.
     *
     * @return \DateTime[]
     *
     * @throws \RuntimeException When any stored heartbeat cannot be parsed.
     */
    public function getWorkerHeartbeats(): array
    {
        $heartbeats = $this->redis->hgetall(self::REDIS_HEARTBEAT_KEY);

        // Callback first, array second; with a single array the keys are kept.
        return array_map([$this, 'extractHeartbeatDateTime'], $heartbeats);
    }

    /**
     * Parses a raw heartbeat value stored in REDIS_DATE_FORMAT.
     *
     * @param mixed $heartbeat Raw value as read from Redis.
     *
     * @throws \RuntimeException When the value is empty or does not match the format.
     */
    public function extractHeartbeatDateTime($heartbeat): \DateTime
    {
        return $this->parseRedisDateTime($heartbeat, "Invalid Worker Heartbeat");
    }

    /**
     * Tries to set the pruning lock key with an expiry, only if it is not set yet.
     *
     * @param string $workerId Stored as the value of the lock key.
     * @param int    $expiry   Lock lifetime in seconds (passed as the EX option).
     */
    public function acquirePruningDeadWorkerLock(string $workerId, int $expiry): void
    {
        // SET needs a value before the EX/NX modifiers.
        $this->redis->set(self::REDIS_KEY_FOR_WORKER_PRUNING, $workerId, 'EX', $expiry, 'NX');
    }

    /**
     * Stores the current time as the worker's start time.
     */
    public function workerStarted(string $workerId): void
    {
        $startTime = (new \DateTime())->format(self::REDIS_DATE_FORMAT);
        $this->redis->set($this->redisKeyForWorkerStartTime($workerId), $startTime);
    }

    /**
     * Builds the Redis key holding the worker's payload.
     */
    public function redisKeyForWorker(string $workerId): string
    {
        return "worker:{$workerId}";
    }

    /**
     * Builds the Redis key holding the worker's start time.
     */
    public function redisKeyForWorkerStartTime(string $workerId): string
    {
        return "{$this->redisKeyForWorker($workerId)}:started";
    }

    /**
     * Stores the given payload under the worker's key.
     */
    public function setWorkerPayload(string $workerId, string $data): void
    {
        $this->redis->set($this->redisKeyForWorker($workerId), $data);
    }

    /**
     * Returns the worker's recorded start time.
     *
     * @throws \RuntimeException When no start time is stored or it cannot be parsed.
     */
    public function getWorkerStartTime(string $workerId): \DateTime
    {
        return $this->parseRedisDateTime(
            $this->redis->get($this->redisKeyForWorkerStartTime($workerId)),
            "Invalid Worker Start Time"
        );
    }

    /**
     * Deletes the worker's payload key and then invokes the given callback.
     *
     * @param callable $block Invoked without arguments after the delete was issued.
     */
    public function workerDoneWorking(string $workerId, callable $block): void
    {
        $workerKey = $this->redisKeyForWorker($workerId);

        // NOTE(review): same pipeline caveat as in registerWorker().
        $this->redis->pipeline(function () use ($workerKey, $block) {
            $this->redis->del($workerKey);
            $block();
        });
    }

    /**
     * Returns the current value of a statistic; a missing key yields 0.
     */
    public function getStat(string $statName): int
    {
        return (int) $this->redis->get($this->redisKeyForStats($statName));
    }

    /**
     * Increases a statistic by the given amount.
     */
    public function incrementStat(string $statName, int $by): void
    {
        $this->redis->incrby($this->redisKeyForStats($statName), $by);
    }

    /**
     * Decreases a statistic by the given amount.
     */
    public function decrementStat(string $statName, int $by): void
    {
        $this->redis->decrby($this->redisKeyForStats($statName), $by);
    }

    /**
     * Deletes the key of a statistic.
     */
    public function clearStat(string $statName): void
    {
        $this->redis->del($this->redisKeyForStats($statName));
    }

    /**
     * Misspelled former name of clearStat(), kept so existing callers keep working.
     *
     * @deprecated Use clearStat() instead.
     */
    public function crearStat(string $statName): void
    {
        $this->clearStat($statName);
    }

    /**
     * Builds the Redis key holding the given statistic.
     */
    public function redisKeyForStats(string $statName): string
    {
        return "stats:{$statName}";
    }

    /**
     * Disconnects the client and connects it again.
     */
    public function reconnect(): void
    {
        $this->redis->disconnect();
        $this->redis->connect();
    }

    /**
     * Parses a value stored in REDIS_DATE_FORMAT.
     *
     * @param mixed  $value        Raw value as read from Redis.
     * @param string $errorMessage Message of the exception thrown on failure.
     *
     * @throws \RuntimeException When the value is empty or does not match the format.
     */
    private function parseRedisDateTime($value, string $errorMessage): \DateTime
    {
        if (!empty($value)) {
            $parsed = \DateTime::createFromFormat(self::REDIS_DATE_FORMAT, $value);
            if (false !== $parsed) {
                return $parsed;
            }
        }

        throw new \RuntimeException($errorMessage);
    }
}
226