Completed
Push — master ( 195654...bc8a63 )
by Tomas
39:26 queued 24:30
created

RedisSetDriver::wait()   B

Complexity

Conditions 11
Paths 26

Size

Total Lines 45

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 15.4801

Importance

Changes 0
Metric Value
dl 0
loc 45
rs 7.3166
c 0
b 0
f 0
ccs 2
cts 3
cp 0.6667
cc 11
nc 26
nop 2
crap 15.4801

How to fix   Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a cohesive part of the method body into a new, well-named method.

1
<?php
2
declare(strict_types=1);
3
4
namespace Tomaj\Hermes\Driver;
5
6
use Redis;
7
use Closure;
8
use Tomaj\Hermes\Dispatcher;
9
use Tomaj\Hermes\MessageInterface;
10
use Tomaj\Hermes\MessageSerializer;
11
use Tomaj\Hermes\Shutdown\ShutdownException;
12
13
/**
 * Hermes driver that stores messages in Redis sets (one set per priority)
 * and messages scheduled for later in a Redis sorted set.
 *
 * This driver doesn't use Redis pub/sub functionality, only sets.
 */
class RedisSetDriver implements DriverInterface
{
    use MaxItemsTrait;
    use ShutdownTrait;
    use SerializerAwareTrait;

    /**
     * Redis set names keyed by priority, kept sorted by ascending priority.
     *
     * @var array<int, string>
     */
    private $queues = [];

    /**
     * Name of the Redis sorted set holding messages scheduled for later.
     *
     * @var string
     */
    private $scheduleKey;

    /**
     * @var Redis
     */
    private $redis;

    /**
     * Seconds to sleep in wait() when no message was found; 0 disables sleeping.
     *
     * @var int
     */
    private $refreshInterval;

    /**
     * Create new RedisSetDriver
     *
     * This driver uses Redis sets. Sending a message adds a new item to a set
     * and wait() reads new items from that set.
     * This driver doesn't use Redis pub/sub functionality, only Redis sets.
     *
     * Managing the connection to Redis is up to you and you have to create it
     * outside of this class. The native Redis PHP extension must be enabled.
     *
     * @see examples/redis
     *
     * @param Redis  $redis
     * @param string $key             name of the set registered for Dispatcher::PRIORITY_MEDIUM
     * @param int    $refreshInterval seconds to sleep when no message is available
     * @param string $scheduleKey     name of the sorted set used for scheduled messages
     */
    public function __construct(
        Redis $redis,
        string $key = 'hermes',
        int $refreshInterval = 1,
        string $scheduleKey = 'hermes_schedule'
    ) {
        $this->setupPriorityQueue($key, Dispatcher::PRIORITY_MEDIUM);

        $this->scheduleKey = $scheduleKey;
        $this->redis = $redis;
        $this->refreshInterval = $refreshInterval;
        $this->serializer = new MessageSerializer();
    }

    /**
     * {@inheritdoc}
     *
     * A message whose execute-at time lies in the future is stored in the
     * schedule sorted set (scored by that time); any other message is added
     * directly to the set registered for $priority.
     *
     * @throws \Exception when no queue is registered for $priority
     */
    public function send(MessageInterface $message, int $priority = Dispatcher::PRIORITY_MEDIUM): bool
    {
        $executeAt = $message->getExecuteAt();
        if ($executeAt && $executeAt > microtime(true)) {
            $this->redis->zAdd($this->scheduleKey, $executeAt, $this->serializer->serialize($message));
            return true;
        }

        $key = $this->getKey($priority);
        $this->redis->sAdd($key, $this->serializer->serialize($message));
        return true;
    }

    /**
     * Register (or rename) the Redis set used for the given priority.
     *
     * @param string $name     Redis set name
     * @param int    $priority priority the set is bound to
     */
    public function setupPriorityQueue(string $name, int $priority): void
    {
        $this->queues[$priority] = $name;
        // SORT_ASC (4) is not a ksort() flag: OR-ing it with SORT_NUMERIC (1)
        // yields 5 === SORT_LOCALE_STRING, which orders priorities as strings
        // ("50" after "200"). ksort() is ascending by default.
        ksort($this->queues, SORT_NUMERIC);
    }

    /**
     * Resolve the Redis set name registered for a priority.
     *
     * @throws \Exception when no queue is registered for $priority
     */
    private function getKey(int $priority): string
    {
        if (!isset($this->queues[$priority])) {
            throw new \Exception("Unknown priority {$priority}");
        }
        return $this->queues[$priority];
    }

    /**
     * {@inheritdoc}
     *
     * Each loop iteration moves due scheduled messages into their queue, then
     * pops at most one message (highest priority first) and hands it to
     * $callback together with the priority it was found under. When nothing
     * was found the loop sleeps for the refresh interval.
     *
     * @throws ShutdownException
     */
    public function wait(Closure $callback, array $priorities = []): void
    {
        // Snapshot of the registered queues, highest priority first.
        $queues = array_reverse($this->queues, true);

        while (true) {
            $this->checkShutdown();
            if (!$this->shouldProcessNext()) {
                break;
            }

            $this->enqueueDueScheduledMessages();

            $found = $this->popNextMessage($queues, $priorities);
            if ($found !== null) {
                [$messageString, $foundPriority] = $found;
                $message = $this->serializer->unserialize($messageString);
                $callback($message, $foundPriority);
                $this->incrementProcessedItems();
                continue;
            }

            if ($this->refreshInterval) {
                $this->checkShutdown();
                sleep($this->refreshInterval);
            }
        }
    }

    /**
     * Move scheduled messages that are due (at most one per call) from the
     * schedule sorted set back through send().
     */
    private function enqueueDueScheduledMessages(): void
    {
        $dueMessages = $this->redis->zRangeByScore(
            $this->scheduleKey,
            '-inf',
            (string)microtime(true),
            ['limit' => [0, 1]]
        );
        if (!is_array($dueMessages)) {
            return;
        }

        foreach ($dueMessages as $messageString) {
            // zRem reports how many members it removed; 0 means another
            // worker already claimed this message, so re-sending it here
            // could lead to it being processed twice.
            if ($this->redis->zRem($this->scheduleKey, $messageString) === 0) {
                continue;
            }
            $this->send($this->serializer->unserialize($messageString));
        }
    }

    /**
     * Pop the first available message, walking the queues in the given order.
     *
     * @param array<int, string> $queues     queue names keyed by priority, in polling order
     * @param array              $priorities priorities to poll; empty means all
     *
     * @return array|null [serialized message, priority] or null when every polled queue is empty
     */
    private function popNextMessage(array $queues, array $priorities): ?array
    {
        $filterByPriority = count($priorities) > 0;

        foreach (array_keys($queues) as $priority) {
            // Loose comparison is kept so that numeric-string priorities
            // passed by callers keep matching.
            if ($filterByPriority && !in_array($priority, $priorities)) {
                continue;
            }

            $messageString = $this->pop($this->getKey($priority));
            if ($messageString !== null) {
                return [$messageString, $priority];
            }
        }

        return null;
    }

    /**
     * Pop one member from the given Redis set.
     *
     * @return string|null the popped member, or null when sPop returned
     *                     anything other than a non-empty string
     */
    private function pop(string $key): ?string
    {
        $messageString = $this->redis->sPop($key);
        if (is_string($messageString) && $messageString !== "") {
            return $messageString;
        }

        return null;
    }
}
152