Completed
Push — master ( 195654...bc8a63 )
by Tomas
39:26 queued 24:30
created

PredisSetDriver::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 9

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 9
rs 9.9666
c 0
b 0
f 0
cc 1
nc 1
nop 4
1
<?php
2
declare(strict_types=1);
3
4
namespace Tomaj\Hermes\Driver;
5
6
use Closure;
7
use Predis\Client;
8
use Tomaj\Hermes\Dispatcher;
9
use Tomaj\Hermes\MessageInterface;
10
use Tomaj\Hermes\MessageSerializer;
11
use Tomaj\Hermes\Shutdown\ShutdownException;
12
13
class PredisSetDriver implements DriverInterface
{
    use MaxItemsTrait;
    use ShutdownTrait;
    use SerializerAwareTrait;

    /**
     * Redis set names indexed by priority; kept sorted by ascending priority.
     *
     * @var array<int, string>
     */
    private $queues = [];

    /**
     * Key of the redis sorted set that holds messages scheduled for later.
     *
     * @var string
     */
    private $scheduleKey;

    /**
     * @var Client
     */
    private $redis;

    /**
     * Number of seconds wait() sleeps when no message was found.
     *
     * @var int
     */
    private $refreshInterval;

    /**
     * Create new PredisSetDriver
     *
     * This driver is using redis sets. Sending a message adds a new item to a set
     * and the wait() command reads new items from this set.
     * This driver doesn't use redis pubsub functionality, only redis sets
     * (plus one sorted set for messages scheduled into the future).
     *
     * Managing the connection to redis is up to you and you have to create it outside
     * of this class. You will need to install the predis php package.
     *
     * @see examples/redis
     *
     * @param Client $redis           connected predis client
     * @param string $key             name of the set registered for Dispatcher::PRIORITY_MEDIUM
     * @param int    $refreshInterval seconds to sleep in wait() when all queues are empty
     * @param string $scheduleKey     name of the sorted set used for scheduled messages
     */
    public function __construct(Client $redis, string $key = 'hermes', int $refreshInterval = 1, string $scheduleKey = 'hermes_schedule')
    {
        $this->setupPriorityQueue($key, Dispatcher::PRIORITY_MEDIUM);

        $this->scheduleKey = $scheduleKey;
        $this->redis = $redis;
        $this->refreshInterval = $refreshInterval;
        $this->serializer = new MessageSerializer();
    }

    /**
     * {@inheritdoc}
     *
     * A message whose execute-at timestamp lies in the future is stored in the
     * schedule sorted set (scored by that timestamp); any other message is added
     * directly to the set registered for $priority.
     *
     * @throws \Exception when no queue is registered for $priority (immediate messages only)
     */
    public function send(MessageInterface $message, int $priority = Dispatcher::PRIORITY_MEDIUM): bool
    {
        $executeAt = $message->getExecuteAt();

        if ($executeAt !== null && $executeAt > microtime(true)) {
            // Predis takes ZADD members as a [member => score] dictionary; passing
            // a [score, member] list would store the array indexes as members.
            $this->redis->zadd($this->scheduleKey, [$this->serializer->serialize($message) => $executeAt]);
        } else {
            $key = $this->getKey($priority);
            $this->redis->sadd($key, $this->serializer->serialize($message));
        }

        return true;
    }

    /**
     * Register (or rename) the redis set used for the given priority.
     *
     * @param string $name     redis key of the set
     * @param int    $priority priority served by this set
     */
    public function setupPriorityQueue(string $name, int $priority): void
    {
        $this->queues[$priority] = $name;
        // Keep the map ordered so wait() can walk it by priority.
        ksort($this->queues, SORT_ASC | SORT_NUMERIC);
    }

    /**
     * Resolve the redis set name registered for a priority.
     *
     * @throws \Exception when the priority has not been registered
     */
    private function getKey(int $priority): string
    {
        if (!isset($this->queues[$priority])) {
            throw new \Exception("Unknown priority {$priority}");
        }

        return $this->queues[$priority];
    }

    /**
     * {@inheritdoc}
     *
     * Polls redis in a loop. Each iteration first moves at most one due message
     * from the schedule set into a regular queue, then pops one message from the
     * first non-empty queue (visited from the highest numeric priority to the
     * lowest) and hands it to $callback together with the priority it came from.
     * The loop ends when shouldProcessNext() returns false.
     *
     * @param Closure $callback   invoked as $callback($message, $priority)
     * @param array   $priorities when non-empty, only these priorities are read
     *
     * @throws ShutdownException
     */
    public function wait(Closure $callback, array $priorities = []): void
    {
        // Snapshot of the registered queues, highest numeric priority first.
        $queues = array_reverse($this->queues, true);

        while (true) {
            $this->checkShutdown();
            if (!$this->shouldProcessNext()) {
                break;
            }

            // Check the schedule: fetch at most one message whose time has come.
            // NOTE(review): the read and the zrem below are two separate commands,
            // so with several workers the same message may be re-sent more than
            // once - confirm whether that matters for your deployment.
            $dueMessages = $this->redis->zrangebyscore(
                $this->scheduleKey,
                '-inf',
                microtime(true),
                ['LIMIT' => ['OFFSET' => 0, 'COUNT' => 1]]
            );
            if (count($dueMessages) > 0) {
                foreach ($dueMessages as $dueMessage) {
                    $this->redis->zrem($this->scheduleKey, $dueMessage);
                    // Re-sent without an explicit priority, i.e. with send()'s default.
                    $this->send($this->serializer->unserialize($dueMessage));
                }
            }

            $messageString = null;
            $foundPriority = null;

            foreach ($queues as $priority => $name) {
                if (count($priorities) > 0 && !in_array($priority, $priorities)) {
                    continue;
                }

                // Resolve the name again so a queue renamed meanwhile is honoured.
                $messageString = $this->redis->spop($this->getKey($priority));
                $foundPriority = $priority;

                // null means nothing was popped; anything else is a message.
                if ($messageString !== null) {
                    break;
                }
            }

            if ($messageString !== null) {
                $message = $this->serializer->unserialize($messageString);
                $callback($message, $foundPriority);
                $this->incrementProcessedItems();
            } elseif ($this->refreshInterval) {
                // Nothing to do: re-check for shutdown, then back off before polling again.
                $this->checkShutdown();
                sleep($this->refreshInterval);
            }
        }
    }
}
144