1
|
|
|
<?php |
2
|
|
|
declare(strict_types=1); |
3
|
|
|
|
4
|
|
|
namespace Tomaj\Hermes\Driver; |
5
|
|
|
|
6
|
|
|
use Closure; |
7
|
|
|
use InvalidArgumentException; |
8
|
|
|
use Predis\Client; |
9
|
|
|
use Redis; |
10
|
|
|
use Tomaj\Hermes\Dispatcher; |
11
|
|
|
use Tomaj\Hermes\MessageInterface; |
12
|
|
|
use Tomaj\Hermes\MessageSerializer; |
13
|
|
|
use Tomaj\Hermes\Restart\RestartException; |
14
|
|
|
|
15
|
|
|
class RedisSetDriver implements DriverInterface |
16
|
|
|
{ |
17
|
|
|
use MaxItemsTrait; |
18
|
|
|
use RestartTrait; |
19
|
|
|
use SerializerAwareTrait; |
20
|
|
|
|
21
|
|
|
private $queues = []; |
22
|
|
|
|
23
|
|
|
/** |
24
|
|
|
* @var string |
25
|
|
|
*/ |
26
|
|
|
private $scheduleKey; |
27
|
|
|
|
28
|
|
|
/** |
29
|
|
|
* @var Redis|Client |
30
|
|
|
*/ |
31
|
|
|
private $redis; |
32
|
|
|
|
33
|
|
|
/** |
34
|
|
|
* @var integer |
35
|
|
|
*/ |
36
|
|
|
private $refreshInterval; |
37
|
|
|
|
38
|
|
|
/** |
39
|
|
|
* Create new RedisSetDriver |
40
|
|
|
* |
41
|
|
|
* This driver is using redis set. With send message it add new item to set |
42
|
|
|
* and in wait() command it is reading new items in this set. |
43
|
|
|
* This driver doesn't use redis pubsub functionality, only redis sets. |
44
|
|
|
* |
45
|
|
|
* Managing connection to redis is up to you and you have to create it outsite |
46
|
|
|
* of this class. You can use native Redis php extension or Predis extension. |
47
|
21 |
|
* |
48
|
|
|
* @see examples/redis |
49
|
21 |
|
* |
50
|
3 |
|
* @param Redis|Client $redis |
51
|
|
|
* @param string $key |
52
|
|
|
* @param integer $refreshInterval |
53
|
18 |
|
* @param string $scheduleKey |
54
|
18 |
|
*/ |
55
|
18 |
|
public function __construct($redis, string $key = 'hermes', int $refreshInterval = 1, string $scheduleKey = 'hermes_schedule') |
56
|
18 |
|
{ |
57
|
18 |
|
if (!(($redis instanceof Client) || ($redis instanceof Redis))) { |
|
|
|
|
58
|
|
|
throw new InvalidArgumentException('Predis\Client or Redis instance required'); |
59
|
|
|
} |
60
|
|
|
|
61
|
|
|
$this->setupPriorityQueue($key, Dispatcher::PRIORITY_MEDIUM); |
62
|
6 |
|
|
63
|
|
|
$this->scheduleKey = $scheduleKey; |
64
|
6 |
|
$this->redis = $redis; |
65
|
6 |
|
$this->refreshInterval = $refreshInterval; |
66
|
|
|
$this->serializer = new MessageSerializer(); |
67
|
|
|
} |
68
|
|
|
|
69
|
|
|
/** |
70
|
6 |
|
* {@inheritdoc} |
71
|
|
|
*/ |
72
|
6 |
|
public function send(MessageInterface $message, int $priority = Dispatcher::PRIORITY_MEDIUM): bool |
73
|
6 |
|
{ |
74
|
6 |
|
if ($message->getExecuteAt() && $message->getExecuteAt() > microtime(true)) { |
75
|
|
|
$this->redis->zadd($this->scheduleKey, [$message->getExecuteAt(), $this->serializer->serialize($message)]); |
76
|
6 |
|
} else { |
77
|
6 |
|
$key = $this->getKey($priority); |
78
|
|
|
$this->redis->sadd($key, $this->serializer->serialize($message)); |
79
|
6 |
|
} |
80
|
3 |
|
return true; |
81
|
2 |
|
} |
82
|
6 |
|
|
83
|
3 |
|
public function setupPriorityQueue(string $name, int $priority): void |
84
|
2 |
|
{ |
85
|
|
|
$this->queues[$priority] = $name; |
86
|
6 |
|
ksort($this->queues, SORT_ASC | SORT_NUMERIC); |
87
|
6 |
|
} |
88
|
|
|
|
89
|
|
|
private function getKey(int $priority): string |
90
|
6 |
|
{ |
91
|
6 |
|
if (!isset($this->queues[$priority])) { |
92
|
4 |
|
throw new \Exception("Unknown priority {$priority}"); |
93
|
|
|
} |
94
|
6 |
|
return $this->queues[$priority]; |
95
|
|
|
} |
96
|
|
|
|
97
|
4 |
|
/**s |
98
|
6 |
|
* {@inheritdoc} |
99
|
|
|
* |
100
|
|
|
* @throws RestartException |
101
|
|
|
*/ |
102
|
|
|
public function wait(Closure $callback, array $priorities = []): void |
103
|
|
|
{ |
104
|
|
|
$queues = array_reverse($this->queues, true); |
105
|
|
|
while (true) { |
106
|
|
|
$this->checkRestart(); |
107
|
|
|
if (!$this->shouldProcessNext()) { |
108
|
|
|
break; |
109
|
|
|
} |
110
|
|
|
|
111
|
|
|
// check schedule |
112
|
|
|
$messagesString = []; |
113
|
|
View Code Duplication |
if ($this->redis instanceof Client) { |
|
|
|
|
114
|
|
|
$messagesString = $this->redis->zrangebyscore($this->scheduleKey, '-inf', microtime(true), ['LIMIT' => ['OFFSET' => 0, 'COUNT' => 1]]); |
115
|
|
|
if (count($messagesString)) { |
116
|
|
|
foreach ($messagesString as $messageString) { |
117
|
|
|
$this->redis->zrem($this->scheduleKey, $messageString); |
118
|
|
|
} |
119
|
|
|
} |
120
|
|
|
} |
121
|
|
View Code Duplication |
if ($this->redis instanceof Redis) { |
|
|
|
|
122
|
|
|
$messagesString = $this->redis->zRangeByScore($this->scheduleKey, '-inf', (string)microtime(true), ['limit' => [0, 1]]); |
123
|
|
|
if (count($messagesString)) { |
124
|
|
|
foreach ($messagesString as $messageString) { |
125
|
|
|
$this->redis->zRem($this->scheduleKey, $messageString); |
126
|
|
|
} |
127
|
|
|
} |
128
|
|
|
} |
129
|
|
|
if (count($messagesString)) { |
130
|
|
|
foreach ($messagesString as $messageString) { |
131
|
|
|
$this->send($this->serializer->unserialize($messageString)); |
132
|
|
|
} |
133
|
|
|
} |
134
|
|
|
|
135
|
|
|
$messageString = false; |
136
|
|
|
$foundPriority = null; |
137
|
|
|
|
138
|
|
|
foreach ($queues as $priority => $name) { |
139
|
|
|
if (count($priorities) > 0 && !in_array($priority, $priorities)) { |
140
|
|
|
continue; |
141
|
|
|
} |
142
|
|
|
if ($messageString) { |
143
|
|
|
break; |
144
|
|
|
} |
145
|
|
|
|
146
|
|
|
$key = $this->getKey($priority); |
147
|
|
|
|
148
|
|
|
if ($this->redis instanceof Client) { |
|
|
|
|
149
|
|
|
$messageString = $this->redis->spop($key); |
150
|
|
|
$foundPriority = $priority; |
151
|
|
|
} |
152
|
|
|
if ($this->redis instanceof Redis) { |
153
|
|
|
$messageString = $this->redis->sPop($key); |
154
|
|
|
$foundPriority = $priority; |
155
|
|
|
} |
156
|
|
|
} |
157
|
|
|
|
158
|
|
|
if ($messageString) { |
159
|
|
|
$message = $this->serializer->unserialize($messageString); |
160
|
|
|
$callback($message, $foundPriority); |
161
|
|
|
$this->incrementProcessedItems(); |
162
|
|
|
} else { |
163
|
|
|
if ($this->refreshInterval) { |
164
|
|
|
$this->checkRestart(); |
165
|
|
|
sleep($this->refreshInterval); |
166
|
|
|
} |
167
|
|
|
} |
168
|
|
|
} |
169
|
|
|
} |
170
|
|
|
} |
171
|
|
|
|
This error could be the result of:
1. Missing dependencies
PHP Analyzer uses your
composer.json
file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects thecomposer.json
to be in the root folder of your repository.Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the
require
orrequire-dev
section?2. Missing use statement
PHP does not complain about undefined classes in
ìnstanceof
checks. For example, the following PHP code will work perfectly fine:If you have not tested against this specific condition, such errors might go unnoticed.