Completed
Push — master ( ede74e...9ffa6e )
by Roberto
17s queued 11s
created

Streams::xGroup()   B

Complexity

Conditions 6
Paths 6

Size

Total Lines 26

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
dl 0
loc 26
rs 8.8817
c 0
b 0
f 0
cc 6
nc 6
nop 5
1
<?php
2
3
namespace Webdcg\Redis\Traits;
4
5
/**
 * Redis Streams commands (XACK, XADD, XCLAIM, ...), exposed as thin wrappers
 * that forward to the client object held in $this->redis.
 *
 * NOTE(review): this trait does not declare the $redis property itself; static
 * analysis reports "The property redis does not exist" for it. The class that
 * uses the trait has to declare and initialise $redis. The method names and
 * argument orders used below presumably target the phpredis extension client;
 * confirm against the consuming class.
 */
trait Streams
{
    /**
     * Acknowledge one or more pending messages.
     *
     * See: https://redis.io/commands/xack.
     *
     * @param  string $stream
     * @param  string $group
     * @param  array  $messages
     *
     * @return int              The number of messages Redis reports as acknowledged.
     */
    public function xAck(string $stream, string $group, array $messages): int
    {
        return $this->redis->xAck($stream, $group, $messages);
    }


    /**
     * Appends the specified stream entry to the stream at the specified key.
     * If the key does not exist, as a side effect of running this command the
     * key is created with a stream value.
     *
     * See: https://redis.io/commands/xadd.
     *
     * @param  string    $key
     * @param  string    $id
     * @param  array     $message
     * @param  int|null  $maxLenght   Maximum stream length; null leaves it unset.
     * @param  bool|null $approximate Forwarded to the client only when not null.
     *
     * @return string                 The added message ID
     */
    public function xAdd(
        string $key,
        string $id,
        array $message,
        ?int $maxLenght = null,
        ?bool $approximate = null
    ): string {
        if (is_null($maxLenght) && is_null($approximate)) {
            return $this->redis->xAdd($key, $id, $message);
        }

        // Only $approximate may have been supplied: never forward null as the
        // length. 0 is the client's documented default for this argument.
        $maxLength = $maxLenght ?? 0;

        return is_null($approximate) ?
            $this->redis->xAdd($key, $id, $message, $maxLength) :
            $this->redis->xAdd($key, $id, $message, $maxLength, $approximate);
    }


    /**
     * Claim ownership of one or more pending messages.
     *
     * See: https://redis.io/commands/xclaim.
     *
     * @param  string     $stream
     * @param  string     $group
     * @param  string     $consumer
     * @param  int        $minIdleTime
     * @param  array      $messageIds
     * @param  array|null $options      See _checkClaimOptions() for the accepted entries.
     *
     * @return array                    Either an array of message IDs along with
     *                                  corresponding data, or just an array of
     *                                  IDs (if the 'JUSTID' option was passed).
     *
     * @throws \InvalidArgumentException When $options fails validation.
     */
    public function xClaim(
        string $stream,
        string $group,
        string $consumer,
        int $minIdleTime,
        array $messageIds,
        ?array $options = null
    ): array {
        if (is_null($options)) {
            return $this->redis->xClaim($stream, $group, $consumer, $minIdleTime, $messageIds);
        }

        if (!$this->_checkClaimOptions($options)) {
            throw new \InvalidArgumentException("Bad Claim Options", 1);
        }

        return $this->redis->xClaim($stream, $group, $consumer, $minIdleTime, $messageIds, $options);
    }


    /**
     * Delete one or more messages from a stream.
     *
     * See: https://redis.io/commands/xdel.
     *
     * @param  string $stream
     * @param  array  $messageIds
     *
     * @return int                  The number of messages removed.
     */
    public function xDel(string $stream, array $messageIds): int
    {
        return $this->redis->xDel($stream, $messageIds);
    }


