Streams   B
last analyzed

Complexity

Total Complexity 48

Size/Duplication

Total Lines 398
Duplicated Lines 0 %

Coupling/Cohesion

Components 1
Dependencies 0

Importance

Changes 0
Metric Value
wmc 48
lcom 1
cbo 0
dl 0
loc 398
rs 8.5599
c 0
b 0
f 0

16 Methods

Rating   Name   Duplication   Size   Complexity  
A xAck() 0 4 1
A xAdd() 0 15 4
A xClaim() 0 16 4
A xDel() 0 4 1
B xGroup() 0 26 6
A xInfo() 0 16 5
A xLen() 0 4 1
A xPending() 0 12 5
A xRange() 0 6 2
A xRead() 0 8 4
A xReadGroup() 0 15 4
A xRevRange() 0 6 2
A xTrim() 0 6 2
A _checkClaimOptions() 0 11 5
A _checkInfoCommands() 0 4 1
A _checkGroupCommands() 0 4 1

How to fix   Complexity   

Complex Class

Complex classes like Streams often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes. You can also have a look at the cohesion graph to spot any un-connected, or weakly-connected components.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

While breaking up the class, it is a good idea to analyze how other classes use Streams, and based on these observations, apply Extract Interface, too.

1
<?php
2
3
namespace Webdcg\Redis\Traits;
4
5
/**
6
 * Support for Redis Stream Data Structure
7
 *
8
 * See: https://redis.io/topics/streams-intro
9
 * See: https://university.redislabs.com/courses
10
 */
