1 | <?php |
||
/**
 * Hermes driver backed by Redis sets.
 *
 * Messages that should run immediately are added to a Redis set (one set per
 * registered priority). Messages with a future execute-at timestamp are added
 * to a sorted set scored by that timestamp; wait() moves them into the
 * default-priority set once they are due.
 */
class RedisSetDriver implements DriverInterface
{
    use MaxItemsTrait;
    use ShutdownTrait;
    use SerializerAwareTrait;

    /** @var array<int, string> Redis set key per priority, kept sorted by ascending priority */
    private array $queues = [];

    /** Redis sorted-set key that holds messages scheduled for later execution */
    private string $scheduleKey;

    /** Connection used for every Redis command issued by this driver */
    private Redis $redis;

    /** Value passed to sleep() when no message was found; 0 disables sleeping */
    private int $refreshInterval;

    /**
     * Create new RedisSetDriver
     *
     * This driver uses Redis sets. Sending a message adds a new item to a set
     * and wait() reads new items from that set.
     * This driver doesn't use Redis pubsub functionality, only Redis sets.
     *
     * Managing the connection to Redis is up to you: you have to create it
     * outside of this class. The native Redis PHP extension has to be enabled.
     *
     * @see examples/redis
     *
     * @param Redis $redis connection used for all commands
     * @param string $key set key registered for Dispatcher::DEFAULT_PRIORITY
     * @param int $refreshInterval value passed to sleep() in wait() when no message is available
     * @param string $scheduleKey sorted-set key used for delayed messages
     */
    public function __construct(Redis $redis, string $key = 'hermes', int $refreshInterval = 1, string $scheduleKey = 'hermes_schedule')
    {
        $this->setupPriorityQueue($key, Dispatcher::DEFAULT_PRIORITY);

        $this->scheduleKey = $scheduleKey;
        $this->redis = $redis;
        $this->refreshInterval = $refreshInterval;
        $this->serializer = new MessageSerializer();
    }

    /**
     * {@inheritdoc}
     *
     * A message whose execute-at timestamp lies in the future is stored in the
     * schedule sorted set (scored by that timestamp); any other message is
     * added to the set registered for $priority. Always returns true.
     *
     * @throws UnknownPriorityException
     */
    public function send(MessageInterface $message, int $priority = Dispatcher::DEFAULT_PRIORITY): bool
    {
        $executeAt = $message->getExecuteAt();

        if ($executeAt !== null && $executeAt > microtime(true)) {
            $this->redis->zAdd($this->scheduleKey, $executeAt, $this->serializer->serialize($message));

            return true;
        }

        $key = $this->getKey($priority);
        $this->redis->sAdd($key, $this->serializer->serialize($message));

        return true;
    }

    /**
     * {@inheritdoc}
     *
     * Registers (or replaces) the set key used for $priority and keeps the
     * priority map sorted in ascending numeric order.
     */
    public function setupPriorityQueue(string $name, int $priority): void
    {
        $this->queues[$priority] = $name;
        ksort($this->queues, SORT_ASC | SORT_NUMERIC);
    }

    /**
     * Return the Redis set key registered for the given priority.
     *
     * @param int $priority
     * @return string
     *
     * @throws UnknownPriorityException when no queue was registered for $priority
     */
    private function getKey(int $priority): string
    {
        if (!isset($this->queues[$priority])) {
            throw new UnknownPriorityException("Unknown priority {$priority}");
        }

        return $this->queues[$priority];
    }

    /**
     * {@inheritdoc}
     *
     * Loops until shouldProcessNext() returns false (or checkShutdown()
     * interrupts the loop). Each iteration first re-queues due scheduled
     * messages, then pops one message from the highest registered priority
     * that has one and passes it to $callback together with that priority.
     * When $priorities is non-empty only those priorities are polled.
     *
     * @throws ShutdownException
     * @throws UnknownPriorityException
     * @throws SerializeException
     */
    public function wait(Closure $callback, array $priorities = []): void
    {
        // Highest priority first; the snapshot is taken once, as before.
        $queues = array_reverse($this->queues, true);
        if (count($priorities) > 0) {
            // The filter does not change between iterations, so apply it once.
            // Compared loosely, as before, so numeric-string priorities still match.
            $queues = array_filter(
                $queues,
                static fn ($priority): bool => in_array($priority, $priorities),
                ARRAY_FILTER_USE_KEY
            );
        }

        while (true) {
            $this->checkShutdown();
            if (!$this->shouldProcessNext()) {
                break;
            }

            $this->requeueDueScheduledMessages();

            $messageString = null;
            $foundPriority = null;
            foreach (array_keys($queues) as $priority) {
                $messageString = $this->pop($this->getKey($priority));
                if ($messageString !== null) {
                    $foundPriority = $priority;
                    break;
                }
            }

            if ($messageString !== null) {
                $message = $this->serializer->unserialize($messageString);
                $callback($message, $foundPriority);
                $this->incrementProcessedItems();
                continue;
            }

            // Nothing to process: back off before polling again.
            if ($this->refreshInterval !== 0) {
                $this->checkShutdown();
                sleep($this->refreshInterval);
            }
        }
    }

    /**
     * Move scheduled messages whose execute-at time has passed into the
     * regular queue (at most one per call, because of the limit option).
     *
     * A message is re-sent only when this process actually removed it from the
     * schedule; if zRem reports that nothing was removed, another worker has
     * already taken it and re-sending would deliver the message twice.
     *
     * @throws UnknownPriorityException
     * @throws SerializeException
     */
    private function requeueDueScheduledMessages(): void
    {
        $due = $this->redis->zRangeByScore($this->scheduleKey, '-inf', (string)microtime(true), ['limit' => [0, 1]]);
        if (!is_array($due)) {
            // Not an array (e.g. false): treat it as "nothing due".
            return;
        }

        foreach ($due as $messageString) {
            $removed = $this->redis->zRem($this->scheduleKey, $messageString);
            if ($removed === 0 || $removed === false) {
                continue;
            }

            $this->send($this->serializer->unserialize($messageString));
        }
    }

    /**
     * Pop one member from the given Redis set.
     *
     * @return string|null the popped member, or null when sPop() did not
     *                     return a non-empty string
     */
    private function pop(string $key): ?string
    {
        $messageString = $this->redis->sPop($key);
        if (is_string($messageString) && $messageString !== "") {
            return $messageString;
        }

        return null;
    }
}
||
167 |