Completed
Push — master ( bdeb81...7f157d )
by Eugene
06:32
created

StreamConnection::open()   B

Complexity

Conditions 7
Paths 13

Size

Total Lines 42

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 23
CRAP Score 7

Importance

Changes 0
Metric Value
cc 7
nc 13
nop 0
dl 0
loc 42
ccs 23
cts 23
cp 1
crap 7
rs 8.3146
c 0
b 0
f 0
1
<?php
2
3
/**
4
 * This file is part of the Tarantool Client package.
5
 *
6
 * (c) Eugene Leonovich <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
declare(strict_types=1);
13
14
namespace Tarantool\Client\Connection;
15
16
use Tarantool\Client\Exception\CommunicationFailed;
17
use Tarantool\Client\Exception\ConnectionFailed;
18
use Tarantool\Client\Packer\PacketLength;
19
20
/**
 * Connection to a Tarantool instance over a PHP stream socket.
 *
 * Instances are obtained through the named constructors. create() routes
 * URIs starting with "unix://" to createUds() and everything else to
 * createTcp().
 */
final class StreamConnection implements Connection
{
    public const DEFAULT_URI = 'tcp://127.0.0.1:3301';

    /**
     * Option defaults; caller-supplied options take precedence over these.
     *
     * - connect_timeout: seconds, cast to float and passed to stream_socket_client()
     * - socket_timeout:  seconds (may be fractional), applied with stream_set_timeout()
     * - tcp_nodelay:     when truthy, createTcp() sets the "tcp_nodelay" socket context option
     * - persistent:      when truthy, STREAM_CLIENT_PERSISTENT is added to the connect flags
     */
    private const DEFAULT_OPTIONS = [
        'connect_timeout' => 5,
        'socket_timeout' => 5,
        'tcp_nodelay' => true,
        'persistent' => false,
    ];

    /** @var resource|null */
    private $stream;

    /** @var resource|null */
    private $streamContext;

    /** @var string */
    private $uri;

    /** @var non-empty-array<string, mixed> */
    private $options;

    /**
     * Greeting obtained by the last successful open(), reset to null by close().
     *
     * NOTE(review): a static-analysis report flagged the assignments of
     * Greeting::unknown() and Greeting::parse() in open() as "object<self>"
     * being incompatible with this declared type. Presumably "self" resolves
     * to Greeting inside those factories, which would make the report a false
     * positive -- confirm against the Greeting class.
     *
     * @var Greeting|null
     */
    private $greeting;

    /**
     * @param string $uri
     * @param array<string, mixed> $options
     */
    private function __construct(string $uri, array $options)
    {
        $this->uri = $uri;
        // Array union keeps the left-hand value for duplicate keys, so the
        // defaults only fill in options the caller did not provide.
        $this->options = $options + self::DEFAULT_OPTIONS;
    }

    /**
     * Creates a connection for a TCP URI.
     *
     * When the "tcp_nodelay" option is truthy, a stream context with the
     * "tcp_nodelay" socket option is prepared for use by open().
     *
     * @param array<string, mixed> $options
     */
    public static function createTcp(string $uri = self::DEFAULT_URI, array $options = []) : self
    {
        $self = new self($uri, $options);

        if ($self->options['tcp_nodelay'] ?? false) {
            $self->streamContext = \stream_context_create(['socket' => ['tcp_nodelay' => true]]);
        }

        return $self;
    }

    /**
     * Creates a connection for a Unix domain socket URI (no stream context is set up).
     *
     * @param array<string, mixed> $options
     */
    public static function createUds(string $uri, array $options = []) : self
    {
        return new self($uri, $options);
    }

    /**
     * Creates a connection, picking the factory from the URI scheme.
     *
     * @param array<string, mixed> $options
     */
    public static function create(string $uri, array $options = []) : self
    {
        return 0 === \strpos($uri, 'unix://')
            ? self::createUds($uri, $options)
            : self::createTcp($uri, $options);
    }

