Passed
Pull Request — master (#47)
by Eugene
03:06
created

StreamConnection::open()   B

Complexity

Conditions 7
Paths 13

Size

Total Lines 39
Code Lines 26

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 27
CRAP Score 7

Importance

Changes 0
Metric Value
eloc 26
dl 0
loc 39
ccs 27
cts 27
cp 1
rs 8.5706
c 0
b 0
f 0
cc 7
nc 13
nop 0
crap 7
1
<?php
2
3
/**
4
 * This file is part of the Tarantool Client package.
5
 *
6
 * (c) Eugene Leonovich <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
declare(strict_types=1);
13
14
namespace Tarantool\Client\Connection;
15
16
use Tarantool\Client\Exception\CommunicationFailed;
17
use Tarantool\Client\Exception\ConnectionFailed;
18
use Tarantool\Client\Greeting;
19
use Tarantool\Client\Packer\PacketLength;
20
21
/**
 * Connection to a server over a PHP stream socket.
 *
 * Instances are obtained through the static factories {@see create()},
 * {@see createTcp()} and {@see createUds()}; the constructor is private.
 *
 * Supported options (missing keys fall back to DEFAULT_OPTIONS):
 *  - connect_timeout: connect timeout in seconds, cast to float;
 *  - socket_timeout:  read/write timeout in seconds, fractions allowed;
 *  - tcp_nodelay:     whether createTcp() sets the "tcp_nodelay" socket context option;
 *  - persistent:      whether the stream is opened with STREAM_CLIENT_PERSISTENT.
 */
final class StreamConnection implements Connection
{
    public const DEFAULT_URI = 'tcp://127.0.0.1:3301';

    private const DEFAULT_OPTIONS = [
        'connect_timeout' => 5,
        'socket_timeout' => 5,
        'tcp_nodelay' => true,
        'persistent' => false,
    ];

    /** @var resource|null Open stream, or null while the connection is closed. */
    private $stream;

    /** @var resource|null Stream context; only set by createTcp() when tcp_nodelay is enabled. */
    private $streamContext;

    /** @var string Target URI handed to stream_socket_client(). */
    private $uri;

    /** @var array Caller options merged over DEFAULT_OPTIONS. */
    private $options;

    /** @var Greeting|null Greeting parsed by the last successful open(), if any. */
    private $greeting;

    /**
     * @param string $uri     URI to connect to
     * @param array  $options caller options; keys present here win over DEFAULT_OPTIONS
     */
    private function __construct(string $uri, array $options)
    {
        $this->uri = $uri;
        $this->options = $options + self::DEFAULT_OPTIONS;
    }

    /**
     * Creates a connection and, when the "tcp_nodelay" option is truthy,
     * prepares a stream context with the "tcp_nodelay" socket option enabled.
     */
    public static function createTcp(string $uri = self::DEFAULT_URI, array $options = []) : self
    {
        $self = new self($uri, $options);

        if ($self->options['tcp_nodelay'] ?? false) {
            $self->streamContext = \stream_context_create(['socket' => ['tcp_nodelay' => true]]);
        }

        return $self;
    }

    /**
     * Creates a connection without any stream context.
     */
    public static function createUds(string $uri, array $options = []) : self
    {
        return new self($uri, $options);
    }

    /**
     * Picks the factory from the URI scheme: URIs starting with "unix://"
     * go to createUds(), everything else goes to createTcp().
     */
    public static function create(string $uri, array $options = []) : self
    {
        return 0 === \strpos($uri, 'unix://')
            ? self::createUds($uri, $options)
            : self::createTcp($uri, $options);
    }

    /**
     * Opens the stream (if it is not open yet) and reads the server greeting.
     *
     * @return Greeting|null the greeting of this connection; null when a persistent
     *                       stream with a non-zero position was picked up and no
     *                       greeting has been read by this instance
     *
     * @throws ConnectionFailed    when the socket cannot be opened
     * @throws CommunicationFailed when the greeting cannot be read
     */
    public function open() : ?Greeting
    {
        if (\is_resource($this->stream)) {
            return $this->greeting;
        }

        $flags = $this->options['persistent']
            ? \STREAM_CLIENT_CONNECT | \STREAM_CLIENT_PERSISTENT
            : \STREAM_CLIENT_CONNECT;

        // Warnings are silenced: a failure is reported through ConnectionFailed below.
        // The context argument is only passed when one exists
        // (presumably because older PHP versions reject a null context - TODO confirm).
        $stream = $this->streamContext ? @\stream_socket_client(
            $this->uri,
            $errorCode,
            $errorMessage,
            (float) $this->options['connect_timeout'],
            $flags,
            $this->streamContext
        ) : @\stream_socket_client(
            $this->uri,
            $errorCode,
            $errorMessage,
            (float) $this->options['connect_timeout'],
            $flags
        );

        if (false === $stream) {
            throw ConnectionFailed::fromUriAndReason($this->uri, $errorMessage);
        }

        $this->stream = $stream;

        // stream_set_timeout() takes integer seconds plus integer microseconds;
        // splitting the value lets fractional timeouts (e.g. 0.5) work under
        // strict_types instead of raising a TypeError.
        $socketTimeout = (float) $this->options['socket_timeout'];
        $seconds = (int) $socketTimeout;
        $microseconds = (int) \round(($socketTimeout - $seconds) * 1000000);
        \stream_set_timeout($this->stream, $seconds, $microseconds);

        // A persistent stream with a non-zero position is treated as already
        // in use, so no greeting is read from it.
        if ($this->options['persistent'] && \ftell($this->stream)) {
            return $this->greeting;
        }

        try {
            $greeting = $this->read(Greeting::SIZE_BYTES, 'Unable to read greeting.');

            return $this->greeting = Greeting::parse($greeting);
        } catch (\Throwable $e) {
            // Do not keep a half-initialised stream around: otherwise the next
            // open() would return a null greeting as if the connection were fine.
            $this->close();

            throw $e;
        }
    }

    /**
     * Closes the stream if it is open and forgets the stored greeting.
     */
    public function close() : void
    {
        if (\is_resource($this->stream)) {
            \fclose($this->stream);
        }

        $this->stream = null;
        $this->greeting = null;
    }

    /**
     * Tells whether there is currently no open stream.
     */
    public function isClosed() : bool
    {
        return !\is_resource($this->stream);
    }

    /**
     * Writes a request and returns the raw response body.
     *
     * The response is read in two steps: first the length prefix
     * (PacketLength::SIZE_BYTES bytes), then as many bytes as that prefix announces.
     *
     * @param string $data raw request bytes
     *
     * @return string raw response bytes (without the length prefix)
     *
     * @throws CommunicationFailed when the request cannot be written in full
     *                             or the response cannot be read
     */
    public function send(string $data) : string
    {
        $total = \strlen($data);
        $offset = 0;

        // fwrite() on a network stream may write only part of the buffer, so keep
        // writing the remainder. A zero/false result (including empty $data, as
        // before) is reported as a failure.
        do {
            $written = \fwrite($this->stream, 0 === $offset ? $data : \substr($data, $offset));
            if (!$written) {
                throw new CommunicationFailed('Unable to write request.');
            }
            $offset += $written;
        } while ($offset < $total);

        $length = $this->read(PacketLength::SIZE_BYTES, 'Unable to read response length.');
        $length = PacketLength::unpack($length);

        return $this->read($length, 'Unable to read response.');
    }

    /**
     * Reads exactly $length bytes from the stream.
     *
     * @param int    $length       number of bytes expected
     * @param string $errorMessage message used when the read fails for a reason other than a timeout
     *
     * @return string the bytes read
     *
     * @throws CommunicationFailed with "Read timed out." when the stream reports a timeout,
     *                             otherwise with $errorMessage
     */
    private function read(int $length, string $errorMessage) : string
    {
        $data = \stream_get_contents($this->stream, $length);

        // Require the full amount: a short read (timeout, EOF) must not be passed on
        // as a complete packet. An empty result is still a failure, as before.
        if (\is_string($data) && '' !== $data && \strlen($data) === $length) {
            return $data;
        }

        $meta = \stream_get_meta_data($this->stream);
        throw new CommunicationFailed($meta['timed_out'] ? 'Read timed out.' : $errorMessage);
    }
}
145