Passed
Branch php71 (acb31d)
by Eugene
02:53
created

StreamConnection::read()   A

Complexity

Conditions 3
Paths 2

Size

Total Lines 8
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 3

Importance

Changes 0
Metric Value
eloc 4
dl 0
loc 8
ccs 5
cts 5
cp 1
rs 10
c 0
b 0
f 0
cc 3
nc 2
nop 2
crap 3
1
<?php
2
3
declare(strict_types=1);
4
5
/*
6
 * This file is part of the Tarantool Client package.
7
 *
8
 * (c) Eugene Leonovich <[email protected]>
9
 *
10
 * For the full copyright and license information, please view the LICENSE
11
 * file that was distributed with this source code.
12
 */
13
14
namespace Tarantool\Client\Connection;
15
16
use Tarantool\Client\Exception\CommunicationFailed;
17
use Tarantool\Client\Exception\ConnectionFailed;
18
use Tarantool\Client\IProto;
19
use Tarantool\Client\Packer\PackUtils;
20
21
/**
 * Connection implementation on top of a PHP stream socket.
 *
 * Instances are created through the static factories; the constructor is
 * private. URIs starting with "unix://" are routed to createUds(), all
 * other URIs to createTcp().
 */
final class StreamConnection implements Connection
{
    public const DEFAULT_URI = 'tcp://127.0.0.1:3301';

    /**
     * Values used for every option the caller does not supply.
     */
    private const DEFAULT_OPTIONS = [
        'connect_timeout' => 5,
        'socket_timeout' => 5,
        'tcp_nodelay' => true,
    ];

    /** @var resource|null Open stream, or null when no connection is held. */
    private $stream;

    /** @var resource|null Stream context; only set by createTcp() when tcp_nodelay is enabled. */
    private $streamContext;

    /** @var string */
    private $uri;

    /** @var array Caller options merged with DEFAULT_OPTIONS (caller values win). */
    private $options;

    /**
     * @param string $uri     Target passed verbatim to stream_socket_client().
     * @param array  $options Overrides for DEFAULT_OPTIONS.
     */
    private function __construct(string $uri, array $options)
    {
        $this->uri = $uri;
        // Array union keeps the left-hand value for duplicate keys, so
        // caller-supplied options take precedence over the defaults.
        $this->options = $options + self::DEFAULT_OPTIONS;
    }

    /**
     * Creates a connection for a TCP URI.
     *
     * When the "tcp_nodelay" option is truthy, a stream context with the
     * socket option tcp_nodelay enabled is prepared for open().
     */
    public static function createTcp(string $uri = self::DEFAULT_URI, array $options = []) : self
    {
        $self = new self($uri, $options);

        if ($self->options['tcp_nodelay'] ?? false) {
            $self->streamContext = \stream_context_create(['socket' => ['tcp_nodelay' => true]]);
        }

        return $self;
    }

    /**
     * Creates a connection for a Unix domain socket URI (no stream context is set up).
     */
    public static function createUds(string $uri, array $options = []) : self
    {
        return new self($uri, $options);
    }

    /**
     * Picks the factory matching the URI scheme: "unix://" goes to
     * createUds(), anything else to createTcp().
     */
    public static function create(string $uri, array $options = []) : self
    {
        return 0 === \strpos($uri, 'unix://')
            ? self::createUds($uri, $options)
            : self::createTcp($uri, $options);
    }

