Passed
Push — master ( 3573d1...0e5a44 )
by Sergey
03:45 queued 01:50
created

Stream::delete()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 10
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 4
CRAP Score 2.1481

Importance

Changes 0
Metric Value
cc 2
eloc 7
nc 2
nop 1
dl 0
loc 10
ccs 4
cts 6
cp 0.6667
crap 2.1481
rs 10
c 0
b 0
f 0
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * Stream class
7
 *
8
 * @author Sergei Karii <[email protected]>
9
 */
10
11
namespace Asiries335\redisSteamPhp;
12
13
use Asiries335\redisSteamPhp\Data\Collection;
14
use Asiries335\redisSteamPhp\Data\Constants;
15
use Asiries335\redisSteamPhp\Data\Message;
16
use Asiries335\redisSteamPhp\Hydrator\CollectionHydrator;
17
use Asiries335\redisSteamPhp\Hydrator\MessageHydrator;
18
19
20
final class Stream
{
    /**
     * Client used to send raw commands to Redis
     *
     * @var ClientRedisStreamPhpInterface
     */
    private $_client;

    /**
     * Name of the stream every command of this instance operates on
     *
     * @var string
     */
    private $_streamName;

    /**
     * Stream constructor.
     *
     * @param ClientRedisStreamPhpInterface $client     Client the commands are sent through
     * @param string                        $nameStream Name of the stream
     */
    public function __construct(ClientRedisStreamPhpInterface $client, string $nameStream)
    {
        $this->_client     = $client;
        $this->_streamName = $nameStream;
    }

    /**
     * Appends a new entry to the stream.
     *
     * The entry id is generated by the server ('*'); the values are stored
     * JSON-encoded under the single field $key.
     *
     * @param string $key    Field name of the entry
     * @param array  $values Values of the entry, stored as a JSON string
     *
     * @return string Result of the XADD call cast to string
     *
     * @throws \InvalidArgumentException When $values cannot be encoded as JSON
     * @throws \Exception                Any exception raised by the client propagates unchanged
     *
     * @see https://redis.io/commands/xadd
     */
    public function add(string $key, array $values) : string
    {
        $payload = json_encode($values);

        // json_encode() signals failure with false; sending that to the
        // server would silently store a broken entry.
        if ($payload === false) {
            throw new \InvalidArgumentException(
                'Unable to encode message values as JSON: ' . json_last_error_msg()
            );
        }

        return (string) $this->_client->call(
            Constants::COMMAND_XADD,
            $this->_streamName,
            '*',
            $key,
            $payload
        );
    }

    /**
     * Find a message by id
     *
     * Uses XRANGE with the id as both bounds so that exactly the requested
     * entry is matched. (XREAD returns entries with an id greater than the
     * given one and therefore cannot be used for an exact lookup.)
     *
     * @param string $id Id of the message
     *
     * @return Message The hydrated message, or an empty Message when nothing matches
     *
     * @throws \Exception Any exception raised by the client propagates unchanged
     *
     * @see https://redis.io/commands/xrange
     */
    public function findById(string $id) : Message
    {
        $rows = $this->_client->call(
            Constants::COMMAND_XRANGE,
            $this->_streamName,
            $id,
            $id
        );

        if (empty($rows) === true) {
            return new Message;
        }

        // Each XRANGE row is hydrated the same way listen() hydrates its rows.
        return (new MessageHydrator())->hydrate($rows[0], Message::class);
    }

    /**
     * Removes an entry from the stream
     *
     * @param string $key Id of the entry to remove
     *
     * @return int Result of the XDEL call cast to int
     *
     * @throws \Exception Any exception raised by the client propagates unchanged
     *
     * @see https://redis.io/commands/xdel
     */
    public function delete(string $key) : int
    {
        return (int) $this->_client->call(
            Constants::COMMAND_XDEL,
            $this->_streamName,
            $key
        );
    }

    /**
     * Get data from stream
     *
     * Reads the stream with XREAD starting from id '0' and hydrates the
     * reply into a Collection.
     *
     * @return Collection The hydrated collection, or an empty Collection when the reply is empty
     *
     * @throws \Exception Any exception raised by the client propagates unchanged
     *
     * @see https://redis.io/commands/xread
     */
    public function get() : Collection
    {
        $items = $this->_client->call(
            Constants::COMMAND_XREAD,
            'STREAMS',
            $this->_streamName,
            '0'
        );

        if (empty($items) === true) {
            return new Collection;
        }

        return (new CollectionHydrator())->hydrate($items, Collection::class);
    }

    /**
     * Listen stream
     *
     * Creates a React event loop with a periodic timer
     * (Constants::TIME_TICK_INTERVAL). On every tick the whole stream is read
     * with XRANGE - +; each entry is hydrated into a Message, passed to the
     * callback and then deleted from the stream. The call blocks while the
     * loop is running.
     *
     * @param \Closure $closure User callback; invoked via Closure::call(), so
     *                          inside it $this is this Stream instance and the
     *                          Message is the only argument
     *
     * @return void
     *
     * @throws \ErrorException
     */
    public function listen(\Closure $closure) : void
    {
        $messageHydrate = new MessageHydrator();

        $loop = \React\EventLoop\Factory::create();

        $loop->addPeriodicTimer(
            Constants::TIME_TICK_INTERVAL,
            function () use ($closure, $messageHydrate) {
                $rows = $this->_client->call(
                    Constants::COMMAND_XRANGE,
                    $this->_streamName,
                    '-',
                    '+'
                );

                if (empty($rows) === true) {
                    return;
                }

                foreach ($rows as $row) {
                    $message = $messageHydrate->hydrate($row, Message::class);
                    $closure->call($this, $message);
                    // Remove the entry once the callback has handled it so it
                    // is not delivered again on the next tick.
                    $this->delete($message->getId());
                }
            }
        );

        $loop->run();
    }
}