    /**
     * This command is used in order to create, destroy, or manage consumer groups.
     *
     * See: https://redis.io/commands/xgroup.
     *
     * @param  string       $command                 Sub-command name; matched case-insensitively.
     * @param  string|null  $stream
     * @param  string|null  $group
     * @param  string|null  $messageId_consumerName  Message ID (CREATE, SETID) or
     *                                               consumer name (DELCONSUMER).
     * @param  bool         $makeStream              Forwarded for CREATE only.
     *
     * @return mixed                                This command returns different
     *                                              types depending on the specific
     *                                              XGROUP command executed.
     *
     * @throws \InvalidArgumentException             When $command is not a supported sub-command.
     */
    public function xGroup(
        string $command,
        ?string $stream = null,
        ?string $group = null,
        ?string $messageId_consumerName = null,
        bool $makeStream = false
    ) {
        $command = strtoupper($command);

        if (!$this->_checkGroupCommands($command)) {
            throw new \InvalidArgumentException("Bad Group Command", 1);
        }

        switch ($command) {
            case 'CREATE':
                return $this->redis->xGroup($command, $stream, $group, $messageId_consumerName, $makeStream);
            case 'SETID':
            case 'DELCONSUMER':
                // Both take a single extra argument after the group name.
                return $this->redis->xGroup($command, $stream, $group, $messageId_consumerName);
            case 'DESTROY':
                return $this->redis->xGroup($command, $stream, $group);
        }

        // Only HELP reaches this point; it takes no further arguments.
        return $this->redis->xGroup($command);
    }


    /**
     * Get information about a stream or consumer groups.
     *
     * See: https://redis.io/commands/xinfo.
     *
     * @param  string      $command Sub-command name; matched case-insensitively.
     * @param  string|null $stream
     * @param  string|null $group
     *
     * @return mixed            This command returns different types depending on which subcommand is used.
     *
     * @throws \InvalidArgumentException When $command is not a supported sub-command.
     */
    public function xInfo(string $command, ?string $stream = null, ?string $group = null)
    {
        $command = strtoupper($command);

        if (!$this->_checkInfoCommands($command)) {
            throw new \InvalidArgumentException("Bad Info Command", 1);
        }

        if (is_null($stream) && is_null($group)) {
            return $this->redis->xInfo($command);
        }

        return is_null($group) ?
            $this->redis->xInfo($command, $stream) :
            $this->redis->xInfo($command, $stream, $group);
    }


    /**
     * Get the length of a given stream.
     *
     * See: https://redis.io/commands/xlen.
     *
     * @param  string $stream
     *
     * @return int              The number of messages in the stream.
     */
    public function xLen(string $stream): int
    {
        return $this->redis->xLen($stream);
    }


    /**
     * Get information about pending messages in a given stream.
     *
     * The summary form is used unless $start, $end and $count are all given.
     * When they are, $consumer optionally narrows the result to one consumer.
     *
     * See: https://redis.io/commands/xpending.
     *
     * @param  string      $stream
     * @param  string      $group
     * @param  string|null $start
     * @param  string|null $end
     * @param  int|null    $count
     * @param  string|null $consumer
     *
     * @return array                    Information about the pending messages,
     *                                  in various forms depending on the specific
     *                                  invocation of XPENDING.
     */
    public function xPending(
        string $stream,
        string $group,
        ?string $start = null,
        ?string $end = null,
        ?int $count = null,
        ?string $consumer = null
    ): array {
        // The extended form needs the complete start/end/count triple.
        if (is_null($start) || is_null($end) || is_null($count)) {
            return $this->redis->xPending($stream, $group);
        }

        // The consumer filter is optional in the extended form.
        return is_null($consumer) ?
            $this->redis->xPending($stream, $group, $start, $end, $count) :
            $this->redis->xPending($stream, $group, $start, $end, $count, $consumer);
    }


    /**
     * Get a range of messages from a given stream.
     *
     * See: https://redis.io/commands/xrange.
     *
     * @param  string   $stream
     * @param  string   $start
     * @param  string   $end
     * @param  int|null $count
     *
     * @return array            The messages in the stream within the requested range.
     */
    public function xRange(string $stream, string $start, string $end, ?int $count = null): array
    {
        return is_null($count) ?
            $this->redis->xRange($stream, $start, $end) :
            $this->redis->xRange($stream, $start, $end, $count);
    }


