Passed
Push — master ( a72e9a...84b2ba )
by Dirk
03:05
created

DataStore::setNamespace()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 1
dl 0
loc 3
ccs 2
cts 2
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 1
crap 1
1
<?php
2
3
declare(strict_types=1);
4
5
namespace Resque;
6
7
use Predis\Client;
8
use Resque\Dispatchers\Noop;
9
use Resque\Interfaces\Dispatcher;
10
use Resque\Tasks\BeforeJobPop;
11
use Resque\Tasks\BeforeJobPush;
12
13
/**
 * Thin wrapper around a Predis client that builds the namespaced Redis keys
 * for queues and workers and issues the corresponding Redis commands.
 *
 * Queue push/pop operations are first passed through a dispatcher so that
 * listeners can adjust the payload (command, queue name, ...) before the
 * Redis call is made.
 */
class DataStore
{
    /** Date format used when storing a worker's start time. */
    public const REDIS_DATE_FORMAT = 'Y-m-d H:i:s O';

    /** @var Client */
    private $redis;

    /** @var Dispatcher Defaults to a Noop dispatcher until setDispatcher() is called. */
    private $dispatcher;

    /** @var string Prefix of every Redis key built by this class. */
    private $namespace = 'resque';

    /**
     * @param Client $client Redis client all commands are sent through.
     */
    public function __construct(Client $client)
    {
        $this->redis = $client;
        $this->dispatcher = new Noop();
    }

    /**
     * Replaces the dispatcher consulted before queue push/pop operations.
     */
    public function setDispatcher(Dispatcher $dispatcher): void
    {
        $this->dispatcher = $dispatcher;
    }

    /**
     * Replaces the key prefix (default "resque") used for all Redis keys.
     */
    public function setNamespace(string $namespace): void
    {
        $this->namespace = $namespace;
    }

    /**
     * Registers the queue name in the "<namespace>:queues" set and pushes
     * the JSON string onto the queue list.
     *
     * The payload is dispatched as BeforeJobPush first; the values actually
     * used (command, queue_key, queue_name, json) are the ones returned by
     * the dispatcher, so a listener may override any of them.
     *
     * @param string $queueName Queue name without namespace prefix.
     * @param string $json      Serialized job to enqueue.
     */
    public function pushToQueue(string $queueName, string $json): void
    {
        $payload = $this->dispatcher->dispatch(BeforeJobPush::class, [
            'queue_key' => $this->redisKeyForQueue($queueName),
            'queue_name' => $queueName,
            'json' => $json,
            'command' => 'rpush',
        ]);

        // The Redis command name comes from the (possibly modified) payload.
        $command = $payload['command'];

        $this->redis->sadd("{$this->namespace}:queues", $payload['queue_name']);
        $this->redis->$command($payload['queue_key'], $payload['json']);
    }

    /**
     * Pops the next entry from the given queue.
     *
     * The payload is dispatched as BeforeJobPop first; the command (default
     * "lpop") and queue name returned by the dispatcher are the ones used.
     *
     * @param string $queueName Queue name without namespace prefix.
     *
     * @return string|null The popped entry, or null when the Redis command
     *                     yields no element (LPOP replies with nil on an
     *                     empty or missing list). Previously the non-nullable
     *                     return type turned that case into a TypeError.
     */
    public function popFromQueue(string $queueName): ?string
    {
        $payload = $this->dispatcher->dispatch(
            BeforeJobPop::class,
            ['command' => 'lpop', 'queue_name' => $queueName]
        );
        $command = $payload['command'];

        return $this->redis->$command($this->redisKeyForQueue($payload['queue_name']));
    }

    /**
     * @return string Redis key of the queue list: "<namespace>:queue:<queueName>".
     */
    public function redisKeyForQueue(string $queueName): string
    {
        return "{$this->namespace}:queue:{$queueName}";
    }

    /**
     * Pushes the JSON string onto the queue named "failed".
     */
    public function pushToFailedQueue(string $json): void
    {
        $this->pushToQueue('failed', $json);
    }

    /**
     * Adds the worker to the "<namespace>:workers" set and records its
     * start time.
     */
    public function registerWorker(string $workerId): void
    {
        $this->redis->sadd("{$this->namespace}:workers", $workerId);
        $this->workerStarted($workerId);
    }

    /**
     * Removes the worker from the "<namespace>:workers" set and deletes its
     * payload key and start-time key.
     */
    public function unregisterWorker(string $workerId): void
    {
        $this->redis->srem("{$this->namespace}:workers", $workerId);
        $this->redis->del($this->redisKeyForWorker($workerId));
        $this->redis->del($this->redisKeyForWorkerStartTime($workerId));
    }

    /**
     * Stores the current time, formatted with REDIS_DATE_FORMAT, under the
     * worker's start-time key.
     */
    public function workerStarted(string $workerId): void
    {
        $startTime = (new \DateTimeImmutable())->format(self::REDIS_DATE_FORMAT);
        $this->redis->set($this->redisKeyForWorkerStartTime($workerId), $startTime);
    }

    /**
     * @return string Redis key of the worker: "<namespace>:worker:<workerId>".
     */
    public function redisKeyForWorker(string $workerId): string
    {
        return "{$this->namespace}:worker:{$workerId}";
    }

    /**
     * @return string Redis key of the worker's start time: the worker key
     *                suffixed with ":started".
     */
    public function redisKeyForWorkerStartTime(string $workerId): string
    {
        return "{$this->redisKeyForWorker($workerId)}:started";
    }

    /**
     * Stores the given data string under the worker's key.
     */
    public function setWorkerPayload(string $workerId, string $data): void
    {
        $this->redis->set($this->redisKeyForWorker($workerId), $data);
    }

    /**
     * Deletes the worker's key (the value written by setWorkerPayload()).
     */
    public function workerDoneWorking(string $workerId): void
    {
        $this->redis->del($this->redisKeyForWorker($workerId));
    }

    /**
     * Disconnects the Redis client and connects it again.
     */
    public function reconnect(): void
    {
        $this->redis->disconnect();
        $this->redis->connect();
    }
}
115