Completed
Push — master ( 2c2c88...660433 )
by Tomas
13:55
created

RedisSetDriver::wait()   D

Complexity

Conditions 15
Paths 218

Size

Total Lines 51

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 15.8185

Importance

Changes 0
Metric Value
dl 0
loc 51
ccs 11
cts 13
cp 0.8462
rs 4.8583
c 0
b 0
f 0
cc 15
nc 218
nop 1
crap 15.8185

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
<?php
2
3
namespace Tomaj\Hermes\Driver;
4
5
use Exception;
6
use Tomaj\Hermes\MessageInterface;
7
use Closure;
8
use Tomaj\Hermes\MessageSerializer;
9
use InvalidArgumentException;
10
11
class RedisSetDriver implements DriverInterface
{
    use MaxItemsTrait;
    use SerializerAwareTrait;

    /**
     * Name of the redis set that holds messages ready to be processed.
     *
     * @var string
     */
    private $key;

    /**
     * Name of the redis sorted set that holds delayed messages,
     * scored by their execute-at timestamp.
     *
     * @var string
     */
    private $scheduleKey;

    /**
     * @var \Redis|\Predis\Client
     */
    private $redis;

    /**
     * Seconds to sleep in wait() when no message is available.
     * A falsy value (0) disables sleeping.
     *
     * @var int
     */
    private $refreshInterval;

    /**
     * Create new RedisSetDriver
     *
     * This driver is using redis set. With send message it adds a new item to the set
     * and in wait() command it is reading new items from this set.
     * This driver doesn't use redis pubsub functionality, only redis sets.
     *
     * Managing connection to redis is up to you and you have to create it outside
     * of this class. You can use native Redis php extension or Predis extension.
     *
     * @see examples/redis
     *
     * @param \Redis|\Predis\Client  $redis
     * @param string                 $key
     * @param int                    $refreshInterval
     * @param string                 $scheduleKey
     *
     * @throws InvalidArgumentException when $redis is neither a Predis\Client nor a Redis instance
     */
    public function __construct($redis, string $key = 'hermes', int $refreshInterval = 1, string $scheduleKey = 'hermes_schedule')
    {
        if (!($redis instanceof \Predis\Client) && !($redis instanceof \Redis)) {
            throw new InvalidArgumentException('Predis\Client or Redis instance required');
        }

        $this->key = $key;
        $this->scheduleKey = $scheduleKey;
        $this->redis = $redis;
        $this->refreshInterval = $refreshInterval;
        $this->serializer = new MessageSerializer();
    }

    /**
     * {@inheritdoc}
     *
     * A message whose execute-at time lies in the future is stored in the
     * schedule sorted set (scored by that time); any other message is added
     * directly to the processing set.
     */
    public function send(MessageInterface $message): bool
    {
        $executeAt = $message->getExecuteAt();
        $payload = $this->serializer->serialize($message);

        if ($executeAt && $executeAt > microtime(true)) {
            $this->redis->zadd($this->scheduleKey, $executeAt, $payload);
        } else {
            $this->redis->sadd($this->key, $payload);
        }

        return true;
    }

    /**
     * {@inheritdoc}
     *
     * Loops while shouldProcessNext() allows it. Each iteration first moves a
     * due scheduled message (if any) into the processing set, then pops one
     * message from the processing set and hands it to $callback.
     */
    public function wait(Closure $callback): void
    {
        while ($this->shouldProcessNext()) {
            $this->enqueueDueScheduledMessages();

            $messageString = $this->redis->spop($this->key);

            if (!$messageString) {
                // Nothing to process. Never pass an empty payload to the
                // callback, even when sleeping is disabled (refreshInterval = 0).
                if ($this->refreshInterval) {
                    sleep($this->refreshInterval);
                }
                continue;
            }

            $callback($this->serializer->unserialize($messageString));
            $this->incrementProcessedItems();
        }
    }

    /**
     * Moves scheduled messages that are already due from the schedule
     * sorted set into the processing set via send().
     */
    private function enqueueDueScheduledMessages(): void
    {
        foreach ($this->fetchDueScheduledMessages() as $messageString) {
            $this->redis->zrem($this->scheduleKey, $messageString);
            $this->send($this->serializer->unserialize($messageString));
        }
    }

    /**
     * Reads at most one serialized message from the schedule sorted set
     * whose score is not greater than the current time.
     *
     * @return string[]
     */
    private function fetchDueScheduledMessages(): array
    {
        $now = microtime(true);

        // The two supported clients take the LIMIT option in different shapes.
        if ($this->redis instanceof \Predis\Client) {
            $due = $this->redis->zrangebyscore($this->scheduleKey, '-inf', $now, 'LIMIT', 0, 1);
        } else {
            $due = $this->redis->zRangeByScore($this->scheduleKey, '-inf', $now, ['limit' => [0, 1]]);
        }

        // Guard against a non-array result (e.g. false on a failed command).
        return is_array($due) ? $due : [];
    }
}
134