Completed
Pull Request — master (#123)
by Roberto
15:05
created

Streams::xRevRange()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 0
1
<?php
2
3
namespace Webdcg\Redis\Traits;
4
5
trait Streams
6
{
7
    /**
8
     * Acknowledge one or more pending messages.
9
     *
10
     * See: https://redis.io/commands/xack.
11
     *
12
     * @param  string $stream
13
     * @param  string $group
14
     * @param  array  $messages
15
     *
16
     * @return int              The number of messages Redis reports as acknowledged.
17
     */
18
    public function xAck(string $stream, string $group, array $messages): int
19
    {
20
        return $this->redis->xAck($stream, $group, $messages);
0 ignored issues
show
Bug introduced by
The property redis does not exist. Did you maybe forget to declare it?

In PHP it is possible to write to properties without declaring them. For example, the following is perfectly valid PHP code:

class MyClass { }

$x = new MyClass();
$x->foo = true;

Generally, it is a good practice to explicitly declare properties to avoid accidental typos and provide IDE auto-completion:

class MyClass {
    public $foo;
}

$x = new MyClass();
$x->foo = true;
Loading history...
21
    }
22
23
24
    /**
     * Appends the specified stream entry to the stream at the specified key.
     * If the key does not exist, as a side effect of running this command the
     * key is created with a stream value.
     *
     * See: https://redis.io/commands/xadd.
     *
     * @param  string    $key
     * @param  string    $id
     * @param  array     $message
     * @param  int|null  $maxLenght   Forwarded to the client only when provided.
     * @param  bool|null $approximate Forwarded to the client only when provided.
     *
     * @return string                 The added message ID
     */
    public function xAdd(
        string $key,
        string $id,
        array $message,
        ?int $maxLenght = null,
        ?bool $approximate = null
    ): string {
        // Neither optional argument given: use the short form of the call.
        if ($maxLenght === null && $approximate === null) {
            return $this->redis->xAdd($key, $id, $message);
        }

        // Only a maximum length given ($maxLenght is necessarily non-null here).
        if ($approximate === null) {
            return $this->redis->xAdd($key, $id, $message, $maxLenght);
        }

        // $approximate may be supplied without $maxLenght. Never hand null to the
        // client's integer parameter (deprecated since PHP 8.1, a TypeError under
        // strict types); fall back to 0 instead.
        // NOTE(review): 0 is assumed to be the client's "no limit" default for
        // maxlen - confirm against the installed phpredis version.
        return $this->redis->xAdd($key, $id, $message, $maxLenght ?? 0, $approximate);
    }
54
55
56
    /**
     * Claim ownership of one or more pending messages.
     *
     * See: https://redis.io/commands/xclaim.
     *
     * @param  string     $stream
     * @param  string     $group
     * @param  string     $consumer
     * @param  int        $minIdleTime
     * @param  array      $messageIds
     * @param  array|null $options      Optional claim options; validated with
     *                                  _checkClaimOptions() before being forwarded.
     *
     * @return array                    Either an array of message IDs along with
     *                                  corresponding data, or just an array of
     *                                  IDs (if the 'JUSTID' option was passed).
     *
     * @throws \Exception               When $options is given and fails validation.
     */
    public function xClaim(
        string $stream,
        string $group,
        string $consumer,
        int $minIdleTime,
        array $messageIds,
        ?array $options = null
    ): array {
        // Without options there is nothing to validate: use the short call.
        if ($options === null) {
            return $this->redis->xClaim($stream, $group, $consumer, $minIdleTime, $messageIds);
        }

        if (!$this->_checkClaimOptions($options)) {
            throw new \Exception("Bad Claim Options", 1);
        }

        return $this->redis->xClaim($stream, $group, $consumer, $minIdleTime, $messageIds, $options);
    }
88
89
90
    /**
     * Remove one or more messages from a stream.
     *
     * See: https://redis.io/commands/xdel.
     *
     * @param  string $stream      Stream to delete from.
     * @param  array  $messageIds  IDs of the messages to delete.
     *
     * @return int                 The number of messages removed.
     */
    public function xDel(string $stream, array $messageIds): int
    {
        $removed = $this->redis->xDel($stream, $messageIds);

        return $removed;
    }
104
105
106
    /**
     * Create, destroy, or manage consumer groups.
     *
     * See: https://redis.io/commands/xgroup.
     *
     * The sub-command is upper-cased and validated with _checkGroupCommands()
     * before anything is sent; each sub-command forwards only the arguments
     * it uses.
     *
     * @param  string      $command                 HELP, CREATE, SETID, DESTROY or DELCONSUMER
     *                                              (case-insensitive).
     * @param  string|null $stream
     * @param  string|null $group
     * @param  string|null $messageId_consumerName  Forwarded for CREATE, SETID and DELCONSUMER.
     * @param  bool        $makeStream              Forwarded for CREATE only.
     *
     * @return mixed                                This command returns different
     *                                              types depending on the specific
     *                                              XGROUP command executed.
     *
     * @throws \Exception                           When the sub-command is not recognised.
     */
    public function xGroup(
        string $command,
        ?string $stream = null,
        ?string $group = null,
        ?string $messageId_consumerName = null,
        bool $makeStream = false
    ) {
        $command = strtoupper($command);

        if (!$this->_checkGroupCommands($command)) {
            throw new \Exception("Bad Group Command", 1);
        }

        if ($command === 'HELP') {
            return $this->redis->xGroup($command);
        }

        if ($command === 'DESTROY') {
            return $this->redis->xGroup($command, $stream, $group);
        }

        if ($command === 'CREATE') {
            return $this->redis->xGroup($command, $stream, $group, $messageId_consumerName, $makeStream);
        }

        // SETID and DELCONSUMER share the same four-argument call shape.
        if ($command === 'SETID' || $command === 'DELCONSUMER') {
            return $this->redis->xGroup($command, $stream, $group, $messageId_consumerName);
        }
    }
147
148
149
    /**
     * Get information about a stream or consumer groups.
     *
     * See: https://redis.io/commands/xinfo.
     *
     * The sub-command is upper-cased and validated with _checkInfoCommands();
     * $stream and $group are forwarded only as far as they are provided.
     *
     * @param  string      $command  CONSUMERS, GROUPS, STREAM or HELP (case-insensitive).
     * @param  string|null $stream
     * @param  string|null $group
     *
     * @return mixed                 This command returns different types depending on which subcommand is used.
     *
     * @throws \Exception            When the sub-command is not recognised.
     */
    public function xInfo(string $command, ?string $stream = null, ?string $group = null)
    {
        $command = strtoupper($command);

        if (!$this->_checkInfoCommands($command)) {
            throw new \Exception("Bad Info Command", 1);
        }

        // A group always selects the three-argument form, whatever $stream is.
        if ($group !== null) {
            return $this->redis->xInfo($command, $stream, $group);
        }

        if ($stream !== null) {
            return $this->redis->xInfo($command, $stream);
        }

        return $this->redis->xInfo($command);
    }
176
177
178
    /**
     * Get the length of a given stream.
     *
     * See: https://redis.io/commands/xlen.
     *
     * @param  string $stream  Stream to measure.
     *
     * @return int             The number of messages in the stream.
     */
    public function xLen(string $stream): int
    {
        $length = $this->redis->xLen($stream);

        return $length;
    }
191
192
193
    /**
     * Get information about pending messages in a given stream.
     *
     * See: https://redis.io/commands/xpending.
     *
     * Two call shapes are used:
     *  - summary form: when any of $start, $end or $count is missing, only
     *    the stream and group are sent;
     *  - extended form: when $start, $end and $count are all given, they are
     *    forwarded, together with $consumer if one was provided.
     *
     * @param  string      $stream
     * @param  string      $group
     * @param  string|null $start
     * @param  string|null $end
     * @param  int|null    $count
     * @param  string|null $consumer    Optional filter; only used in the extended form.
     *
     * @return array                    Information about the pending messages,
     *                                  in various forms depending on the specific
     *                                  invocation of XPENDING.
     */
    public function xPending(
        string $stream,
        string $group,
        ?string $start = null,
        ?string $end = null,
        ?int $count = null,
        ?string $consumer = null
    ): array {
        // The range arguments only make sense as a complete triple; with an
        // incomplete one, fall back to the summary form.
        if ($start === null || $end === null || $count === null) {
            return $this->redis->xPending($stream, $group);
        }

        // The consumer filter is optional: a missing consumer must not discard
        // the requested range.
        if ($consumer === null) {
            return $this->redis->xPending($stream, $group, $start, $end, $count);
        }

        return $this->redis->xPending($stream, $group, $start, $end, $count, $consumer);
    }
221
222
223
    /**
     * Get a range of messages from a given stream.
     *
     * See: https://redis.io/commands/xrange.
     *
     * @param  string   $stream
     * @param  string   $start
     * @param  string   $end
     * @param  int|null $count   Forwarded to the client only when provided.
     *
     * @return array             The messages in the stream within the requested range.
     */
    public function xRange(string $stream, string $start, string $end, ?int $count = null): array
    {
        if ($count === null) {
            return $this->redis->xRange($stream, $start, $end);
        }

        return $this->redis->xRange($stream, $start, $end, $count);
    }
241
242
243
    /**
     * Read data from one or more streams and only return IDs greater than sent in the command.
     *
     * See: https://redis.io/commands/xread.
     *
     * @param  array    $streams
     * @param  int|null $count    Forwarded to the client only when provided.
     * @param  int|null $block    Forwarded to the client whenever provided,
     *                            with or without $count.
     *
     * @return array              The messages in the stream newer than the IDs passed to Redis (if any).
     */
    public function xRead(array $streams, ?int $count = null, ?int $block = null): array
    {
        if ($block !== null) {
            // $block must not be dropped just because $count is absent. The
            // client takes both positionally, so a missing count is filled
            // with -1.
            // NOTE(review): -1 is assumed to be phpredis' "no COUNT" default
            // for xRead - confirm against the installed client version.
            return $this->redis->xRead($streams, $count ?? -1, $block);
        }

        if ($count !== null) {
            return $this->redis->xRead($streams, $count);
        }

        return $this->redis->xRead($streams);
    }
262
263
264
    /**
     * This method is similar to xRead except that it supports reading messages for a specific consumer group.
     *
     * See: https://redis.io/commands/xreadgroup.
     *
     * Note: $block is forwarded only when $count is provided as well; a
     * $block passed without $count is not sent to the client.
     *
     * @param  string   $group
     * @param  string   $consumer
     * @param  array    $streams
     * @param  int|null $count
     * @param  int|null $block
     *
     * @return array                The messages delivered to this consumer group (if any).
     */
    public function xReadGroup(
        string $group,
        string $consumer,
        array $streams,
        ?int $count = null,
        ?int $block = null
    ): array {
        // No count: plain read, regardless of $block.
        if ($count === null) {
            return $this->redis->xReadGroup($group, $consumer, $streams);
        }

        // Count only.
        if ($block === null) {
            return $this->redis->xReadGroup($group, $consumer, $streams, $count);
        }

        // Count and block.
        return $this->redis->xReadGroup($group, $consumer, $streams, $count, $block);
    }
292
293
294
    /**
     * Stub for XREVRANGE: takes no arguments, does not call the Redis client,
     * and always returns false.
     *
     * See: https://redis.io/commands/xrevrange.
     *
     * @return bool  Always false.
     */
    public function xRevRange(): bool
    {
        $result = false;

        return $result;
    }
298
299
    /**
     * Stub for XTRIM: takes no arguments, does not call the Redis client,
     * and always returns false.
     *
     * See: https://redis.io/commands/xtrim.
     *
     * @return bool  Always false.
     */
    public function xTrim(): bool
    {
        $result = false;

        return $result;
    }
303
304
305
    /**
306
     * ************************************************************************
307
     * H E L P E R    F U N C T I O N S
308
     * ************************************************************************
309
     */
310
311
312
    /**
     * Validate the options array passed to xClaim().
     *
     * Recognised option names:
     *
     * 'IDLE' => $value, Set the idle time to $value ms
     * 'TIME' => $value, Set the idle time to now - $value
     * 'RETRYCOUNT' => $value, Update message retrycount to $value
     * 'FORCE', Claim the message(s) even if they're not pending anywhere
     * 'JUSTID', Instruct Redis to only return IDs
     *
     * Note: 'TIME' and 'IDLE' are mutually exclusive.
     *
     * For list-style entries (integer key) the option name is the value; for
     * keyed entries it is the key. Names are matched case-sensitively and with
     * strict comparison, so non-string values such as 0 or true can never be
     * mistaken for an option name.
     *
     * @param  array  $options
     *
     * @return bool   True when every entry names a recognised option and
     *                'IDLE' and 'TIME' are not both present as keys.
     */
    private function _checkClaimOptions(array $options): bool
    {
        $allowed = ['IDLE', 'TIME', 'RETRYCOUNT', 'FORCE', 'JUSTID'];

        foreach ($options as $key => $value) {
            // PHP stores list positions as integer keys; everything else is a
            // named option whose name is the key itself.
            $name = is_int($key) ? $value : $key;

            if (!in_array($name, $allowed, true)) {
                return false;
            }
        }

        return !(array_key_exists('IDLE', $options) && array_key_exists('TIME', $options));
    }
338
339
340
    /**
     * Tell whether the given (already upper-cased) XINFO sub-command is one
     * of the names this trait accepts: CONSUMERS, GROUPS, STREAM or HELP.
     *
     * @param  string $command
     *
     * @return bool             True when the sub-command is accepted.
     */
    private function _checkInfoCommands(string $command): bool
    {
        $accepted = ['CONSUMERS', 'GROUPS', 'STREAM', 'HELP'];

        return in_array($command, $accepted, true);
    }
351
352
    /**
     * Tell whether the given (already upper-cased) XGROUP sub-command is one
     * of the names this trait accepts: HELP, CREATE, SETID, DESTROY or
     * DELCONSUMER.
     *
     * @param  string $command
     *
     * @return bool             True when the sub-command is accepted.
     */
    private function _checkGroupCommands(string $command): bool
    {
        $accepted = ['HELP', 'CREATE', 'SETID', 'DESTROY', 'DELCONSUMER'];

        return in_array($command, $accepted, true);
    }
363
}
364