    /**
     * Read data from one or more streams and only return IDs greater than sent in the command.
     *
     * See: https://redis.io/commands/xread.
     *
     * NOTE(review): $block is only forwarded together with $count; a $block
     * passed while $count is null is ignored.
     *
     * @param  array    $streams
     * @param  int|null $count
     * @param  int|null $block
     *
     * @return array            The messages in the stream newer than the IDs passed to Redis (if any).
     */
    public function xRead(array $streams, ?int $count = null, ?int $block = null): array
    {
        if (is_null($count)) {
            return $this->redis->xRead($streams);
        }

        return is_null($block) ?
            $this->redis->xRead($streams, $count) :
            $this->redis->xRead($streams, $count, $block);
    }


    /**
     * This method is similar to xRead except that it supports reading messages for a specific consumer group.
     *
     * See: https://redis.io/commands/xreadgroup.
     *
     * NOTE(review): $block is only forwarded together with $count; a $block
     * passed while $count is null is ignored.
     *
     * @param  string   $group
     * @param  string   $consumer
     * @param  array    $streams
     * @param  int|null $count
     * @param  int|null $block
     *
     * @return array                The messages delivered to this consumer group (if any).
     */
    public function xReadGroup(
        string $group,
        string $consumer,
        array $streams,
        ?int $count = null,
        ?int $block = null
    ): array {
        if (is_null($count)) {
            return $this->redis->xReadGroup($group, $consumer, $streams);
        }

        return is_null($block) ?
            $this->redis->xReadGroup($group, $consumer, $streams, $count) :
            $this->redis->xReadGroup($group, $consumer, $streams, $count, $block);
    }


    /**
     * This is identical to xRange except the results come back in reverse order.
     * Also note that Redis reverses the order of "start" and "end".
     *
     * See: https://redis.io/commands/xrevrange.
     *
     * @param  string   $stream
     * @param  string   $end
     * @param  string   $start
     * @param  int|null $count
     *
     * @return array            The messages in the range specified.
     */
    public function xRevRange(string $stream, string $end, string $start, ?int $count = null): array
    {
        return is_null($count) ?
            $this->redis->xRevRange($stream, $end, $start) :
            $this->redis->xRevRange($stream, $end, $start, $count);
    }

    /**
     * Placeholder for XTRIM: not implemented, never contacts Redis.
     *
     * @return bool             Always false.
     */
    public function xTrim(): bool
    {
        return false;
    }


    /**
     * ************************************************************************
     * H E L P E R    F U N C T I O N S
     * ************************************************************************
     */


    /**
     * Claim Options available
     *
     * Note:  'TIME', and 'IDLE' are mutually exclusive
     *
     * 'IDLE' => $value, Set the idle time to $value ms
     * 'TIME' => $value, Set the idle time to now - $value
     * 'RETRYCOUNT' => $value, Update message retrycount to $value
     * 'FORCE', Claim the message(s) even if they're not pending anywhere
     * 'JUSTID',Instruct Redis to only return IDs
     *
     * @param  array  $options
     *
     * @return bool             True when every entry is a known option and
     *                          'IDLE' and 'TIME' are not both present as keys.
     */
    private function _checkClaimOptions(array $options): bool
    {
        $allowed = ['IDLE', 'TIME', 'RETRYCOUNT', 'FORCE', 'JUSTID'];

        foreach ($options as $key => $value) {
            // Flag options ('FORCE', 'JUSTID') arrive as values under numeric
            // keys; valued options arrive as string keys.
            $name = is_numeric($key) ? $value : $key;

            // Strict comparison: loose matching would accept values such as true.
            if (!in_array($name, $allowed, true)) {
                return false;
            }
        }

        return !(array_key_exists('IDLE', $options) && array_key_exists('TIME', $options));
    }


    /**
     * Tell whether the given (already upper-cased) name is one of the XINFO
     * sub-commands this wrapper accepts.
     *
     * @param  string $command
     *
     * @return bool
     */
    private function _checkInfoCommands(string $command): bool
    {
        return in_array($command, ['CONSUMERS', 'GROUPS', 'STREAM', 'HELP'], true);
    }

    /**
     * Tell whether the given (already upper-cased) name is one of the XGROUP
     * sub-commands this wrapper accepts.
     *
     * @param  string $command
     *
     * @return bool
     */
    private function _checkGroupCommands(string $command): bool
    {
        return in_array($command, ['HELP', 'CREATE', 'SETID', 'DESTROY', 'DELCONSUMER'], true);
    }
}
379