Redis::__construct()   A
last analyzed

Complexity

Conditions 4
Paths 8

Size

Total Lines 26
Code Lines 13

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 13
dl 0
loc 26
c 0
b 0
f 0
rs 9.8333
cc 4
nc 8
nop 5
1
<?php
2
/**
3
 * Redis client
4
 * User: moyo
5
 * Date: 09/08/2017
6
 * Time: 3:02 PM
7
 */
8
9
namespace Carno\Redis;
10
11
use function Carno\Coroutine\await;
12
use function Carno\Coroutine\ctx;
13
use Carno\Pool\Managed;
14
use Carno\Pool\Poolable;
15
use Carno\Promise\Promise;
16
use Carno\Promise\Promised;
17
use Carno\Redis\Chips\Compatible;
18
use Carno\Redis\Chips\Subscriber;
19
use Carno\Redis\Exception\CommandException;
20
use Carno\Redis\Exception\ConnectingException;
21
use Carno\Redis\Exception\TimeoutException;
22
use Carno\Redis\Exception\UplinkException;
23
use Carno\Tracing\Contracts\Vars\EXT;
24
use Carno\Tracing\Contracts\Vars\TAG;
25
use Carno\Tracing\Standard\Endpoint;
26
use Carno\Tracing\Utils\SpansCreator;
27
use Swoole\Redis as SWRedis;
0 ignored issues
show
Bug introduced by
The type Swoole\Redis was not found. Maybe you did not declare it correctly or list all dependencies?

The issue could also be caused by a filter entry in the build configuration. If the path has been excluded in your configuration, e.g. excluded_paths: ["lib/*"], you can move it to the dependency path list as follows:

filter:
    dependency_paths: ["lib/*"]

For further information see https://scrutinizer-ci.com/docs/tools/php/php-scrutinizer/#list-dependency-paths

Loading history...
28
use Redis as PRedis;
29
30
/**
31
 * @mixin PRedis
32
 */
