Passed
Push — master ( 307b62...973735 )
by Dirk
02:53
created

DataStore::removeWorkerHeartbeat()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 1
dl 0
loc 3
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Resque;
6
7
use Predis\Client;
8
use Resque\Dispatchers\Noop;
9
use Resque\Interfaces\Dispatcher;
10
use Resque\Tasks\BeforeJobPop;
11
use Resque\Tasks\BeforeJobPush;
12
13
class DataStore
14
{
15
    public const REDIS_DATE_FORMAT = 'Y-m-d H:i:s O';
16
17
    private $redis;
18
    private $dispatcher;
19
20 8
    public function __construct(Client $client)
21
    {
22 8
        $this->redis = $client;
23 8
        $this->dispatcher = new Noop();
24 8
    }
25
26 8
    public function setDispatcher(Dispatcher $dispatcher): void
27
    {
28 8
        $this->dispatcher = $dispatcher;
29 8
    }
30
31 2
    public function pushToQueue(string $queueName, string $json): void
32
    {
33 2
        $queueKey = $this->redisKeyForQueue($queueName);
34
        $payload = [
35 2
            'queue_key' => $queueKey,
36 2
            'queue_name' => $queueName,
37 2
            'json' => $json,
38 2
            'command' => 'rpush',
39
        ];
40 2
        $payload = $this->dispatcher->dispatch(BeforeJobPush::class, $payload);
41 2
        $command = $payload['command'];
42 2
        $this->redis->sadd('queues', $payload['queue_name']);
43 2
        $this->redis->$command($payload['queue_key'], $payload['json']);
44 2
    }
45
46 1
    public function popFromQueue(string $queueName): string
47
    {
48 1
        $payload = ['command' => 'lpop', 'queue_name' => $queueName];
49 1
        $payload = $this->dispatcher->dispatch(BeforeJobPop::class, $payload);
50 1
        $command = $payload['command'];
51 1
        return $this->redis->$command($this->redisKeyForQueue($payload['queue_name']));
52
    }
53
54 3
    public function redisKeyForQueue(string $queueName): string
55
    {
56 3
        return "queue:{$queueName}";
57
    }
58
59 1
    public function pushToFailedQueue(string $json): void
60
    {
61 1
        $this->pushToQueue('failed', $json);
62 1
    }
63
64 1
    public function registerWorker(string $workerId): void
65
    {
66 1
        $this->redis->sadd("workers", $workerId);
67 1
        $this->workerStarted($workerId);
68 1
    }
69
70 1
    public function unregisterWorker(string $workerId): void
71
    {
72 1
        $this->redis->srem("workers", $workerId);
73 1
        $this->redis->del($this->redisKeyForWorker($workerId));
74 1
        $this->redis->del($this->redisKeyForWorkerStartTime($workerId));
75 1
    }
76
77 1
    public function workerStarted(string $workerId): void
78
    {
79 1
        $startTime = (new \DateTime())->format(self::REDIS_DATE_FORMAT);
80 1
        $this->redis->set($this->redisKeyForWorkerStartTime($workerId), $startTime);
81 1
    }
82
83 4
    public function redisKeyForWorker(string $workerId): string
84
    {
85 4
        return "worker:{$workerId}";
86
    }
87
88 2
    public function redisKeyForWorkerStartTime(string $workerId): string
89
    {
90 2
        return "{$this->redisKeyForWorker($workerId)}:started";
91
    }
92
93 1
    public function setWorkerPayload(string $workerId, string $data): void
94
    {
95 1
        $this->redis->set($this->redisKeyForWorker($workerId), $data);
96 1
    }
97
98 1
    public function workerDoneWorking(string $workerId): void
99
    {
100 1
        $this->redis->del($this->redisKeyForWorker($workerId));
101 1
    }
102
103 1
    public function reconnect(): void
104
    {
105 1
        $this->redis->disconnect();
106 1
        $this->redis->connect();
107 1
    }
108
}
109