Completed
Pull Request — master (#123)
by Roberto
27:02 queued 12:02
created

Streams::xRange()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 2
nc 2
nop 4
1
<?php
2
3
namespace Webdcg\Redis\Traits;
4
5
/**
 * Redis Streams commands (XACK, XADD, XCLAIM, ...).
 *
 * Every public method forwards to the client held in `$this->redis`. The trait
 * does not declare that property itself, so the class using the trait must
 * provide it (presumably a connected phpredis client -- confirm against the
 * host class).
 */
trait Streams
{
    /**
     * Acknowledge one or more pending messages.
     *
     * See: https://redis.io/commands/xack.
     *
     * @param  string $stream
     * @param  string $group
     * @param  array  $messages
     *
     * @return int              The number of messages Redis reports as acknowledged.
     */
    public function xAck(string $stream, string $group, array $messages): int
    {
        return $this->redis->xAck($stream, $group, $messages);
    }


    /**
     * Appends the specified stream entry to the stream at the specified key.
     * If the key does not exist, as a side effect of running this command the
     * key is created with a stream value.
     *
     * See: https://redis.io/commands/xadd.
     *
     * @param  string    $key
     * @param  string    $id
     * @param  array     $message
     * @param  int|null  $maxLenght     Optional cap on the stream length.
     * @param  bool|null $approximate   Optional flag forwarded together with
     *                                  $maxLenght; ignored when no cap is given.
     *
     * @return string                   The added message ID
     */
    public function xAdd(
        string $key,
        string $id,
        array $message,
        ?int $maxLenght = null,
        ?bool $approximate = null
    ): string {
        // Without a length cap there is nothing for $approximate to qualify, so
        // use the plain form rather than forwarding null where the client
        // expects an integer.
        if (is_null($maxLenght)) {
            return $this->redis->xAdd($key, $id, $message);
        }

        return is_null($approximate) ?
            $this->redis->xAdd($key, $id, $message, $maxLenght) :
            $this->redis->xAdd($key, $id, $message, $maxLenght, $approximate);
    }


    /**
     * Claim ownership of one or more pending messages.
     *
     * See: https://redis.io/commands/xclaim.
     *
     * @param  string     $stream
     * @param  string     $group
     * @param  string     $consumer
     * @param  int        $minIdleTime
     * @param  array      $messageIds
     * @param  array|null $options      Validated by _checkClaimOptions() when given.
     *
     * @return array                    Either an array of message IDs along with
     *                                  corresponding data, or just an array of
     *                                  IDs (if the 'JUSTID' option was passed).
     *
     * @throws \Exception               When $options contains an unknown option
     *                                  or both 'IDLE' and 'TIME'.
     */
    public function xClaim(
        string $stream,
        string $group,
        string $consumer,
        int $minIdleTime,
        array $messageIds,
        ?array $options = null
    ): array {
        if (!is_null($options) && !$this->_checkClaimOptions($options)) {
            throw new \Exception("Bad Claim Options", 1);
        }

        return is_null($options) ?
            $this->redis->xClaim($stream, $group, $consumer, $minIdleTime, $messageIds) :
            $this->redis->xClaim($stream, $group, $consumer, $minIdleTime, $messageIds, $options);
    }


    /**
     * Delete one or more messages from a stream.
     *
     * See: https://redis.io/commands/xdel.
     *
     * @param  string $stream
     * @param  array  $messageIds
     *
     * @return int                  The number of messages removed.
     */
    public function xDel(string $stream, array $messageIds): int
    {
        return $this->redis->xDel($stream, $messageIds);
    }


    /**
     * This command is used in order to create, destroy, or manage consumer groups.
     *
     * See: https://redis.io/commands/xgroup.
     *
     * @param  string      $command                 The XGROUP sub-command.
     * @param  string|null $stream                  Stream key, when the sub-command needs one.
     * @param  string|null $group                   Consumer group name, when needed.
     * @param  string|null $messageId_consumerName  A message ID or a consumer name,
     *                                              depending on the sub-command.
     * @param  bool        $makeStream              Forwarded as the client's fifth
     *                                              argument when true.
     *
     * @return mixed                                This command returns different
     *                                              types depending on the specific
     *                                              XGROUP command executed.
     */
    public function xGroup(
        string $command,
        ?string $stream = null,
        ?string $group = null,
        $messageId_consumerName = null,
        bool $makeStream = false
    ) {
        if ($makeStream) {
            return $this->redis->xGroup($command, $stream, $group, $messageId_consumerName, true);
        }

        // Forward only what the caller supplied, like the other wrappers in
        // this trait. NOTE(review): some phpredis releases appear to select the
        // sub-command by argument count, so null padding can make HELP, DESTROY,
        // SETID or DELCONSUMER unrecognised -- confirm for the version in use.
        $arguments = [$command, $stream, $group, $messageId_consumerName];
        while (count($arguments) > 1 && is_null(end($arguments))) {
            array_pop($arguments);
        }

        return $this->redis->xGroup(...$arguments);
    }


    /**
     * Get information about a stream or consumer groups.
     *
     * See: https://redis.io/commands/xinfo.
     *
     * @param  string      $command     Upper-cased before validation, so it is
     *                                  case-insensitive for callers.
     * @param  string|null $stream
     * @param  string|null $group
     *
     * @return mixed            This command returns different types depending on which subcommand is used.
     *
     * @throws \Exception       When $command is not one of the supported sub-commands.
     */
    public function xInfo(string $command, ?string $stream = null, ?string $group = null)
    {
        $command = strtoupper($command);

        if (!$this->_checkInfoCommands($command)) {
            throw new \Exception("Bad Info Command", 1);
        }

        if (is_null($stream) && is_null($group)) {
            return $this->redis->xInfo($command);
        }

        return is_null($group) ?
            $this->redis->xInfo($command, $stream) :
            $this->redis->xInfo($command, $stream, $group);
    }


    /**
     * Get the length of a given stream.
     *
     * See: https://redis.io/commands/xlen.
     *
     * @param  string $stream
     *
     * @return int              The number of messages in the stream.
     */
    public function xLen(string $stream): int
    {
        return $this->redis->xLen($stream);
    }


    /**
     * Get information about pending messages in a given stream.
     *
     * The range form is used only when $start, $end and $count are all given;
     * otherwise the two-argument summary form is sent and any partial range
     * arguments (including $consumer) are ignored.
     *
     * See: https://redis.io/commands/xpending.
     *
     * @param  string      $stream
     * @param  string      $group
     * @param  string|null $start
     * @param  string|null $end
     * @param  int|null    $count
     * @param  string|null $consumer    Optional filter for the range form.
     *
     * @return array                    Information about the pending messages,
     *                                  in various forms depending on the specific
     *                                  invocation of XPENDING.
     */
    public function xPending(
        string $stream,
        string $group,
        ?string $start = null,
        ?string $end = null,
        ?int $count = null,
        ?string $consumer = null
    ): array {
        if (is_null($start) || is_null($end) || is_null($count)) {
            return $this->redis->xPending($stream, $group);
        }

        // The consumer is optional in the range form, so its absence must not
        // downgrade the call to the summary form.
        return is_null($consumer) ?
            $this->redis->xPending($stream, $group, $start, $end, $count) :
            $this->redis->xPending($stream, $group, $start, $end, $count, $consumer);
    }


    /**
     * Get a range of messages from a given stream.
     *
     * See: https://redis.io/commands/xrange.
     *
     * @param  string   $stream
     * @param  string   $start
     * @param  string   $end
     * @param  int|null $count
     *
     * @return array            The messages in the stream within the requested range.
     */
    public function xRange(string $stream, string $start, string $end, ?int $count = null): array
    {
        return is_null($count) ?
            $this->redis->xRange($stream, $start, $end) :
            $this->redis->xRange($stream, $start, $end, $count);
    }


    /**
     * Read data from one or more streams and only return IDs greater than sent in the command.
     *
     * See: https://redis.io/commands/xread.
     *
     * @param  array    $streams
     * @param  int|null $count
     * @param  int|null $block      Only forwarded when $count is also given.
     *
     * @return array            The messages in the stream newer than the IDs passed to Redis (if any).
     */
    public function xRead(array $streams, ?int $count = null, ?int $block = null): array
    {
        if (!is_null($count) && !is_null($block)) {
            return $this->redis->xRead($streams, $count, $block);
        }

        return is_null($count) ? $this->redis->xRead($streams) : $this->redis->xRead($streams, $count);
    }


    /**
     * Placeholder: does not contact Redis yet and always returns an empty array.
     *
     * @return array
     */
    public function xReadGroup(): array
    {
        return [];
    }


    /**
     * Placeholder: does not contact Redis yet and always returns false.
     *
     * @return bool
     */
    public function xRevRange(): bool
    {
        return false;
    }


    /**
     * Placeholder: does not contact Redis yet and always returns false.
     *
     * @return bool
     */
    public function xTrim(): bool
    {
        return false;
    }


    /**
     * ************************************************************************
     * H E L P E R    F U N C T I O N S
     * ************************************************************************
     */


    /**
     * Claim Options available
     *
     * Note:  'TIME', and 'IDLE' are mutually exclusive
     *
     * 'IDLE' => $value, Set the idle time to $value ms
     * 'TIME' => $value, Set the idle time to now - $value
     * 'RETRYCOUNT' => $value, Update message retrycount to $value
     * 'FORCE', Claim the message(s) even if they're not pending anywhere
     * 'JUSTID',Instruct Redis to only return IDs
     *
     * @param  array  $options
     *
     * @return bool             True when every option name is known and 'IDLE'
     *                          and 'TIME' are not both present as keys.
     */
    private function _checkClaimOptions(array $options): bool
    {
        $available = ['IDLE', 'TIME', 'RETRYCOUNT', 'FORCE', 'JUSTID'];

        foreach ($options as $key => $value) {
            // List-style entries ('FORCE', 'JUSTID') sit under integer keys and
            // carry the option name as the value; keyed entries carry it as the key.
            $check = is_int($key) ? $value : $key;

            // Strict comparison: a loose match would accept values such as
            // true as if they were a valid option name.
            if (!in_array($check, $available, true)) {
                return false;
            }
        }

        return !(array_key_exists('IDLE', $options) && array_key_exists('TIME', $options));
    }


    /**
     * Tell whether the given XINFO sub-command is one this trait supports.
     *
     * The comparison is exact, so callers must pass the name in upper case.
     *
     * @param  string $command
     *
     * @return bool
     */
    private function _checkInfoCommands(string $command): bool
    {
        $available = ['CONSUMERS', 'GROUPS', 'STREAM', 'HELP'];

        return in_array($command, $available, true);
    }
}
314