Passed
Push — master ( 4917b9...93c409 )
by Dirk
02:39
created

DataStore::pushToFailedQueue()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 1
dl 0
loc 3
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Resque;
6
7
use Predis\Client;
8
use Resque\Dispatchers\Noop;
9
use Resque\Interfaces\DispatcherInterface;
10
use Resque\Tasks\BeforeJobPop;
11
use Resque\Tasks\BeforeJobPush;
12
13
class DataStore
14
{
15
    /** Date format used when worker start times are written to and parsed from Redis. */
    public const REDIS_DATE_FORMAT = 'Y-m-d H:i:s O';

    /** Redis key name reserved for the dead-worker pruning lock. */
    public const REDIS_KEY_FOR_WORKER_PRUNING = 'pruning_dead_workers_in_progress';

    /** Redis key name under which worker heartbeats are kept. */
    public const REDIS_HEARTBEAT_KEY = 'workers:heartbeat';

    /** @var Client Predis client every Redis command of this store is sent through. */
    private $redis;

    /** @var DispatcherInterface|Noop Dispatcher consulted before jobs are pushed or popped. */
    private $dispatcher;
21
22 3
    /**
     * Builds a data store on top of the given Predis client.
     *
     * A Noop dispatcher is installed by default; it can be replaced later
     * through setDispatcher().
     *
     * @param Client $client Predis client used for every Redis command issued by this store.
     */
    public function __construct(Client $client)
    {
        $this->dispatcher = new Noop();
        $this->redis = $client;
    }
27
28 3
    /**
     * Replaces the dispatcher that is consulted before jobs are pushed or popped.
     *
     * @param DispatcherInterface $dispatcher Dispatcher that will receive the
     *                                        BeforeJobPush / BeforeJobPop payloads.
     */
    public function setDispatcher(DispatcherInterface $dispatcher): void
    {
        $this->dispatcher = $dispatcher;
    }
32
33
    /*
34
    public function allResqueKeys(): array
35
    {
36
        return array_map(
37
            $this->redis->keys('*'),
38
            function ($key) {
39
                return str_replace("{$this->namespace}:", '');
40
            }
41
        );
42
    }
43
    */
44
45 2
    /**
     * Adds a JSON-encoded job to a queue and records the queue name in the "queues" set.
     *
     * The initial payload (queue key, queue name, JSON and the "rpush" command) is
     * handed to the dispatcher as a BeforeJobPush task. Whatever payload the
     * dispatcher returns decides which command, key, queue name and JSON are used.
     *
     * @param string $queueName Name of the queue, without the "queue:" prefix.
     * @param string $json      Encoded job to store.
     */
    public function pushToQueue(string $queueName, string $json): void
    {
        $task = $this->dispatcher->dispatch(BeforeJobPush::class, [
            'queue_key' => $this->redisKeyForQueue($queueName),
            'queue_name' => $queueName,
            'json' => $json,
            'command' => 'rpush',
        ]);

        // The command name is read before any Redis call is made.
        $listCommand = $task['command'];

        $this->redis->sadd('queues', $task['queue_name']);
        $this->redis->{$listCommand}($task['queue_key'], $task['json']);
    }
59
60 1
    /**
     * Removes one entry from a queue and returns it.
     *
     * A payload carrying the "lpop" command and the queue name is handed to the
     * dispatcher as a BeforeJobPop task; the returned payload decides which
     * command is run and against which queue.
     *
     * NOTE(review): the declared return type is string. If the Redis command
     * yields null (presumably the case for an empty queue), this would raise a
     * TypeError under strict types - confirm how callers expect that to surface.
     *
     * @param string $queueName Name of the queue, without the "queue:" prefix.
     *
     * @return string Whatever the Redis command returned for the queue key.
     */
    public function popFromQueue(string $queueName): string
    {
        $task = $this->dispatcher->dispatch(
            BeforeJobPop::class,
            ['command' => 'lpop', 'queue_name' => $queueName]
        );
        $popCommand = $task['command'];
        $queueKey = $this->redisKeyForQueue($task['queue_name']);

        return $this->redis->{$popCommand}($queueKey);
    }
67
68
    /*
69
    public function queueSize(string $queueName): int
70
    {
71
        return intval($this->redis->llen($this->redisKeyForQueue($queueName)));
72
    }
73
    */
74
75 3
    /**
     * Builds the Redis key of a queue by prefixing its name with "queue:".
     *
     * @param string $queueName Queue name without prefix.
     *
     * @return string The prefixed key.
     */
    public function redisKeyForQueue(string $queueName): string
    {
        return 'queue:' . $queueName;
    }
79
80 1
    /**
     * Pushes a JSON-encoded job onto the queue named "failed".
     *
     * Delegates to pushToQueue(), so the same dispatcher handling applies.
     *
     * @param string $json Encoded job to store.
     */
    public function pushToFailedQueue(string $json): void
    {
        $failedQueue = 'failed';

        $this->pushToQueue($failedQueue, $json);
    }
84
85
    /*
86
    public function getWorkerPayload(string $workerId): string
87
    {
88
        return $this->redis->get($this->redisKeyForWorker($workerId));
89
    }
90
    */
91
92
    /*
93
    public function workerExists(string $workerId): bool
94
    {
95
        return $this->redis->sismember("workers", $workerId);
96
    }
97
    */
98
99
    /**
     * Adds the worker id to the "workers" set and records its start time.
     *
     * Both steps run inside a callback handed to the client's pipeline() method.
     *
     * NOTE(review): the callback takes no argument and issues its commands
     * through the client captured from the outer scope - confirm that this is
     * how the commands are meant to be batched.
     *
     * @param string $workerId Identifier of the worker being registered.
     */
    public function registerWorker(string $workerId): void
    {
        $store = $this;
        $client = $this->redis;

        $client->pipeline(function () use ($client, $store, $workerId) {
            $client->sadd('workers', $workerId);
            $store->workerStarted($workerId);
        });
    }
108
109
    /**
     * Removes every trace of a worker that this store keeps.
     *
     * Inside a callback handed to the client's pipeline() method, the worker id
     * is removed from the "workers" set, the worker payload key and start-time
     * key are deleted, and the worker's heartbeat is removed.
     *
     * NOTE(review): the callback takes no argument and issues its commands
     * through the client captured from the outer scope - confirm that this is
     * how the commands are meant to be batched.
     *
     * @param string $workerId Identifier of the worker being unregistered.
     */
    public function unregisterWorker(string $workerId): void
    {
        $store = $this;
        $client = $this->redis;

        $client->pipeline(function () use ($client, $store, $workerId) {
            $client->srem('workers', $workerId);

            $payloadKey = $store->redisKeyForWorker($workerId);
            $client->del($payloadKey);

            $startedKey = $store->redisKeyForWorkerStartTime($workerId);
            $client->del($startedKey);

            $store->removeWorkerHeartbeat($workerId);
        });
    }
120
121
    /**
     * Deletes a worker's entry from the heartbeat hash.
     *
     * @param string $workerId Identifier of the worker whose heartbeat is removed.
     */
    public function removeWorkerHeartbeat(string $workerId): void
    {
        // Use the injected client and the constant this class actually declares;
        // a bare $redis and DataStore::HEARTBEAT_KEY are both undefined here.
        $this->redis->hdel(self::REDIS_HEARTBEAT_KEY, $workerId);
    }
125
126
    /*
127
    public function hasWorkerHeartbeat(string $workerId): bool
128
    {
129
        $heartbeat = $this->redis->hget(DataStore::HEARTBEAT_KEY, $workerId);
130
        return !empty($heartbeat) && DateTime::createFromFormat(DataStore::REDIS_DATE_FORMAT, $heartbeat);
131
    }
132
    */
133
134
    /*
135
    public function getWorkerHeartbeat(string $workerId): DateTime
136
    {
137
        $heartbeat = $this->redis->hget(DataStore::HEARTBEAT_KEY, $workerId);
138
        return $this->extractHeartbeatDateTime($heartbeat);
139
    }
140
    */
141
142
    /*
143
    public function getWorkerHeartbeats(): array
144
    {
145
        $heartbeats = $this->redis->hgetall(DataStore::HEARTBEAT_KEY);
146
        return array_map($heartbeats, [$this, 'extractHeartbeatDateTime']);
147
    }
148
    */
149
150
    /*
151
    public function extractHeartbeatDateTime($heartbeat): DateTime
152
    {
153
        if (!empty($heartbeat)) {
154
            if (false !== ($time = DateTime::createFromFormat(DataStore::REDIS_DATE_FORMAT, $heartbeat))) {
155
                return $time;
156
            }
157
        }
158
        throw new RuntimeException("Invalid Worker Heartbeat");
159
    }
160
    */
161
162
    /*
163
    public function acquirePruningDeadWorkerLock(string $workerId, int $expiry): void
164
    {
165
        $this->redis->set(self::REDIS_KEY_FOR_WORKER_PRUNING, 'EX', $expiry, 'NX');
166
    }
167
    */
168
169
    /**
     * Stores the current time as the start time of the given worker.
     *
     * The timestamp is formatted with REDIS_DATE_FORMAT and written to the
     * worker's start-time key.
     *
     * @param string $workerId Identifier of the worker that has started.
     */
    public function workerStarted(string $workerId): void
    {
        // Fully qualified on purpose: this file is in the Resque namespace and
        // does not import DateTime, so an unqualified name would not resolve
        // to the built-in class.
        $startedAt = (new \DateTime())->format(self::REDIS_DATE_FORMAT);

        $this->redis->set($this->redisKeyForWorkerStartTime($workerId), $startedAt);
    }
174
175
    /**
     * Builds the Redis key of a worker by prefixing its id with "worker:".
     *
     * @param string $workerId Worker identifier.
     *
     * @return string The prefixed key.
     */
    public function redisKeyForWorker(string $workerId): string
    {
        return 'worker:' . $workerId;
    }
179
180
    /**
     * Builds the Redis key holding a worker's start time.
     *
     * The key is the worker key followed by ":started".
     *
     * @param string $workerId Worker identifier.
     *
     * @return string The start-time key.
     */
    public function redisKeyForWorkerStartTime(string $workerId): string
    {
        $workerKey = $this->redisKeyForWorker($workerId);

        return $workerKey . ':started';
    }
184
185
    /**
     * Writes the given data to the worker's Redis key.
     *
     * @param string $workerId Worker identifier.
     * @param string $data     Value stored under the worker key.
     */
    public function setWorkerPayload(string $workerId, string $data): void
    {
        $workerKey = $this->redisKeyForWorker($workerId);

        $this->redis->set($workerKey, $data);
    }
189
190
    /**
     * Reads and parses the start time recorded for a worker.
     *
     * The value stored under the worker's start-time key is parsed with
     * REDIS_DATE_FORMAT.
     *
     * DateTime and RuntimeException are written fully qualified because this
     * file is in the Resque namespace and imports neither of them.
     *
     * @param string $workerId Worker identifier.
     *
     * @return \DateTime The parsed start time.
     *
     * @throws \RuntimeException When no value is stored or it does not match the format.
     */
    public function getWorkerStartTime(string $workerId): \DateTime
    {
        $storedValue = $this->redis->get($this->redisKeyForWorkerStartTime($workerId));

        if (!empty($storedValue)) {
            $startTime = \DateTime::createFromFormat(self::REDIS_DATE_FORMAT, $storedValue);
            if ($startTime !== false) {
                return $startTime;
            }
        }

        throw new \RuntimeException("Invalid Worker Start Time");
    }
200
201
    /**
     * Deletes the worker's payload key and then runs the supplied callback.
     *
     * Both steps happen inside a callback handed to the client's pipeline() method.
     *
     * NOTE(review): the pipeline callback takes no argument and issues the
     * delete through the client captured from the outer scope - confirm that
     * this is how the commands are meant to be batched.
     *
     * @param string   $workerId Worker identifier.
     * @param callable $block    Invoked without arguments after the delete is issued.
     */
    public function workerDoneWorking(string $workerId, callable $block): void
    {
        $client = $this->redis;
        $payloadKey = $this->redisKeyForWorker($workerId);

        $client->pipeline(function () use ($block, $client, $payloadKey) {
            $client->del($payloadKey);
            $block();
        });
    }
210
211
    /*
212
    public function getStat(string $statName): int
213
    {
214
        return $this->redis($this->redisKeyForStats($statName));
215
    }
216
    */
217
218
    /*
219
    public function incrementStat(string $statName, int $by): void
220
    {
221
        $this->redis->incrby($this->redisKeyForStats($statName), $by);
222
    }
223
    */
224
225
    /*
226
    public function decrementStat(string $statName, int $by): void
227
    {
228
        $this->redis->decrby($this->redisKeyForStats($statName), $by);
229
    }
230
    */
231
232
    /*
233
    public function clearStat(string $statName): void
234
    {
235
        $this->redis->del($this->redisKeyForStats($statName));
236
    }
237
    */
238
239
    /*
240
    public function redisKeyForStats(string $statName): string
241
    {
242
        return "stats:{$statName}";
243
    }
244
    */
245
246
    /**
     * Drops the current Redis connection and opens it again.
     *
     * Calls disconnect() and then connect() on the client, in that order.
     */
    public function reconnect(): void
    {
        $client = $this->redis;

        $client->disconnect();
        $client->connect();
    }
251
}
252