Completed
Pull Request — master (#46)
by Michal
12:30
created

RedisSetDriver::pop()   A

Complexity

Conditions 3
Paths 2

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 12

Importance

Changes 0
Metric Value
dl 0
loc 9
ccs 0
cts 0
cp 0
rs 9.9666
c 0
b 0
f 0
cc 3
nc 2
nop 1
crap 12
1
<?php
2
declare(strict_types=1);
3
4
namespace Tomaj\Hermes\Driver;
5
6
use Redis;
7
use Closure;
8
use Tomaj\Hermes\Dispatcher;
9
use Tomaj\Hermes\MessageInterface;
10
use Tomaj\Hermes\MessageSerializer;
11
use Tomaj\Hermes\Shutdown\ShutdownException;
12
use Tomaj\Hermes\SerializeException;
13
14
class RedisSetDriver implements DriverInterface
15
{
16
    use MaxItemsTrait;
17
    use ShutdownTrait;
18
    use SerializerAwareTrait;
19
20
    /** @var array<int, string>  */
21
    private array $queues = [];
0 ignored issues
show
Bug introduced by
This code did not parse for me. Apparently, there is an error somewhere around this line:

Syntax error, unexpected T_ARRAY, expecting T_FUNCTION or T_CONST
Loading history...
22
23
    /**
24
     * @var string
25
     */
26
    private string $scheduleKey;
27
28
    /**
     * Connection used for every queue and schedule command issued by this driver.
     * It is injected through the constructor and never created here.
     */
    private Redis $redis;
32
33
    /**
     * Value handed to sleep() by wait() when no message was found.
     * A falsy value (0) makes wait() skip the sleep entirely.
     */
    private int $refreshInterval;
37
38
    /**
     * Create a new RedisSetDriver.
     *
     * The driver relies on plain Redis sets: send() adds the serialized message
     * to a set and wait() pops items back out of it. Redis pub/sub is not used.
     * Messages with a future execution time are parked in a sorted set until
     * they are due.
     *
     * The Redis connection has to be created and managed outside of this class,
     * which requires the native Redis PHP extension to be enabled.
     *
     * @see examples/redis
     *
     * @param Redis  $redis           connection used for every command
     * @param string $key             set name registered for Dispatcher::DEFAULT_PRIORITY
     * @param int    $refreshInterval value passed to sleep() when wait() finds no message
     * @param string $scheduleKey     sorted-set name used for delayed messages
     */
    public function __construct(
        Redis $redis,
        string $key = 'hermes',
        int $refreshInterval = 1,
        string $scheduleKey = 'hermes_schedule'
    ) {
        $this->redis = $redis;
        $this->scheduleKey = $scheduleKey;
        $this->refreshInterval = $refreshInterval;
        $this->serializer = new MessageSerializer();

        // Register the default queue; this only touches the priority map.
        $this->setupPriorityQueue($key, Dispatcher::DEFAULT_PRIORITY);
    }
64 6
65 6
    /**
     * {@inheritdoc}
     *
     * A message whose execution time lies in the future is stored in the
     * schedule sorted set, scored by that time. Every other message is added
     * directly to the set registered for $priority.
     *
     * @throws UnknownPriorityException
     */
    public function send(MessageInterface $message, int $priority = Dispatcher::DEFAULT_PRIORITY): bool
    {
        $executeAt = $message->getExecuteAt();
        $isDelayed = $executeAt !== null && $executeAt > microtime(true);

        if (!$isDelayed) {
            // Resolve the queue first so an unknown priority fails before serializing.
            $this->redis->sAdd($this->getKey($priority), $this->serializer->serialize($message));

            return true;
        }

        $this->redis->zAdd($this->scheduleKey, $executeAt, $this->serializer->serialize($message));

        return true;
    }
80 3
81 2
    /**
     * {@inheritdoc}
     *
     * Registers (or replaces) the Redis set name used for $priority and keeps
     * the queue map ordered by ascending numeric priority.
     */
    public function setupPriorityQueue(string $name, int $priority): void
    {
        $this->queues[$priority] = $name;

        // ksort() always sorts ascending, so only the comparison mode is passed.
        // SORT_ASC is an array_multisort() order flag: OR-ing it with
        // SORT_NUMERIC gives 5, which ksort() interprets as SORT_LOCALE_STRING
        // and would order the integer keys as strings ("100" < "200" < "50").
        ksort($this->queues, SORT_NUMERIC);
    }
89
90 6
    /**
     * Resolve the Redis set name registered for a priority.
     *
     * @param int $priority priority to look up in the queue map
     * @return string
     *
     * @throws UnknownPriorityException when nothing is registered for $priority
     */
    private function getKey(int $priority): string
    {
        if (isset($this->queues[$priority])) {
            return $this->queues[$priority];
        }

        throw new UnknownPriorityException("Unknown priority {$priority}");
    }
103
104
    /**
     * {@inheritdoc}
     *
     * Every loop iteration:
     *  1. moves at most one due entry from the schedule sorted set back into a
     *     queue through send();
     *  2. walks the registered queues in reverse of the order kept by
     *     setupPriorityQueue() and pops a single message from the first one
     *     that yields something;
     *  3. hands that message and its priority to $callback, or sleeps for the
     *     refresh interval when nothing was found.
     *
     * The loop ends when shouldProcessNext() returns false.
     *
     * @param Closure $callback   invoked as $callback($message, $priority)
     * @param array   $priorities priorities to read from; an empty array means all
     *
     * @throws ShutdownException
     * @throws UnknownPriorityException
     * @throws SerializeException
     */
    public function wait(Closure $callback, array $priorities = []): void
    {
        // Reverse the stored order while keeping the priority values as keys.
        $queues = array_reverse($this->queues, true);

        while (true) {
            $this->checkShutdown();
            if (!$this->shouldProcessNext()) {
                break;
            }

            // check schedule
            $dueMessages = $this->redis->zRangeByScore(
                $this->scheduleKey,
                '-inf',
                (string)microtime(true),
                ['limit' => [0, 1]]
            );
            if (count($dueMessages)) {
                foreach ($dueMessages as $dueMessage) {
                    // Reading and removing are two separate commands, so several
                    // workers can see the same entry. Only the worker whose zRem
                    // actually removed it re-dispatches it; the others skip it so
                    // the message is not queued (and processed) more than once.
                    if ($this->redis->zRem($this->scheduleKey, $dueMessage) > 0) {
                        $this->send($this->serializer->unserialize($dueMessage));
                    }
                }
            }

            $messageString = null;
            $foundPriority = null;

            foreach (array_keys($queues) as $priority) {
                if (count($priorities) > 0 && !in_array($priority, $priorities)) {
                    continue;
                }

                $messageString = $this->pop($this->getKey($priority));
                if ($messageString !== null) {
                    $foundPriority = $priority;
                    break;
                }
            }

            if ($messageString !== null) {
                $message = $this->serializer->unserialize($messageString);
                $callback($message, $foundPriority);
                $this->incrementProcessedItems();
            } else {
                if ($this->refreshInterval) {
                    // Re-check for shutdown before blocking in sleep().
                    $this->checkShutdown();
                    sleep($this->refreshInterval);
                }
            }
        }
    }
156
157
    /**
     * Pop one member from the given Redis set via sPop().
     *
     * @param string $key name of the Redis set to pop from
     * @return string|null the popped payload, or null when sPop() returned
     *                     anything other than a non-empty string
     */
    private function pop(string $key): ?string
    {
        $popped = $this->redis->sPop($key);

        return (is_string($popped) && $popped !== "") ? $popped : null;
    }
166
}
167