Completed
Push — master ( a4a48b...00b680 )
by Tomas
26:02 queued 10:53
created

RedisSetDriver::getKey()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2.032

Importance

Changes 0

Metric Value
dl 0
loc 7
ccs 4
cts 5
cp 0.8
rs 10
c 0
b 0
f 0
cc 2
nc 2
nop 1
crap 2.032
<?php
declare(strict_types=1);

namespace Tomaj\Hermes\Driver;

use Closure;
use InvalidArgumentException;
use Predis\Client;
use Redis;
use Tomaj\Hermes\Dispatcher;
use Tomaj\Hermes\MessageInterface;
use Tomaj\Hermes\MessageSerializer;
use Tomaj\Hermes\Restart\RestartException;

class RedisSetDriver implements DriverInterface
{
    use MaxItemsTrait;
    use RestartTrait;
    use SerializerAwareTrait;

    private $queues = [];

    /**
     * @var string
     */
    private $scheduleKey;

    /**
     * @var Redis|Client
     */
    private $redis;

    /**
     * @var integer
     */
    private $refreshInterval;

    /**
     * Create a new RedisSetDriver
     *
     * This driver uses a Redis set. send() adds a new item to the set
     * and wait() reads new items from it.
     * This driver doesn't use Redis pubsub functionality, only Redis sets.
     *
     * Managing the connection to Redis is up to you; you have to create it outside
     * of this class. You can use the native Redis PHP extension or the Predis library.
     *
     * @see examples/redis
     *
     * @param Redis|Client $redis
     * @param string       $key
     * @param integer      $refreshInterval
     * @param string       $scheduleKey
     */
    public function __construct($redis, string $key = 'hermes', int $refreshInterval = 1, string $scheduleKey = 'hermes_schedule')
    {
        if (!(($redis instanceof Client) || ($redis instanceof Redis))) {
Bug introduced by
The class Predis\Client does not exist. Did you forget a USE statement, or did you not list all dependencies?

This error could be the result of:

1. Missing dependencies

PHP Analyzer uses your composer.json file (if available) to determine the dependencies of your project and to determine all the available classes and functions. It expects the composer.json to be in the root folder of your repository.

Are you sure this class is defined by one of your dependencies, or did you maybe not list a dependency in either the require or require-dev section?

2. Missing use statement

PHP does not complain about undefined classes in instanceof checks. For example, the following PHP code will work perfectly fine:

if ($x instanceof DoesNotExist) {
    // Do something.
}

If you have not tested against this specific condition, such errors might go unnoticed.

            throw new InvalidArgumentException('Predis\Client or Redis instance required');
        }

        $this->setupPriorityQueue($key, Dispatcher::PRIORITY_MEDIUM);

        $this->scheduleKey = $scheduleKey;
        $this->redis = $redis;
        $this->refreshInterval = $refreshInterval;
        $this->serializer = new MessageSerializer();
    }

    /**
     * {@inheritdoc}
     */
    public function send(MessageInterface $message, int $priority = Dispatcher::PRIORITY_MEDIUM): bool
    {
        if ($message->getExecuteAt() && $message->getExecuteAt() > microtime(true)) {
            $this->redis->zadd($this->scheduleKey, [$message->getExecuteAt(), $this->serializer->serialize($message)]);
        } else {
            $key = $this->getKey($priority);
            $this->redis->sadd($key, $this->serializer->serialize($message));
        }
        return true;
    }

    public function setupPriorityQueue(string $name, int $priority): void
    {
        $this->queues[$priority] = $name;
        // ksort() always sorts ascending. SORT_ASC | SORT_NUMERIC evaluates to
        // SORT_LOCALE_STRING, so only SORT_NUMERIC is passed here.
        ksort($this->queues, SORT_NUMERIC);
    }

    private function getKey(int $priority): string
    {
        if (!isset($this->queues[$priority])) {
            throw new \Exception("Unknown priority {$priority}");
        }
        return $this->queues[$priority];
    }

    /**
     * {@inheritdoc}
     *
     * @throws RestartException
     */
    public function wait(Closure $callback, array $priorities = []): void
    {
        $queues = array_reverse($this->queues, true);
        while (true) {
            $this->checkRestart();
            if (!$this->shouldProcessNext()) {
                break;
            }

            // check schedule
            $messagesString = [];
            if ($this->redis instanceof Client) {
Duplication introduced by
This code seems to be duplicated across your project.

Duplicated code is one of the most pungent code smells. If you need to duplicate the same code in three or more different places, we strongly encourage you to look into extracting the code into a single class or operation.

You can also find more detailed suggestions in the “Code” section of your repository.

                $messagesString = $this->redis->zrangebyscore($this->scheduleKey, '-inf', microtime(true), ['LIMIT' => ['OFFSET' => 0, 'COUNT' => 1]]);
                if (count($messagesString)) {
                    foreach ($messagesString as $messageString) {
                        $this->redis->zrem($this->scheduleKey, $messageString);
                    }
                }
            }
            if ($this->redis instanceof Redis) {
                $messagesString = $this->redis->zRangeByScore($this->scheduleKey, '-inf', (string)microtime(true), ['limit' => [0, 1]]);
                if (count($messagesString)) {
                    foreach ($messagesString as $messageString) {
                        $this->redis->zRem($this->scheduleKey, $messageString);
                    }
                }
            }
            if (count($messagesString)) {
                foreach ($messagesString as $messageString) {
                    $this->send($this->serializer->unserialize($messageString));
                }
            }

            $messageString = false;
            $foundPriority = null;

            foreach ($queues as $priority => $name) {
                if (count($priorities) > 0 && !in_array($priority, $priorities)) {
                    continue;
                }
                if ($messageString) {
                    break;
                }

                $key = $this->getKey($priority);

                if ($this->redis instanceof Client) {
                    $messageString = $this->redis->spop($key);
                    $foundPriority = $priority;
                }
                if ($this->redis instanceof Redis) {
                    $messageString = $this->redis->sPop($key);
                    $foundPriority = $priority;
                }
            }

            if ($messageString) {
                $message = $this->serializer->unserialize($messageString);
                $callback($message, $foundPriority);
                $this->incrementProcessedItems();
            } else {
                if ($this->refreshInterval) {
                    $this->checkRestart();
                    sleep($this->refreshInterval);
                }
            }
        }
    }
}
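
The priority handling in setupPriorityQueue() and wait() can be tried without a Redis server. The following is a minimal sketch, not part of Tomaj\Hermes: the class InMemoryPriorityQueues, its push() and pop() methods, the queue names and the priority values 50, 100 and 200 are all hypothetical, and plain PHP arrays stand in for Redis sets. It only illustrates the ordering idea: queues are kept sorted by numeric priority, polled from the highest priority down, and optionally restricted to a list of priorities.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory stand-in for the Redis-backed queues; illustration only.
final class InMemoryPriorityQueues
{
    /** @var array<int, string> priority => queue name, kept sorted ascending */
    private array $queues = [];

    /** @var array<string, string[]> queue name => pending messages */
    private array $items = [];

    public function setupPriorityQueue(string $name, int $priority): void
    {
        $this->queues[$priority] = $name;
        // ksort() always sorts ascending; SORT_NUMERIC compares keys as numbers.
        ksort($this->queues, SORT_NUMERIC);
    }

    public function push(string $message, int $priority): void
    {
        if (!isset($this->queues[$priority])) {
            throw new InvalidArgumentException("Unknown priority {$priority}");
        }
        $this->items[$this->queues[$priority]][] = $message;
    }

    /**
     * Returns [message, priority] from the highest-priority non-empty queue,
     * or null when nothing is pending. A non-empty $priorities list restricts
     * which queues are polled.
     */
    public function pop(array $priorities = []): ?array
    {
        // Reverse the ascending order while preserving the priority keys.
        foreach (array_reverse($this->queues, true) as $priority => $name) {
            if (count($priorities) > 0 && !in_array($priority, $priorities, true)) {
                continue;
            }
            if (!empty($this->items[$name])) {
                return [array_shift($this->items[$name]), $priority];
            }
        }
        return null;
    }
}

$queues = new InMemoryPriorityQueues();
$queues->setupPriorityQueue('hermes', 100);
$queues->setupPriorityQueue('hermes_high', 200);
$queues->setupPriorityQueue('hermes_low', 50);

$queues->push('low', 50);
$queues->push('medium', 100);
$queues->push('high', 200);

$first = $queues->pop();        // highest priority wins
$filtered = $queues->pop([50]); // only the priority-50 queue is polled
$next = $queues->pop();         // remaining message
$empty = $queues->pop();        // nothing left
```

One difference from the driver is worth keeping in mind: a Redis set is unordered and SPOP removes a random member, so the real driver gives no ordering guarantee within a single priority, whereas this sketch pops in insertion order.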