    /**
     * Closes any stream currently held, connects to the configured URI and
     * reads the server greeting.
     *
     * @return string The value IProto::parseGreeting() extracts from the greeting.
     *
     * @throws ConnectionFailed    When the socket cannot be opened.
     * @throws CommunicationFailed When the greeting cannot be read in full.
     */
    public function open() : string
    {
        $this->close();

        // Two call shapes are kept on purpose: the context argument is only
        // passed when a context actually exists. Warnings are silenced because
        // a failure is reported through ConnectionFailed below.
        $stream = $this->streamContext ? @\stream_socket_client(
            $this->uri,
            $errorCode,
            $errorMessage,
            (float) $this->options['connect_timeout'],
            \STREAM_CLIENT_CONNECT,
            $this->streamContext
        ) : @\stream_socket_client(
            $this->uri,
            $errorCode,
            $errorMessage,
            (float) $this->options['connect_timeout']
        );

        if (false === $stream) {
            throw ConnectionFailed::fromUriAndReason($this->uri, $errorMessage);
        }

        $this->stream = $stream;

        // stream_set_timeout() takes whole seconds plus microseconds. Splitting
        // the option lets fractional timeouts (e.g. 0.5) work under strict
        // types, the same way connect_timeout already accepts floats. Integer
        // values produce the same call as before (microseconds = 0).
        $timeout = (float) $this->options['socket_timeout'];
        $seconds = (int) $timeout;
        $microseconds = (int) \round(($timeout - $seconds) * 1000000);
        \stream_set_timeout($this->stream, $seconds, $microseconds);

        $greeting = $this->read(IProto::GREETING_SIZE, 'Unable to read greeting.');

        return IProto::parseGreeting($greeting);
    }

    /**
     * Closes the stream if one is held; does nothing otherwise.
     */
    public function close() : void
    {
        if ($this->stream) {
            \fclose($this->stream);
            $this->stream = null;
        }
    }

    /**
     * Tells whether the connection currently holds no usable stream resource.
     */
    public function isClosed() : bool
    {
        return !\is_resource($this->stream);
    }

    /**
     * Writes a request and returns the raw response body.
     *
     * The response is read in two steps: a fixed-size length prefix
     * (IProto::LENGTH_SIZE bytes, decoded with PackUtils::unpackLength()),
     * followed by exactly that many bytes.
     *
     * @param string $data Request bytes to write.
     *
     * @return string Response bytes following the length prefix.
     *
     * @throws CommunicationFailed When the request cannot be written completely
     *                             or the response cannot be read completely.
     */
    public function send(string $data) : string
    {
        // fwrite() on a network stream may accept only part of the buffer, so
        // keep writing the remainder until everything is sent. A result of 0
        // or false (including for an empty payload) is treated as a failure.
        $pending = $data;
        do {
            $written = \fwrite($this->stream, $pending);
            if (!$written) {
                throw new CommunicationFailed('Unable to write request.');
            }
            // substr() may yield false once everything has been consumed
            // (PHP < 8.0); the cast normalises that to an empty string.
            $pending = (string) \substr($pending, $written);
        } while ('' !== $pending);

        $length = $this->read(IProto::LENGTH_SIZE, 'Unable to read response length.');
        $length = PackUtils::unpackLength($length);

        return $this->read($length, 'Unable to read response.');
    }

    /**
     * Reads exactly $length bytes from the stream.
     *
     * @param int    $length       Number of bytes expected.
     * @param string $errorMessage Message used when the read fails for a reason
     *                             other than a timeout.
     *
     * @return string The bytes read; always $length bytes long.
     *
     * @throws CommunicationFailed With 'Read timed out.' when the stream reports
     *                             a timeout, otherwise with $errorMessage.
     */
    private function read(int $length, string $errorMessage) : string
    {
        $data = \stream_get_contents($this->stream, $length);

        // Compare the byte count instead of relying on truthiness: a short
        // read (timeout or EOF in the middle of a packet) must not be returned
        // as a complete packet, and a complete payload such as "0" must not be
        // rejected merely because the string is falsy. An empty result is
        // still treated as a failure.
        if (\is_string($data) && '' !== $data && \strlen($data) === $length) {
            return $data;
        }

        $meta = \stream_get_meta_data($this->stream);
        throw new CommunicationFailed($meta['timed_out'] ? 'Read timed out.' : $errorMessage);
    }
}
130