Passed
Pull Request — master (#47)
by Eugene
09:19
created

StreamConnection   A

Complexity

Total Complexity 21

Size/Duplication

Total Lines 122
Duplicated Lines 0 %

Test Coverage

Coverage 92.45%

Importance

Changes 0
Metric Value
wmc 21
eloc 62
dl 0
loc 122
ccs 49
cts 53
cp 0.9245
rs 10
c 0
b 0
f 0

9 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 4 1
A createUds() 0 3 1
A close() 0 8 2
A send() 0 10 2
A create() 0 5 2
B open() 0 39 7
A isClosed() 0 3 1
A read() 0 8 3
A createTcp() 0 9 2
1
<?php
2
3
/**
4
 * This file is part of the Tarantool Client package.
5
 *
6
 * (c) Eugene Leonovich <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
declare(strict_types=1);
13
14
namespace Tarantool\Client\Connection;
15
16
use Tarantool\Client\Exception\CommunicationFailed;
17
use Tarantool\Client\Exception\ConnectionFailed;
18
use Tarantool\Client\Greeting;
19
use Tarantool\Client\Packer\PacketLength;
20
21
/**
 * Connection implementation built on top of PHP stream sockets.
 *
 * Instances are created through the static factories: createTcp() for
 * TCP URIs, createUds() for Unix domain socket URIs, or create() which
 * picks one of the two based on the URI scheme.
 *
 * Supported options (missing keys fall back to DEFAULT_OPTIONS):
 *  - connect_timeout: seconds to wait while establishing the connection;
 *  - socket_timeout:  seconds to wait for read/write operations
 *                     (fractional values are allowed);
 *  - tcp_nodelay:     when truthy, createTcp() sets the "tcp_nodelay"
 *                     socket context option;
 *  - persistent:      when truthy, the stream is opened with the
 *                     STREAM_CLIENT_PERSISTENT flag.
 */
final class StreamConnection implements Connection
{
    public const DEFAULT_URI = 'tcp://127.0.0.1:3301';

    private const DEFAULT_OPTIONS = [
        'connect_timeout' => 5,
        'socket_timeout' => 5,
        'tcp_nodelay' => true,
        'persistent' => false,
    ];

    /** @var resource|null The underlying socket stream, null until open() succeeds. */
    private $stream;

    /** @var resource|null Optional stream context (only set by createTcp()). */
    private $streamContext;

    /** @var string */
    private $uri;

    /** @var array */
    private $options;

    /** @var Greeting|null Greeting parsed during the last successful handshake. */
    private $greeting;

    /**
     * @param string $uri     Address passed to stream_socket_client()
     * @param array  $options User options; they take precedence over DEFAULT_OPTIONS
     */
    private function __construct(string $uri, array $options)
    {
        $this->uri = $uri;
        // Array union keeps the caller's values and only fills in missing keys.
        $this->options = $options + self::DEFAULT_OPTIONS;
    }

    /**
     * Creates a connection for a TCP URI.
     *
     * When the "tcp_nodelay" option is truthy, a stream context with the
     * "tcp_nodelay" socket option is prepared for the later open() call.
     */
    public static function createTcp(string $uri = self::DEFAULT_URI, array $options = []) : self
    {
        $self = new self($uri, $options);

        if ($self->options['tcp_nodelay'] ?? false) {
            $self->streamContext = \stream_context_create(['socket' => ['tcp_nodelay' => true]]);
        }

        return $self;
    }

    /**
     * Creates a connection for a Unix domain socket URI (no stream context is used).
     */
    public static function createUds(string $uri, array $options = []) : self
    {
        return new self($uri, $options);
    }

    /**
     * Creates a connection, choosing the factory by URI scheme:
     * URIs starting with "unix://" go to createUds(), everything else to createTcp().
     */
    public static function create(string $uri, array $options = []) : self
    {
        return 0 === \strpos($uri, 'unix://')
            ? self::createUds($uri, $options)
            : self::createTcp($uri, $options);
    }

    /**
     * Opens the stream (if it is not open yet) and reads the server greeting.
     *
     * @throws ConnectionFailed    If the socket cannot be connected
     * @throws CommunicationFailed If the greeting cannot be read
     *
     * @return Greeting|null The greeting, or null when a reused persistent
     *                       stream has already been read from and this
     *                       instance has not seen the greeting itself
     */
    public function open() : ?Greeting
    {
        if (\is_resource($this->stream)) {
            return $this->greeting;
        }

        $flags = $this->options['persistent']
            ? \STREAM_CLIENT_CONNECT | \STREAM_CLIENT_PERSISTENT
            : \STREAM_CLIENT_CONNECT;

        // The context argument is only passed when one exists, because
        // a null context is not accepted by every supported PHP version.
        $stream = $this->streamContext ? @\stream_socket_client(
            $this->uri,
            $errorCode,
            $errorMessage,
            (float) $this->options['connect_timeout'],
            $flags,
            $this->streamContext
        ) : @\stream_socket_client(
            $this->uri,
            $errorCode,
            $errorMessage,
            (float) $this->options['connect_timeout'],
            $flags
        );

        if (false === $stream) {
            throw ConnectionFailed::fromUriAndReason($this->uri, $errorMessage);
        }

        $this->stream = $stream;

        // stream_set_timeout() takes integer seconds and microseconds; split the
        // value so that fractional timeouts work the same way as connect_timeout.
        $socketTimeout = (float) $this->options['socket_timeout'];
        $seconds = (int) $socketTimeout;
        $microseconds = (int) \round(($socketTimeout - $seconds) * 1000000);
        \stream_set_timeout($this->stream, $seconds, $microseconds);

        // A non-zero position on a persistent stream means data has already been
        // read from it, presumably including the greeting, so none is expected now.
        if ($this->options['persistent'] && \ftell($this->stream)) {
            return $this->greeting;
        }

        try {
            $greeting = $this->read(Greeting::SIZE_BYTES, 'Unable to read greeting.');

            return $this->greeting = Greeting::parse($greeting);
        } catch (\Throwable $e) {
            // Do not keep a stream whose handshake failed: otherwise the next
            // open() call would report the connection as open with no greeting.
            $this->close();

            throw $e;
        }
    }

    /**
     * Closes the stream (if open) and forgets the stored greeting.
     */
    public function close() : void
    {
        if (\is_resource($this->stream)) {
            \fclose($this->stream);
        }

        $this->stream = null;
        $this->greeting = null;
    }

    /**
     * Tells whether there is currently no open stream.
     */
    public function isClosed() : bool
    {
        return !\is_resource($this->stream);
    }

    /**
     * Writes a request and reads back one length-prefixed response.
     *
     * @param string $data Raw request bytes
     *
     * @throws CommunicationFailed If the request cannot be fully written or
     *                             the response cannot be fully read
     *
     * @return string Raw response bytes (without the length prefix)
     */
    public function send(string $data) : string
    {
        $this->write($data);

        $length = $this->read(PacketLength::SIZE_BYTES, 'Unable to read response length.');
        $length = PacketLength::unpack($length);

        return $this->read($length, 'Unable to read response.');
    }

    /**
     * Writes the whole buffer to the stream.
     *
     * fwrite() on a network stream may stop before the whole string is
     * written, so the call is repeated until every byte has been accepted.
     *
     * @throws CommunicationFailed If a write fails or makes no progress
     */
    private function write(string $data) : void
    {
        $total = \strlen($data);
        $written = 0;

        do {
            $bytes = \fwrite($this->stream, 0 === $written ? $data : \substr($data, $written));
            // Both false (error) and 0 (no progress, also the empty-data case) are failures.
            if (!$bytes) {
                throw new CommunicationFailed('Unable to write request.');
            }
            $written += $bytes;
        } while ($written < $total);
    }

    /**
     * Reads exactly $length bytes from the stream.
     *
     * @param int    $length       Number of bytes to read
     * @param string $errorMessage Message used when the failure is not a timeout
     *
     * @throws CommunicationFailed If fewer than $length bytes (or no bytes) are read
     */
    private function read(int $length, string $errorMessage) : string
    {
        $data = \stream_get_contents($this->stream, $length);

        // Compare the length explicitly: a truthiness check would accept a
        // short read as a full packet and reject the valid payload "0".
        if (\is_string($data) && '' !== $data && \strlen($data) === $length) {
            return $data;
        }

        $meta = \stream_get_meta_data($this->stream);
        throw new CommunicationFailed($meta['timed_out'] ? 'Read timed out.' : $errorMessage);
    }
}
145