Passed
Push — master ( 0517dd...df09ce )
by Sergey
01:20 queued 10s
created

Stream::findById()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 22
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 9
CRAP Score 2.024

Importance

Changes 2
Bugs 0 Features 1
Metric Value
cc 2
eloc 11
c 2
b 0
f 1
nc 2
nop 1
dl 0
loc 22
ccs 9
cts 11
cp 0.8182
crap 2.024
rs 9.9
1
<?php
2
3
declare(strict_types=1);
4
5
/**
6
 * Stream class
7
 *
8
 * @author Sergei Karii <[email protected]>
9
 */
10
11
namespace Asiries335\redisSteamPhp;
12
13
use Asiries335\redisSteamPhp\Data\Collection;
14
use Asiries335\redisSteamPhp\Data\Constants;
15
use Asiries335\redisSteamPhp\Data\Message;
16
use Asiries335\redisSteamPhp\Dto\StreamCommandCallTransporter;
17
use Asiries335\redisSteamPhp\Hydrator\CollectionHydrator;
18
use Asiries335\redisSteamPhp\Hydrator\MessageHydrator;
19
20
21
final class Stream extends RedisStream
22
{
23
    /**
24
     * Appends the specified stream entry to the stream at the specified key
25
     *
26
     * @param string $key    Key Message
27
     * @param array  $values Value Message
28
     *
29
     * @return string
30
     *
31
     * @throws \Exception
32
     *
33
     * @see https://redis.io/commands/xadd
34
     */
35 2
    public function add(string $key, array $values) : string
36
    {
37 2
        $transporter = new StreamCommandCallTransporter(
38
            [
39 2
                'command' => Constants::COMMAND_XADD,
40
                'args'    => [
41 2
                    $this->_streamName,
42 2
                    '*',
43 2
                    $key,
44 2
                    json_encode($values)
45
                ]
46
            ]
47
        );
48
49
        try {
50 2
            return (string) $this->_client->call($transporter);
51
        } catch (\Exception $exception) {
52
            throw $exception;
53
        }
54
    }
55
56
    /**
57
     * Find a message by id
58
     *
59
     * @param string $id Id Message
60
     *
61
     * @return Message
62
     *
63
     * @throws \Exception
64
     */
65 1
    public function findById(string $id) : Message
66
    {
67 1
        $transporter = new StreamCommandCallTransporter(
68
            [
69 1
                'command' => Constants::COMMAND_XREAD,
70
                'args'    => [
71 1
                    'STREAMS',
72 1
                    $this->_streamName,
73 1
                    $id
74
                ]
75
            ]
76
        );
77
78 1
        $item = $this->_client->call($transporter);
79
80 1
        if (empty($item) === true) {
81 1
            return new Message;
82
        }
83
84
        $message = new MessageHydrator();
85
86
        return $message->hydrate($item[0][1][0] ?? [], Message::class);
87
    }
88
89
    /**
90
     * Removes the messages entries from a stream
91
     *
92
     * @param string $key Key Message
93
     *
94
     * @return int
95
     *
96
     * @throws \Exception
97
     *
98
     * @see https://redis.io/commands/xdel
99
     */
100 1
    public function delete(string $key) : int
101
    {
102 1
        $transporter = new StreamCommandCallTransporter(
103
            [
104 1
                'command' => Constants::COMMAND_XDEL,
105
                'args'    => [
106 1
                    $this->_streamName,
107 1
                    $key
108
                ]
109
            ]
110
        );
111
112
        try {
113 1
            return (int) $this->_client->call($transporter);
114
        } catch (\Exception $exception) {
115
            throw $exception;
116
        }
117
    }
118
119
    /**
120
     * Get data from stream
121
     *
122
     * @return Collection
123
     *
124
     * @throws \Exception
125
     *
126
     * @see https://redis.io/commands/xread
127
     */
128 2
    public function get() : Collection
129
    {
130 2
        $transporter = new StreamCommandCallTransporter(
131
            [
132 2
                'command' => Constants::COMMAND_XREAD,
133
                'args'    => [
134 2
                    'STREAMS',
135 2
                    $this->_streamName,
136 2
                    '0'
137
                ]
138
            ]
139
        );
140
141
        try {
142 2
            $items = $this->_client->call($transporter);
143
144 2
            $collection = new CollectionHydrator();
145
146 2
            if (empty($items) === true) {
147 1
                return new Collection;
148
            }
149
150 1
            return $collection->hydrate($items, Collection::class);
151
152
        } catch (\Exception $exception) {
153
            throw $exception;
154
        }
155
    }
156
157
    /**
158
     * Listen stream
159
     *
160
     * @param \Closure $closure User callback
161
     *
162
     * @return void
163
     *
164
     */
165
    public function listen(\Closure $closure) : void
166
    {
167
        $messageHydrate = new MessageHydrator();
168
169
        $loop = \React\EventLoop\Factory::create();
170
171
        $loop->addPeriodicTimer(
172
            Constants::TIME_TICK_INTERVAL,
173
            function () use ($closure, $messageHydrate, $loop) {
0 ignored issues
show
Unused Code introduced by
The import $loop is not used and could be removed.

This check looks for imports that have been defined, but are not used in the scope.

Loading history...
174
                $transporter = new StreamCommandCallTransporter(
175
                    [
176
                        'command' => Constants::COMMAND_XRANGE,
177
                        'args'    => [
178
                            $this->_streamName,
179
                            '-',
180
                            '+'
181
                        ]
182
                    ]
183
                );
184
185
                $rows = $this->_client->call($transporter);
186
187
                if (empty($rows) === true) {
188
                    return;
189
                }
190
191
                foreach ($rows as $row) {
192
                    $message = $messageHydrate->hydrate($row, Message::class);
193
                    $closure->call($this, $message);
194
                    $this->delete($message->getId());
195
                }
196
            }
197
        );
198
199
        $loop->run();
200
    }
201
202
}