Passed
Pull Request — master (#61)
by Eugene
05:00
created

StreamConnection::send()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 10
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 2

Importance

Changes 0
Metric Value
eloc 5
c 0
b 0
f 0
dl 0
loc 10
ccs 6
cts 6
cp 1
rs 10
cc 2
nc 2
nop 1
crap 2
1
<?php
2
3
/**
4
 * This file is part of the Tarantool Client package.
5
 *
6
 * (c) Eugene Leonovich <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
declare(strict_types=1);
13
14
namespace Tarantool\Client\Connection;
15
16
use Tarantool\Client\Exception\CommunicationFailed;
17
use Tarantool\Client\Exception\ConnectionFailed;
18
use Tarantool\Client\Greeting;
19
use Tarantool\Client\Packer\PacketLength;
20
21
/**
 * Connection implementation backed by a PHP stream socket.
 *
 * Instances are built through the static factories (create(), createTcp(),
 * createUds()); the constructor is private.
 */
final class StreamConnection implements Connection
{
    public const DEFAULT_URI = 'tcp://127.0.0.1:3301';

    /**
     * Values used for every option the caller does not supply.
     */
    private const DEFAULT_OPTIONS = [
        'connect_timeout' => 5,
        'socket_timeout' => 5,
        'tcp_nodelay' => true,
        'persistent' => false,
    ];

    /** @var resource|null Open stream, or null while the connection is closed */
    private $stream;

    /** @var resource|null Stream context; only set by createTcp() when tcp_nodelay is enabled */
    private $streamContext;

    /** @var string URI handed to stream_socket_client() */
    private $uri;

    /** @var array Caller options merged over DEFAULT_OPTIONS */
    private $options;

    /** @var Greeting|null Greeting parsed by the last successful open() */
    private $greeting;

    /**
     * @param string $uri     URI to connect to
     * @param array  $options Overrides for DEFAULT_OPTIONS
     */
    private function __construct(string $uri, array $options)
    {
        $this->uri = $uri;
        // Array union: keys present in $options win, missing keys fall back to the defaults.
        $this->options = $options + self::DEFAULT_OPTIONS;
    }

    /**
     * Creates a connection and, when the "tcp_nodelay" option is truthy,
     * prepares a stream context that enables it on the socket.
     *
     * @param string $uri     URI to connect to
     * @param array  $options Overrides for DEFAULT_OPTIONS
     */
    public static function createTcp(string $uri = self::DEFAULT_URI, array $options = []) : self
    {
        $self = new self($uri, $options);

        if ($self->options['tcp_nodelay'] ?? false) {
            $self->streamContext = \stream_context_create(['socket' => ['tcp_nodelay' => true]]);
        }

        return $self;
    }

    /**
     * Creates a connection without any stream context.
     *
     * @param string $uri     URI to connect to
     * @param array  $options Overrides for DEFAULT_OPTIONS
     */
    public static function createUds(string $uri, array $options = []) : self
    {
        return new self($uri, $options);
    }

    /**
     * Picks the factory from the URI scheme: URIs starting with "unix://"
     * go to createUds(), everything else goes to createTcp().
     *
     * @param string $uri     URI to connect to
     * @param array  $options Overrides for DEFAULT_OPTIONS
     */
    public static function create(string $uri, array $options = []) : self
    {
        return 0 === \strpos($uri, 'unix://')
            ? self::createUds($uri, $options)
            : self::createTcp($uri, $options);
    }

    /**
     * Opens the stream if it is not open yet and returns the greeting.
     *
     * When the stream is already open, the stored greeting is returned and
     * nothing else happens.
     *
     * @throws ConnectionFailed    if the socket cannot be opened
     * @throws CommunicationFailed if the greeting cannot be read
     */
    public function open() : ?Greeting
    {
        if (\is_resource($this->stream)) {
            return $this->greeting;
        }

        $flags = $this->options['persistent']
            ? \STREAM_CLIENT_CONNECT | \STREAM_CLIENT_PERSISTENT
            : \STREAM_CLIENT_CONNECT;

        // Two call shapes are kept so that no context argument is passed at all
        // when none was created (a null context is not accepted on PHP 7).
        $stream = $this->streamContext ? @\stream_socket_client(
            $this->uri,
            $errorCode,
            $errorMessage,
            (float) $this->options['connect_timeout'],
            $flags,
            $this->streamContext
        ) : @\stream_socket_client(
            $this->uri,
            $errorCode,
            $errorMessage,
            (float) $this->options['connect_timeout'],
            $flags
        );

        if (false === $stream) {
            throw ConnectionFailed::fromUriAndReason($this->uri, $errorMessage);
        }

        $this->stream = $stream;

        // stream_set_timeout() takes integer seconds plus integer microseconds.
        // Splitting the value lets float or numeric-string timeouts work under
        // strict_types and preserves the fractional part; integers behave as before.
        $socketTimeout = (float) $this->options['socket_timeout'];
        $seconds = (int) $socketTimeout;
        $microseconds = (int) \round(($socketTimeout - $seconds) * 1e6);
        \stream_set_timeout($this->stream, $seconds, $microseconds);

        // A persistent stream with a non-zero position has already been read
        // from, so no greeting is read again; the stored greeting (possibly
        // null) is returned instead.
        if ($this->options['persistent'] && \ftell($this->stream)) {
            return $this->greeting;
        }

        $greeting = $this->read(Greeting::SIZE_BYTES, 'Unable to read greeting.');

        return $this->greeting = Greeting::parse($greeting);
    }

    /**
     * Closes the stream if it is open and forgets the stored greeting.
     */
    public function close() : void
    {
        if (\is_resource($this->stream)) {
            \fclose($this->stream);
        }

        $this->stream = null;
        $this->greeting = null;
    }

    /**
     * Tells whether there is currently no usable stream resource.
     */
    public function isClosed() : bool
    {
        return !\is_resource($this->stream);
    }

    /**
     * Writes a request and returns the response that follows it.
     *
     * The response is read in two steps: first PacketLength::SIZE_BYTES bytes,
     * which PacketLength::unpack() turns into the payload length, then the
     * payload itself.
     *
     * @param string $data Request bytes to write
     *
     * @throws CommunicationFailed if the connection is not open, the request
     *                             cannot be written completely, or the
     *                             response cannot be read
     */
    public function send(string $data) : string
    {
        // Without this guard a closed connection would pass null to fwrite()
        // and surface as a TypeError instead of a CommunicationFailed.
        if (!\is_resource($this->stream)) {
            throw new CommunicationFailed('Unable to write request.');
        }

        // fwrite() on a network stream may accept only part of the payload,
        // so keep writing until nothing is left. An empty payload still fails
        // on the first iteration because fwrite() then reports 0 bytes.
        $pending = $data;
        do {
            $written = \fwrite($this->stream, $pending);
            if (!$written) {
                throw new CommunicationFailed('Unable to write request.');
            }
            // The cast covers PHP 7, where substr() returns false at end of string.
            $pending = (string) \substr($pending, $written);
        } while ('' !== $pending);

        $length = $this->read(PacketLength::SIZE_BYTES, 'Unable to read response length.');
        $length = PacketLength::unpack($length);

        return $this->read($length, 'Unable to read response.');
    }

    /**
     * Reads exactly $length bytes from the stream.
     *
     * @param int    $length       Number of bytes expected
     * @param string $errorMessage Message used when the read fails for a
     *                             reason other than a timeout
     *
     * @throws CommunicationFailed if fewer than $length bytes (or none at all)
     *                             could be read
     */
    private function read(int $length, string $errorMessage) : string
    {
        $data = \stream_get_contents($this->stream, $length);

        // Comparing the byte count rejects reads cut short by a timeout or EOF
        // and accepts payloads such as "0" that are falsy in PHP. Empty
        // results keep failing, as they did before.
        if (\is_string($data) && '' !== $data && \strlen($data) === $length) {
            return $data;
        }

        $meta = \stream_get_meta_data($this->stream);
        throw new CommunicationFailed($meta['timed_out'] ? 'Read timed out.' : $errorMessage);
    }
}
145