11
trait Streams
12
{
13
    /**
14
     * Acknowledge one or more pending messages.
15
     *
16
     * See: https://redis.io/commands/xack.
17
     *
18
     * @param  string $stream
19
     * @param  string $group
20
     * @param  array  $messages
21
     *
22
     * @return int              The number of messages Redis reports as acknowledged.
23
     */
24
    public function xAck(string $stream, string $group, array $messages): int
25
    {
26
        return $this->redis->xAck($stream, $group, $messages);
0 ignored issues
show
Bug introduced by
The property redis does not exist. Did you maybe forget to declare it?

In PHP it is possible to write to properties without declaring them. For example, the following is perfectly valid PHP code:

class MyClass { }

$x = new MyClass();
$x->foo = true;

Generally, it is a good practice to explictly declare properties to avoid accidental typos and provide IDE auto-completion:

class MyClass {
    public $foo;
}

$x = new MyClass();
$x->foo = true;
Loading history...
27
    }
28
29
30
    /**
31
     * Appends the specified stream entry to the stream at the specified key.
32
     * If the key does not exist, as a side effect of running this command the
33
     * key is created with a stream value.
34
     *
35
     * See: https://redis.io/commands/xadd.
36
     *
37
     * @param  string      $key
38
     * @param  string      $id
39
     * @param  array       $message
40
     * @param  int|integer $maxLenght
41
     * @param  bool        $approximate
42
     *
43
     * @return string                   The added message ID
44
     */
45
    public function xAdd(
46
        string $key,
47
        string $id,
48
        array $message,
49
        ?int $maxLenght = null,
50
        ?bool $approximate = null
51
    ): string {
52
        if (is_null($maxLenght) && is_null($approximate)) {
53
            return $this->redis->xAdd($key, $id, $message);
54
        }
55
56
        return is_null($approximate) ?
57
            $this->redis->xAdd($key, $id, $message, $maxLenght) :
58
            $this->redis->xAdd($key, $id, $message, $maxLenght, $approximate);
59
    }
60
61
62
    /**
63
     * Claim ownership of one or more pending messages.
64
     *
65
     * See: https://redis.io/commands/xclaim.
66
     *
67
     * Note:  'TIME', and 'IDLE' are mutually exclusive
68
     *
69
     * 'IDLE' => $value, Set the idle time to $value ms
70
     * 'TIME' => $value, Set the idle time to now - $value
71
     * 'RETRYCOUNT' => $value, Update message retrycount to $value
72
     * 'FORCE', Claim the message(s) even if they're not pending anywhere
73
     * 'JUSTID',Instruct Redis to only return IDs
74
     *
75
     * @param  string     $stream
76
     * @param  string     $group
77
     * @param  string     $consumer
78
     * @param  int        $minIdleTime
79
     * @param  array      $messageIds
80
     * @param  array|null $options
81
     *
82
     * @return array                    Either an array of message IDs along with
83
     *                                  corresponding data, or just an array of
84
     *                                  IDs (if the 'JUSTID' option was passed).
85
     */
86
    public function xClaim(
87
        string $stream,
88
        string $group,
89
        string $consumer,
90
        int $minIdleTime,
91
        array $messageIds,
92
        ?array $options = null
93
    ): array {
94
        if (!is_null($options) && !$this->_checkClaimOptions($options)) {
95
            throw new \Exception("Bad Claim Options", 1);
96
        }
97
98
        return is_null($options) ?
99
            $this->redis->xClaim($stream, $group, $consumer, $minIdleTime, $messageIds) :
100
            $this->redis->xClaim($stream, $group, $consumer, $minIdleTime, $messageIds, $options);
101
    }
102
103
104
    /**
105
     * Delete one or more messages from a stream.
106
     *
107
     * See: https://redis.io/commands/xdel.
108
     *
109
     * @param  string $stream
110
     * @param  array  $messageIds
111
     *
112
     * @return int                  The number of messages removed.
113
     */
114
    public function xDel(string $stream, array $messageIds): int
115
    {
116
        return $this->redis->xDel($stream, $messageIds);
117
    }
118
119
120
    /**
121
     * This command is used in order to create, destroy, or manage consumer groups.
122
     *
123
     * See: https://redis.io/commands/xgroup.
124
     *
125
     * @param  string       $command
126
     * @param  string|null  $stream
127
     * @param  string|null  $group
128
     * @param  string|null  $messageId_consumerName
129
     * @param  bool|boolean $makeStream
130
     *
131
     * @return mixed                                This command returns different
132
     *                                              types depending on the specific
133
     *                                              XGROUP command executed.
134
     */
135
    public function xGroup(
136
        string $command,
137
        ?string $stream = null,
138
        ?string $group = null,
139
        ?string $messageId_consumerName = null,
140
        bool $makeStream = false
141
    ) {
142
        $command = strtoupper($command);
143
144
        if (!$this->_checkGroupCommands($command)) {
145
            throw new \Exception("Bad Group Command", 1);
146
        }
147
148
        switch ($command) {
149
            case 'CREATE':
150
                return $this->redis->xGroup($command, $stream, $group, $messageId_consumerName, $makeStream);
151
            case 'SETID':
152
                return $this->redis->xGroup($command, $stream, $group, $messageId_consumerName);
153
            case 'DESTROY':
154
                return $this->redis->xGroup($command, $stream, $group);
155
            case 'DELCONSUMER':
156
                return $this->redis->xGroup($command, $stream, $group, $messageId_consumerName);
157
        }
158
159
        return $this->redis->xGroup($command);
160
    }
161
162
163
    /**
164
     * Get information about a stream or consumer groups.
165
     *
166
     * See: https://redis.io/commands/xinfo.
167
     *
168
     * @param  string $command
169
     * @param  string $stream
170
     * @param  string $group
171
     *
172
     * @return mixed            This command returns different types depending on which subcommand is used.
173
     */
174
    public function xInfo(string $command, ?string $stream = null, ?string $group = null)
175
    {
176
        $command = strtoupper($command);
177
178
        if (!$this->_checkInfoCommands($command)) {
179
            throw new \Exception("Bad Info Command", 1);
180
        }
181
182
        if (is_null($stream) && is_null($group)) {
183
            return $this->redis->xInfo($command);
184
        }
185
186
        return is_null($group) ?
187
            $this->redis->xInfo($command, $stream) :
188
            $this->redis->xInfo($command, $stream, $group);
189
    }
190
191
192
    /**
193
     * Get the length of a given stream.
194
     *
195
     * See: https://redis.io/commands/xlen.
196
     *
197
     * @param  string $stream
198
     *
199
     * @return The number of messages in the stream.
200
     */
201
    public function xLen(string $stream): int
202
    {
203
        return $this->redis->xLen($stream);
204
    }
205
206
207
    /**
208
     * Get information about pending messages in a given stream.
209
     *
210
     * See: https://redis.io/commands/xpending.
211
     *
212
     * @param  string      $stream
213
     * @param  string      $group
214
     * @param  string|null $start
215
     * @param  string|null $end
216
     * @param  int|null    $count
217
     * @param  string|null $consumer
218
     *
219
     * @return array                    Information about the pending messages,
220
     *                                  in various forms depending on the specific
221
     *                                  invocation of XPENDING.
222
     */
223
    public function xPending(
224
        string $stream,
225
        string $group,
226
        ?string $start = null,
227
        ?string $end = null,
228
        ?int $count = null,
229
        ?string $consumer = null
230
    ): array {
231
        return is_null($start) || is_null($end) || is_null($count) || is_null($consumer) ?
232
            $this->redis->xPending($stream, $group) :
233
            $this->redis->xPending($stream, $group, $start, $end, $count, $consumer);
234
    }
235
236
237
    /**
238
     * Get a range of messages from a given stream.
239
     *
240
     * See: https://redis.io/commands/xrange.
241
     *
242
     * @param  string   $stream
243
     * @param  string   $start
244
     * @param  string   $end
245
     * @param  int|null $count
246
     *
247
     * @return array            The messages in the stream within the requested range.
248
     */
249
    public function xRange(string $stream, string $start, string $end, ?int $count = null): array
250
    {
251
        return is_null($count) ?
252
            $this->redis->xRange($stream, $start, $end) :
253
            $this->redis->xRange($stream, $start, $end, $count);
254
    }
255
256
257
    /**
258
     * Read data from one or more streams and only return IDs greater than sent in the command.
259
     *
260
     * See: https://redis.io/commands/xread.
261
     *
262
     * @param  array    $streams
263
     * @param  int|null $count
264
     * @param  int|null $block
265
     *
266
     * @return array            The messages in the stream newer than the IDs passed to Redis (if any).
267
     */
268
    public function xRead(array $streams, ?int $count = null, ?int $block = null): array
269
    {
270
        if (!is_null($count) && !is_null($block)) {
271
            return $this->redis->xRead($streams, $count, $block);
272
        }
273
274
        return is_null($count) ? $this->redis->xRead($streams) : $this->redis->xRead($streams, $count);
275
    }
276
277
278
    /**
279
     * This method is similar to xRead except that it supports reading messages for a specific consumer group.
280
     *
281
     * See: https://redis.io/commands/xreadgroup.
282
     *
283
     * @param  string   $group
284
     * @param  string   $consumer
285
     * @param  array    $streams
286
     * @param  int|null $count
287
     * @param  int|null $block
288
     *
289
     * @return array                The messages delivered to this consumer group (if any).
290
     */
291
    public function xReadGroup(
292
        string $group,
293
        string $consumer,
294
        array $streams,
295
        ?int $count = null,
296
        ?int $block = null
297
    ): array {
298
        if (!is_null($count) && !is_null($block)) {
299
            return $this->redis->xReadGroup($group, $consumer, $streams, $count, $block);
300
        }
301
302
        return is_null($count) ?
303
            $this->redis->xReadGroup($group, $consumer, $streams) :
304
            $this->redis->xReadGroup($group, $consumer, $streams, $count);
305
    }
306
307
308
    /**
309
     * This is identical to xRange except the results come back in reverse order.
310
     * Also note that Redis reverses the order of "start" and "end".
311
     *
312
     * See: https://redis.io/commands/xrevrange.
313
     *
314
     * @param  string $stream
315
     * @param  string $end
316
     * @param  string $start
317
     * @param  int|null $count
318
     *
319
     * @return array            The messages in the range specified.
320
     */
321
    public function xRevRange(string $stream, string $end, string $start, ?int $count = null): array
322
    {
323
        return is_null($count) ?
324
            $this->redis->xRevRange($stream, $end, $start) :
325
            $this->redis->xRevRange($stream, $end, $start, $count);
326
    }
327
328
329
    /**
330
     * Trim the stream length to a given maximum. If the "approximate" flag is
331
     * pasesed, Redis will use your size as a hint but only trim trees in whole
332
     * nodes (this is more efficient).
333
     *
334
     * See: https://redis.io/commands/xtrim.
335
     *
336
     * @param  string    $stream
337
     * @param  int       $maxLenght
338
     * @param  bool|null $approximate
339
     *
340
     * @return int                      The number of messages trimmed from the stream.
341
     */
342
    public function xTrim(string $stream, int $maxLenght, ?bool $approximate = null): int
343
    {
344
        return is_null($approximate) ?
345
            $this->redis->xTrim($stream, $maxLenght) :
346
            $this->redis->xTrim($stream, $maxLenght, $approximate);
347
    }
348
349
350
    /**
351
     * ************************************************************************
352
     * H E L P E R    F U N C T I O N S
353
     * ************************************************************************
354
     */
355
356
357
    /**
358
     * Claim Options available
359
     *
360
     * Note:  'TIME', and 'IDLE' are mutually exclusive
361
     *
362
     * 'IDLE' => $value, Set the idle time to $value ms
363
     * 'TIME' => $value, Set the idle time to now - $value
364
     * 'RETRYCOUNT' => $value, Update message retrycount to $value
365
     * 'FORCE', Claim the message(s) even if they're not pending anywhere
366
     * 'JUSTID',Instruct Redis to only return IDs
367
     *
368
     * @param  array  $options
369
     *
370
     * @return bool
371
     */
372
    private function _checkClaimOptions(array $options): bool
373
    {
374
        foreach ($options as $key => $value) {
375
            $check = is_numeric($key) ? $value : $key;
376
            if (!in_array($check, ['IDLE', 'TIME', 'RETRYCOUNT', 'FORCE', 'JUSTID'])) {
377
                return false;
378
            }
379
        }
380
381
        return !(array_key_exists('IDLE', $options) && array_key_exists('TIME', $options));
382
    }
383
384
385
    /**
386
     * [_checkInfoCommands description]
387
     *
388
     * @param  string $command
389
     *
390
     * @return bool
391
     */
392
    private function _checkInfoCommands(string $command): bool
393
    {
394
        return in_array($command, ['CONSUMERS', 'GROUPS', 'STREAM', 'HELP'], true);
395
    }
396
397
    /**
398
     * [_checkGroupCommands description]
399
     *
400
     * @param  string $command
401
     *
402
     * @return bool
403
     */
404
    private function _checkGroupCommands(string $command): bool
405
    {
406
        return in_array($command, ['HELP', 'CREATE', 'SETID', 'DESTROY', 'DELCONSUMER'], true);
407
    }
408
}
409