Passed
Push — master ( ddd378...292f19 )
by Sergey
03:44 queued 01:24
created

Stream::get()   A

Complexity

Conditions 3
Paths 6

Size

Total Lines 26
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 11
CRAP Score 3.0327

Importance

Changes 5
Bugs 1 Features 2
Metric Value
cc 3
eloc 14
c 5
b 1
f 2
nc 6
nop 0
dl 0
loc 26
ccs 11
cts 13
cp 0.8462
crap 3.0327
rs 9.7998
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * Stream class
7
 *
8
 * @author Sergei Karii <[email protected]>
9
 */
10
11
namespace Asiries335\redisSteamPhp;
12
13
use Asiries335\redisSteamPhp\Data\Collection;
14
use Asiries335\redisSteamPhp\Data\Constants;
15
use Asiries335\redisSteamPhp\Data\Message;
16
use Asiries335\redisSteamPhp\Dto\StreamCommandCallTransporter;
17
use Asiries335\redisSteamPhp\Hydrator\CollectionHydrator;
18
use Asiries335\redisSteamPhp\Hydrator\MessageHydrator;
19
20
21
final class Stream
22
{
23
    /**
24
     * Client
25
     *
26
     * @var ClientRedisStreamPhpInterface
27
     */
28
    private $_client;
29
30
    /**
31
     * Name stream
32
     *
33
     * @var string
34
     */
35
    private $_streamName;
36
37
    /**
38
     * Stream constructor.
39
     *
40
     * @param ClientRedisStreamPhpInterface $client     ClientRedisInterface
41
     * @param string                        $nameStream Name stream
42
     */
43 4
    public function __construct(ClientRedisStreamPhpInterface $client, string $nameStream)
44
    {
45 4
        $this->_client     = $client;
46 4
        $this->_streamName = $nameStream;
47 4
    }
48
49
    /**
50
     * Appends the specified stream entry to the stream at the specified key
51
     *
52
     * @param string $key    Key Message
53
     * @param array  $values Value Message
54
     *
55
     * @return string
56
     *
57
     * @throws \Exception
58
     *
59
     * @see https://redis.io/commands/xadd
60
     */
61 2
    public function add(string $key, array $values) : string
62
    {
63 2
        $transporter = new StreamCommandCallTransporter(
64
            [
65 2
                'command' => Constants::COMMAND_XADD,
66
                'args'    => [
67 2
                    $this->_streamName,
68 2
                    '*',
69 2
                    $key,
70 2
                    json_encode($values)
71
                ]
72
            ]
73
        );
74
75
        try {
76 2
            return (string) $this->_client->call($transporter);
77
        } catch (\Exception $exception) {
78
            throw $exception;
79
        }
80
    }
81
82
    /**
83
     * Find a message by id
84
     *
85
     * @param string $id Id Message
86
     *
87
     * @return Message
88
     *
89
     * @throws \Exception
90
     */
91
    public function findById(string $id) : Message
92
    {
93
        $transporter = new StreamCommandCallTransporter(
94
            [
95
                'command' => Constants::COMMAND_XREAD,
96
                'args'    => [
97
                    'STREAMS',
98
                    $this->_streamName,
99
                    $id
100
                ]
101
            ]
102
        );
103
104
        $item = $this->_client->call($transporter);
105
106
        if (empty($item) === true) {
107
            return new Message;
108
        }
109
110
        $message = new MessageHydrator();
111
112
        return $message->hydrate($item[0][1][0], Message::class);
113
    }
114
115
    /**
116
     * Removes the messages entries from a stream
117
     *
118
     * @param string $key Key Message
119
     *
120
     * @return int
121
     *
122
     * @throws \Exception
123
     *
124
     * @see https://redis.io/commands/xdel
125
     */
126 1
    public function delete(string $key) : int
127
    {
128 1
        $transporter = new StreamCommandCallTransporter(
129
            [
130 1
                'command' => Constants::COMMAND_XDEL,
131
                'args'    => [
132 1
                    $this->_streamName,
133 1
                    $key
134
                ]
135
            ]
136
        );
137
138
        try {
139 1
            return (int) $this->_client->call($transporter);
140
        } catch (\Exception $exception) {
141
            throw $exception;
142
        }
143
    }
144
145
    /**
146
     * Get data from stream
147
     *
148
     * @return Collection
149
     *
150
     * @throws \Exception
151
     *
152
     * @see https://redis.io/commands/xread
153
     */
154 2
    public function get() : Collection
155
    {
156 2
        $transporter = new StreamCommandCallTransporter(
157
            [
158 2
                'command' => Constants::COMMAND_XREAD,
159
                'args'    => [
160 2
                    'STREAMS',
161 2
                    $this->_streamName,
162 2
                    '0'
163
                ]
164
            ]
165
        );
166
167
        try {
168 2
            $items = $this->_client->call($transporter);
169
170 2
            $collection = new CollectionHydrator();
171
172 2
            if (empty($items) === true) {
173 1
                return new Collection;
174
            }
175
176 1
            return $collection->hydrate($items, Collection::class);
177
178
        } catch (\Exception $exception) {
179
            throw $exception;
180
        }
181
    }
182
183
    /**
184
     * Listen stream
185
     *
186
     * @param \Closure $closure User callback
187
     *
188
     * @return void
189
     *
190
     */
191
    public function listen(\Closure $closure) : void
192
    {
193
        $messageHydrate = new MessageHydrator();
194
195
        $loop = \React\EventLoop\Factory::create();
196
197
        $loop->addPeriodicTimer(
198
            Constants::TIME_TICK_INTERVAL,
199
            function () use ($closure, $messageHydrate, $loop) {
0 ignored issues
show
Unused Code introduced by
The import $loop is not used and could be removed.

This check looks for imports that have been defined, but are not used in the scope.

Loading history...
200
                $transporter = new StreamCommandCallTransporter(
201
                    [
202
                        'command' => Constants::COMMAND_XRANGE,
203
                        'args'    => [
204
                            $this->_streamName,
205
                            '-',
206
                            '+'
207
                        ]
208
                    ]
209
                );
210
211
                $rows = $this->_client->call($transporter);
212
213
                if (empty($rows) === true) {
214
                    return;
215
                }
216
217
                foreach ($rows as $row) {
218
                    $message = $messageHydrate->hydrate($row, Message::class);
219
                    $closure->call($this, $message);
220
                    $this->delete($message->getId());
221
                }
222
            }
223
        );
224
225
        $loop->run();
226
    }
227
228
}