PredisDriver::createGroup()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 3
Code Lines 1

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 1
eloc 1
c 1
b 0
f 1
nc 1
nop 2
dl 0
loc 3
rs 10
1
<?php
2
/**
3
 * Copyright Aleksandar Panic
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 * http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 **/
17
18
19
namespace ArekX\DataStreamer\Drivers;
20
21
22
use ArekX\DataStreamer\Contracts\Driver;
23
use Predis\Client;
1 ignored issue
show
Bug introduced by
The type Predis\Client was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
24
25
/**
26
 * Class PredisDriver
27
 * @package ArekX\DataStreamer\Drivers
28
 *
29
 * Represents an implementation using
30
 * pure PHP redis driver.
31
 *
32
 * @codeCoverageIgnore
33
 */
34
class PredisDriver implements Driver
{
    /**
     * Predis client used for every command issued by this driver.
     *
     * It is assigned in connect(); being a typed property without a default,
     * reading it before connect() has been called raises an Error.
     *
     * @var Client
     */
    protected Client $client;

    /**
     * Returns the client created by connect().
     *
     * @return Client
     */
    public function getClient(): Client
    {
        return $this->client;
    }

    /**
     * Creates the Predis client from the given configuration.
     *
     * @inheritDoc
     *
     * @param array $config Reads the optional keys 'parameters' and 'options',
     *                      which are handed to the Predis\Client constructor as-is
     *                      (null when a key is absent).
     */
    public function connect(array $config): void
    {
        $this->client = new Client($config['parameters'] ?? null, $config['options'] ?? null);
    }

    /**
     * Creates a consumer group on a stream, creating the stream itself when missing (MKSTREAM).
     *
     * The group starts at '$', i.e. it only sees messages added after creation.
     * A BUSYGROUP reply (group already exists) is tolerated so the call stays
     * repeatable; any other Redis error is raised.
     *
     * @inheritDoc
     *
     * @throws \RuntimeException When Redis rejects the command for a reason other than BUSYGROUP.
     */
    public function createGroup(string $streamName, string $consumerGroup): void
    {
        $this->executeCommand(
            ['XGROUP', 'CREATE', $streamName, $consumerGroup, '$', 'MKSTREAM'],
            'BUSYGROUP'
        );
    }

    /**
     * Reads messages for a consumer of a group from a single stream via XREADGROUP.
     *
     * NOTE(review): $waitFor is sent as the BLOCK timeout in milliseconds and Redis
     * treats BLOCK 0 as "wait indefinitely" - confirm that this is what the Driver
     * contract expects from the default value.
     *
     * @inheritDoc
     *
     * @return array Map of message ID => associative array of field => value.
     *               An entry whose payload is no longer available is returned with
     *               an empty array so the caller can still acknowledge its ID.
     *
     * @throws \RuntimeException When Redis answers with an error reply (e.g. NOGROUP).
     */
    public function readMessages(string $consumerGroup, string $consumer, string $stream, string $fromId = self::FROM_START, int $count = 1, int $waitFor = 0): array
    {
        $reply = $this->executeCommand([
            'XREADGROUP', 'GROUP', $consumerGroup, $consumer,
            'COUNT', $count,
            'BLOCK', $waitFor,
            'STREAMS', $stream, $fromId,
        ]);

        // A null reply means the BLOCK timeout elapsed without data. Only one
        // stream is requested, so its entries are always at index 0.
        if (!is_array($reply) || empty($reply[0][1])) {
            return [];
        }

        $messages = [];

        foreach ($reply[0][1] as $entry) {
            $id = $entry[0];

            // Redis returns a null payload for pending entries that were deleted
            // or trimmed from the stream after delivery.
            $flatFields = $entry[1] ?? [];

            // The payload is a flat [field, value, field, value, ...] list. Pair it
            // up; array_column() skips a trailing chunk that has no value element.
            $messages[$id] = array_column(array_chunk($flatFields, 2), 1, 0);
        }

        return $messages;
    }

    /**
     * Acknowledges the given message IDs for a consumer group via XACK.
     *
     * An empty ID list is a no-op: XACK requires at least one ID and would
     * otherwise be rejected by Redis.
     *
     * @inheritDoc
     *
     * @throws \RuntimeException When Redis answers with an error reply.
     */
    public function acknowledge(string $stream, string $consumerGroup, array $ids): void
    {
        if ($ids === []) {
            return;
        }

        // array_values() drops caller-supplied keys so only the IDs reach the command.
        $this->executeCommand(array_merge(['XACK', $stream, $consumerGroup], array_values($ids)));
    }

    /**
     * Appends a message to a stream via XADD, letting Redis generate the ID ('*').
     *
     * @inheritDoc
     *
     * @param string $stream  Name of the stream to append to.
     * @param array  $message Map of field => value; each pair becomes one stream field.
     *
     * @throws \RuntimeException When Redis answers with an error reply, so a message
     *                           that was not stored is never dropped silently.
     */
    public function sendMessage(string $stream, array $message): void
    {
        $command = ['XADD', $stream, '*'];

        foreach ($message as $field => $value) {
            $command[] = $field;
            $command[] = $value;
        }

        $this->executeCommand($command);
    }

    /**
     * Runs a raw command and turns Redis error replies into exceptions.
     *
     * Client::executeRaw() does not throw on error replies; it returns the error
     * text and reports the failure through its by-reference second argument.
     *
     * @param array       $command              Command name followed by its arguments.
     * @param string|null $toleratedErrorPrefix Error replies starting with this prefix
     *                                          are ignored and yield null.
     *
     * @return mixed The reply as returned by the client, or null for a tolerated error.
     *
     * @throws \RuntimeException When Redis answers with an error reply that is not tolerated.
     */
    private function executeCommand(array $command, ?string $toleratedErrorPrefix = null)
    {
        $isError = false;
        $reply = $this->client->executeRaw($command, $isError);

        if (!$isError) {
            return $reply;
        }

        if ($toleratedErrorPrefix !== null && strpos((string)$reply, $toleratedErrorPrefix) === 0) {
            return null;
        }

        throw new \RuntimeException(sprintf('Redis command %s failed: %s', $command[0], $reply));
    }
}