33
class Redis implements Poolable
34
{
35
    use Managed, SpansCreator, Compatible, Subscriber;
36
37
    /**
38
     * @var Timeouts
39
     */
40
    private $timeout = null;
41
42
    /**
43
     * @var string
44
     */
45
    private $named = null;
46
47
    /**
48
     * @var string
49
     */
50
    private $host = null;
51
52
    /**
53
     * @var int
54
     */
55
    private $port = null;
56
57
    /**
58
     * @var string
59
     */
60
    private $auth = null;
61
62
    /**
63
     * @var int
64
     */
65
    private $slot = null;
66
67
    /**
68
     * @var SWRedis
69
     */
70
    private $link = null;
71
72
    /**
     * Redis constructor.
     *
     * Parses the target address, remembers the connection settings and builds
     * the underlying Swoole client with the derived options. No connection is
     * opened here.
     *
     * @param string $target "host:port" pair, or a path starting with "unix:/" (kept verbatim as host, without port)
     * @param string|null $auth forwarded as the "password" client option when not null
     * @param int|null $slot forwarded as the "database" client option when not null
     * @param Timeouts|null $timeout timeouts settings; a default Timeouts instance is created when omitted
     * @param string $named name given to the remote endpoint of tracing spans
     */
    public function __construct(
        string $target,
        ?string $auth = null,
        ?int $slot = null,
        ?Timeouts $timeout = null,
        string $named = 'redis'
    ) {
        if (substr($target, 0, 6) === 'unix:/') {
            $this->host = $target;
            $this->port = null;
        } else {
            // pad the parts so a target without ":port" does not raise an undefined-offset notice
            [$host, $port] = array_pad(explode(':', $target), 2, null);

            $this->host = $host;
            // explode() yields strings; keep the port as the int the property documents
            $this->port = is_null($port) ? null : (int) $port;
        }

        $this->auth = $auth;
        $this->slot = $slot;

        $this->named = $named;
        $this->timeout = $timeout ?? new Timeouts;

        // connect timeout is divided by 1000 (presumably milliseconds to seconds — confirm against Timeouts)
        $options = ['timeout' => round($this->timeout->connect() / 1000, 3)];

        if (!is_null($this->auth)) {
            $options['password'] = $this->auth;
        }

        if (!is_null($this->slot)) {
            $options['database'] = $this->slot;
        }

        $this->link = new SWRedis($options);
    }
107
108
    /**
     * Installs the "close" and "message" handlers on the link, then starts connecting.
     *
     * On "close" the link property is unset and the closed() promise is resolved;
     * incoming messages are handed to messaging().
     *
     * @return Promised resolved by the connect callback on success, failed with
     *                  ConnectingException (link errMsg/errCode) otherwise
     */
    public function connect() : Promised
    {
        $onClose = function () {
            unset($this->link);
            $this->closed()->resolve();
        };

        $onMessage = function (SWRedis $c, array $recv) {
            $this->messaging($recv);
        };

        $this->link->on('close', $onClose);
        $this->link->on('message', $onMessage);

        return new Promise(function (Promised $promise) {
            $connected = static function (SWRedis $c, bool $success) use ($promise) {
                if ($success) {
                    $promise->resolve();
                } else {
                    $promise->throw(new ConnectingException($c->errMsg, $c->errCode));
                }
            };

            // a literal false means the connect request itself was not accepted
            if (false === $this->link->connect($this->host, $this->port, $connected)) {
                throw new ConnectingException('Unknown failure');
            }
        });
    }
138
139
    /**
     * Issues a "ping" on the link to check that it still answers.
     *
     * @return Promised rejected when the ping result is exactly false, resolved otherwise
     */
    public function heartbeat() : Promised
    {
        return new Promise(function (Promised $promised) {
            $answered = function (SWRedis $c, $result) use ($promised) {
                if ($result === false) {
                    $promised->reject();
                } else {
                    $promised->resolve();
                }
            };

            $this->link->__call('ping', [$answered]);
        });
    }
153
154
    /**
     * Closes the link (when it still exists) and hands back the closed() promise.
     *
     * @return Promised the promise returned by closed()
     */
    public function close() : Promised
    {
        // the "close" handler installed by connect() unsets the link, so a repeated
        // close() must not call a method on a link that is already gone
        if (isset($this->link)) {
            $this->link->close();
        }

        return $this->closed();
    }
162
163
    /**
     * Forwards an arbitrary redis command to the link and awaits its reply.
     *
     * This method contains a "yield", so invoking it produces a Generator; the
     * value described below is the generator's return value.
     *
     * @param $name command name
     * @param $arguments command arguments; the reply callback is appended to them before sending
     * @return Promised whatever finishSpan() returns for the awaited command
     * @throws CommandException when issued while in subscribed state
     */
    public function __call($name, $arguments)
    {
        if ($this->subscribed()) {
            throw new CommandException(sprintf('Subscribe state cannot issue "%s"', $name));
        }

        $ctx = null;

        if ($this->traced()) {
            // work on a copy of the yielded context
            $ctx = clone yield ctx();

            $this->newSpan($ctx, $name, [
                TAG::SPAN_KIND => TAG::SPAN_KIND_RPC_CLIENT,
                TAG::DATABASE_TYPE => 'redis',
                TAG::DATABASE_INSTANCE => sprintf('%s:%d', $this->host, $this->port),
                TAG::DATABASE_STATEMENT => sprintf('%s %s', $name, $arguments[0] ?? ''),
                EXT::REMOTE_ENDPOINT => new Endpoint($this->named),
            ]);
        }

        // sends the command with the given callback as its last argument
        $dispatch = function ($fn) use ($name, $arguments) {
            $arguments[] = $fn;
            $this->command($name, $arguments);
        };

        // a false result only counts as failure when the link also reports an error code
        $interpret = static function (SWRedis $c, $result) {
            if ($result === false && $c->errCode > 0) {
                throw new CommandException($c->errMsg, $c->errCode);
            }

            return $result;
        };

        $pending = await(
            $dispatch,
            $interpret,
            $this->timeout->execute(),
            TimeoutException::class,
            sprintf('%s:%d [->] %s', $this->host, $this->port, $name)
        );

        return $this->finishSpan($pending, $ctx);
    }
206
207
    /**
     * Hands a command over to the link.
     *
     * @param string $name command name
     * @param array $arguments command arguments, passed to the link unchanged
     * @throws UplinkException when the link returns exactly false for the call
     */
    private function command(string $name, array $arguments) : void
    {
        $accepted = $this->link->__call($name, $arguments);

        if ($accepted === false) {
            throw new UplinkException('Unknown failure');
        }
    }
217
}
218