    /**
     * Opens the connection (if not already open) and returns the server greeting.
     *
     * If the stream is already open, the stored greeting is returned and no
     * new connection is made.
     *
     * @see https://github.com/vimeo/psalm/issues/3021
     * @psalm-suppress InvalidNullableReturnType
     *
     * @throws ConnectionFailed if the socket cannot be connected
     * @throws CommunicationFailed if the greeting cannot be read
     */
    public function open() : Greeting
    {
        if (\is_resource($this->stream)) {
            /** @see https://github.com/vimeo/psalm/issues/3021 */
            /** @psalm-suppress NullableReturnStatement */
            return $this->greeting;
        }

        $flags = $this->options['persistent']
            ? \STREAM_CLIENT_CONNECT | \STREAM_CLIENT_PERSISTENT
            : \STREAM_CLIENT_CONNECT;

        $connectTimeout = (float) $this->options['connect_timeout'];

        // Warnings are silenced because a failure is reported through the
        // exception below. The context argument is only passed when one was
        // created (presumably because older PHP versions reject a null
        // context -- TODO confirm).
        $stream = $this->streamContext
            ? @\stream_socket_client(
                $this->uri,
                $errorCode,
                $errorMessage,
                $connectTimeout,
                $flags,
                $this->streamContext
            )
            : @\stream_socket_client(
                $this->uri,
                $errorCode,
                $errorMessage,
                $connectTimeout,
                $flags
            );

        if (false === $stream) {
            throw ConnectionFailed::fromUriAndReason($this->uri, $errorMessage);
        }

        $this->stream = $stream;

        // stream_set_timeout() takes integer seconds plus integer microseconds;
        // splitting the value lets fractional timeouts (e.g. 0.5) be used under
        // strict_types. An integer option yields ($seconds, 0) as before.
        $socketTimeout = (float) $this->options['socket_timeout'];
        $seconds = (int) $socketTimeout;
        $microseconds = (int) \round(($socketTimeout - $seconds) * 1e6);
        \stream_set_timeout($this->stream, $seconds, $microseconds);

        // A non-zero position on a persistent stream presumably means it is a
        // reused connection whose greeting was consumed earlier, so no
        // greeting is read here.
        if ($this->options['persistent'] && \ftell($this->stream)) {
            return $this->greeting = Greeting::unknown();
        }

        $greeting = $this->read(Greeting::SIZE_BYTES, 'Unable to read greeting');

        return $this->greeting = Greeting::parse($greeting);
    }

    /**
     * Closes the stream if it is open and forgets the stored greeting.
     */
    public function close() : void
    {
        if (\is_resource($this->stream)) {
            /** @psalm-suppress InvalidPropertyAssignmentValue */
            \fclose($this->stream);
        }

        $this->stream = null;
        $this->greeting = null;
    }

    /**
     * Tells whether there is currently no open stream.
     */
    public function isClosed() : bool
    {
        return !\is_resource($this->stream);
    }

    /**
     * Writes a request and returns the raw response body.
     *
     * The response is read as a length prefix of PacketLength::SIZE_BYTES
     * bytes followed by that many bytes of payload.
     *
     * @throws CommunicationFailed if the connection is not open, the request
     *                             cannot be fully written, or the response
     *                             cannot be fully read
     */
    public function send(string $data) : string
    {
        if (!\is_resource($this->stream)) {
            throw new CommunicationFailed('Unable to write request');
        }

        // fwrite() on a network stream may write only part of the buffer, so
        // keep writing the remainder until everything is sent. A result of
        // false (error) or 0 (no progress, which includes an empty request)
        // is treated as a failure.
        $pending = $data;
        do {
            $written = \fwrite($this->stream, $pending);
            if (!$written) {
                throw new CommunicationFailed('Unable to write request');
            }
            // The cast covers PHP versions where substr() returns false
            // once the offset reaches the end of the string.
            $pending = (string) \substr($pending, $written);
        } while ('' !== $pending);

        $length = $this->read(PacketLength::SIZE_BYTES, 'Unable to read response length');
        $length = PacketLength::unpack($length);

        return $this->read($length, 'Unable to read response');
    }

    /**
     * Reads exactly $length bytes from the stream.
     *
     * @throws CommunicationFailed with "Read timed out" if the stream reports
     *                             a timeout, otherwise with $errorMessage
     */
    private function read(int $length, string $errorMessage) : string
    {
        /** @psalm-suppress PossiblyNullArgument */
        $data = \stream_get_contents($this->stream, $length);

        // Only a complete, non-empty read counts as success: a short read
        // (timeout or EOF in the middle of a packet) must not be handed to
        // the caller as if it were the whole packet, and a payload such as
        // "0" must not be mistaken for a failure.
        if (false !== $data && '' !== $data && \strlen($data) === $length) {
            return $data;
        }

        /** @psalm-suppress PossiblyNullArgument */
        $meta = \stream_get_meta_data($this->stream);
        throw new CommunicationFailed($meta['timed_out'] ? 'Read timed out' : $errorMessage);
    }
}
167