Completed
Pull Request — master (#46)
by Michal
10:48
created

RedisSetDriver::getKey()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 2.5

Importance

Changes 0
Metric Value
dl 0
loc 7
ccs 2
cts 4
cp 0.5
rs 10
c 0
b 0
f 0
cc 2
nc 2
nop 1
crap 2.5
1
<?php
2
declare(strict_types=1);
3
4
namespace Tomaj\Hermes\Driver;
5
6
use Redis;
7
use Closure;
8
use Tomaj\Hermes\Dispatcher;
9
use Tomaj\Hermes\MessageInterface;
10
use Tomaj\Hermes\MessageSerializer;
11
use Tomaj\Hermes\Shutdown\ShutdownException;
12
use Tomaj\Hermes\SerializeException;
13
14
class RedisSetDriver implements DriverInterface
15
{
16
    use MaxItemsTrait;
17
    use ShutdownTrait;
18
    use SerializerAwareTrait;
19
20
    /** @var array<int, string>  */
21
    private array $queues = [];
0 ignored issues
show
Bug introduced by
This code did not parse for me. Apparently, there is an error somewhere around this line:

Syntax error, unexpected T_ARRAY, expecting T_FUNCTION or T_CONST
Loading history...
22
23
    /**
24
     * @var string
25
     */
26
    private string $scheduleKey;
27
28
    /**
29
     * @var Redis
30
     */
31
    private $redis;
32
33
    /**
34
     * @var integer
35
     */
36
    private $refreshInterval;
37
38
    /**
39
     * Create new RedisSetDriver
40
     *
41
     * This driver is using redis set. With send message it add new item to set
42
     * and in wait() command it is reading new items in this set.
43
     * This driver doesn't use redis pubsub functionality, only redis sets.
44
     *
45
     * Managing connection to redis is up to you and you have to create it outsite
46
     * of this class. You have to have enabled native Redis php extension.
47 21
     *
48
     * @see examples/redis
49 21
     *
50 3
     * @param Redis                  $redis
51
     * @param string                 $key
52
     * @param integer                $refreshInterval
53 18
     * @param string                 $scheduleKey
54 18
     */
55 18
    public function __construct(Redis $redis, string $key = 'hermes', int $refreshInterval = 1, string $scheduleKey = 'hermes_schedule')
56 18
    {
57 18
        $this->setupPriorityQueue($key, Dispatcher::DEFAULT_PRIORITY);
58
59
        $this->scheduleKey = $scheduleKey;
60
        $this->redis = $redis;
61
        $this->refreshInterval = $refreshInterval;
62 6
        $this->serializer = new MessageSerializer();
63
    }
64 6
65 6
    /**
66
     * {@inheritdoc}
67
     *
68
     * @throws UnknownPriorityException
69
     */
70 6
    public function send(MessageInterface $message, int $priority = Dispatcher::DEFAULT_PRIORITY): bool
71
    {
72 6
        if ($message->getExecuteAt() !== null && $message->getExecuteAt() > microtime(true)) {
73 6
            $this->redis->zAdd($this->scheduleKey, $message->getExecuteAt(), $this->serializer->serialize($message));
74 6
        } else {
75
            $key = $this->getKey($priority);
76 6
            $this->redis->sAdd($key, $this->serializer->serialize($message));
77 6
        }
78
        return true;
79 6
    }
80 3
81 2
    /**
82 6
     * {@inheritdoc}
83 3
     */
84 2
    public function setupPriorityQueue(string $name, int $priority): void
85
    {
86 6
        $this->queues[$priority] = $name;
87 6
    }
88
89
    /**
90 6
     * @param int $priority
91 6
     * @return string
92 4
     *
93
     * @throws UnknownPriorityException
94 6
     */
95
    private function getKey(int $priority): string
96
    {
97 4
        if (!isset($this->queues[$priority])) {
98 6
            throw new UnknownPriorityException("Unknown priority {$priority}");
99
        }
100
        return $this->queues[$priority];
101
    }
102
103
    /**
104
     * {@inheritdoc}
105
     *
106
     * @throws ShutdownException
107
     * @throws UnknownPriorityException
108
     * @throws SerializeException
109
     */
110
    public function wait(Closure $callback, array $priorities = []): void
111
    {
112
        $queues = array_reverse($this->queues, true);
113
        while (true) {
114
            $this->checkShutdown();
115
            if (!$this->shouldProcessNext()) {
116
                break;
117
            }
118
119
            // check schedule
120
            $messagesString = $this->redis->zRangeByScore($this->scheduleKey, '-inf', (string)microtime(true), ['limit' => [0, 1]]);
121
            if (count($messagesString)) {
122
                foreach ($messagesString as $messageString) {
123
                    $this->redis->zRem($this->scheduleKey, $messageString);
124
                    $this->send($this->serializer->unserialize($messageString));
125
                }
126
            }
127
128
            $messageString = null;
129
            $foundPriority = null;
130
131
            foreach ($queues as $priority => $name) {
132
                if (count($priorities) > 0 && !in_array($priority, $priorities)) {
133
                    continue;
134
                }
135
                if ($messageString !== null) {
136
                    break;
137
                }
138
139
                $messageString = $this->pop($this->getKey($priority));
140
                $foundPriority = $priority;
141
            }
142
143
            if ($messageString !== null) {
144
                $message = $this->serializer->unserialize($messageString);
145
                $callback($message, $foundPriority);
146
                $this->incrementProcessedItems();
147
            } else {
148
                if ($this->refreshInterval) {
149
                    $this->checkShutdown();
150
                    sleep($this->refreshInterval);
151
                }
152
            }
153
        }
154
    }
155
156
    private function pop(string $key): ?string
157
    {
158
        $messageString = $this->redis->sPop($key);
159
        if (is_string($messageString) && $messageString !== "") {
160
            return $messageString;
161
        }
162
163
        return null;
164
    }
165
}
166