RedisDriver::createGroup()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 1
eloc 1
c 1
b 0
f 1
nc 1
nop 2
dl 0
loc 3
rs 10
1
<?php
/**
 * Copyright Aleksandar Panic
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 **/
17
18
namespace ArekX\DataStreamer\Drivers;
19
20
21
use ArekX\DataStreamer\Contracts\Driver;
22
23
/**
 * Class RedisDriver
 * @package ArekX\DataStreamer\Drivers
 *
 * Driver implementation backed by the native \Redis client. Every method
 * forwards to the matching stream command on that client
 * (xGroup, xAck, xAdd, xReadGroup).
 *
 * @codeCoverageIgnore
 */
class RedisDriver implements Driver
{
    /**
     * Represents an instance to native redis PHP extension
     *
     * @var \Redis
     */
    protected \Redis $redis;

    /**
     * RedisDriver constructor.
     *
     * Only instantiates the client; no connection is opened until
     * connect() is called.
     */
    public function __construct()
    {
        $this->redis = new \Redis();
    }

    /**
     * Returns the underlying native client used by this driver.
     *
     * @return \Redis The same instance that all driver methods operate on.
     */
    public function getClient(): \Redis
    {
        return $this->redis;
    }

    /**
     * @inheritDoc
     *
     * Recognised keys in $config: 'host' (required), 'port' (default 6379),
     * 'timeout' (default 0.0), 'reserved' (default null),
     * 'retryInterval' (default 0) and 'readTimeout' (default 0.0).
     *
     * @throws \RedisException When the client reports that the connection could not be established.
     */
    public function connect(array $config): void
    {
        $connected = $this->redis->connect(
            $config['host'],
            $config['port'] ?? 6379,
            $config['timeout'] ?? 0.0,
            $config['reserved'] ?? null,
            $config['retryInterval'] ?? 0,
            $config['readTimeout'] ?? 0.0
        );

        // The client signals some failures by returning false instead of throwing.
        // Surface those here so that later stream calls do not fail on a dead connection.
        if ($connected === false) {
            throw new \RedisException('Unable to connect to the Redis server.');
        }
    }

    /**
     * @inheritDoc
     *
     * Issues an xGroup CREATE call with '$' as the start id and the
     * trailing create-stream flag set to true.
     */
    public function createGroup(string $streamName, string $consumerGroup): void
    {
        $this->redis->xGroup('CREATE', $streamName, $consumerGroup, '$', true);
    }

    /**
     * @inheritDoc
     */
    public function acknowledge(string $stream, string $consumerGroup, array $ids): void
    {
        // Nothing to acknowledge: skip the round trip, an id-less XACK cannot succeed.
        if ($ids === []) {
            return;
        }

        $this->redis->xAck($stream, $consumerGroup, $ids);
    }

    /**
     * @inheritDoc
     *
     * The message id is always '*', leaving id generation to the server.
     */
    public function sendMessage(string $stream, array $message): void
    {
        $this->redis->xAdd($stream, '*', $message);
    }

    /**
     * @inheritDoc
     *
     * Returns the entries stored under $stream in the client's reply, or an
     * empty array when the reply is empty/false or carries no entry for
     * that stream.
     *
     * NOTE(review): $waitFor is forwarded unchanged as the block argument of
     * xReadGroup; confirm what a value of 0 means for the installed client version.
     */
    public function readMessages(string $consumerGroup, string $consumer, string $stream, string $fromId = self::FROM_START, int $count = 1, int $waitFor = 0): array
    {
        $data = $this->redis->xReadGroup($consumerGroup, $consumer, [$stream => $fromId], $count, $waitFor);

        if (empty($data)) {
            return [];
        }

        return $data[$stream] ?? [];
    }
}