Passed
Push — master ( 33ab25...abe0dc )
by Eugene
01:44
created

StreamConnection::__construct()   A

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 4
ccs 3
cts 3
cp 1
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 2
crap 1
1
<?php
2
3
/**
4
 * This file is part of the Tarantool Client package.
5
 *
6
 * (c) Eugene Leonovich <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
declare(strict_types=1);
13
14
namespace Tarantool\Client\Connection;
15
16
use Tarantool\Client\Exception\CommunicationFailed;
17
use Tarantool\Client\Exception\ConnectionFailed;
18
use Tarantool\Client\Packer\PacketLength;
19
20
/**
 * Connection implementation built on PHP stream sockets.
 *
 * Instances are obtained through the static factories: {@see create()} picks
 * the transport from the URI scheme, {@see createTcp()} and {@see createUds()}
 * select it explicitly. The socket itself is opened lazily by {@see open()}.
 *
 * Supported options (missing keys fall back to DEFAULT_OPTIONS):
 *  - connect_timeout: seconds to wait while establishing the connection;
 *  - socket_timeout:  seconds to wait on reads (fractions are honoured);
 *  - tcp_nodelay:     when truthy, TCP connections get a stream context
 *                     with the "tcp_nodelay" socket option enabled;
 *  - persistent:      when truthy, STREAM_CLIENT_PERSISTENT is requested.
 */
final class StreamConnection implements Connection
{
    public const DEFAULT_URI = 'tcp://127.0.0.1:3301';

    private const DEFAULT_OPTIONS = [
        'connect_timeout' => 5,
        'socket_timeout' => 5,
        'tcp_nodelay' => true,
        'persistent' => false,
    ];

    /** @var resource|null Open socket stream, or null while disconnected. */
    private $stream;

    /** @var resource|null Stream context; only set by createTcp() when tcp_nodelay is enabled. */
    private $streamContext;

    /** @var string URI handed to stream_socket_client(). */
    private $uri;

    /** @var array User options merged over DEFAULT_OPTIONS. */
    private $options;

    /** @var Greeting|null Greeting parsed by the last successful open(), reset by close(). */
    private $greeting;

    /**
     * @param string $uri     Target URI.
     * @param array  $options User-supplied options; they take precedence over the defaults.
     */
    private function __construct(string $uri, array $options)
    {
        $this->uri = $uri;
        // Array union keeps the left-hand (user) value for keys present in both.
        $this->options = $options + self::DEFAULT_OPTIONS;
    }

    /**
     * Creates a connection for a TCP URI.
     *
     * @param string $uri     Target URI, DEFAULT_URI when omitted.
     * @param array  $options See the class description.
     */
    public static function createTcp(string $uri = self::DEFAULT_URI, array $options = []) : self
    {
        $self = new self($uri, $options);

        if ($self->options['tcp_nodelay'] ?? false) {
            $self->streamContext = \stream_context_create(['socket' => ['tcp_nodelay' => true]]);
        }

        return $self;
    }

    /**
     * Creates a connection for a Unix domain socket URI.
     *
     * No stream context is created here, so the tcp_nodelay option has no effect.
     *
     * @param string $uri     Target URI.
     * @param array  $options See the class description.
     */
    public static function createUds(string $uri, array $options = []) : self
    {
        return new self($uri, $options);
    }

    /**
     * Creates a connection, choosing the transport from the URI:
     * URIs starting with "unix://" go to createUds(), everything else to createTcp().
     *
     * @param string $uri     Target URI.
     * @param array  $options See the class description.
     */
    public static function create(string $uri, array $options = []) : self
    {
        return 0 === \strpos($uri, 'unix://')
            ? self::createUds($uri, $options)
            : self::createTcp($uri, $options);
    }

    /**
     * Opens the connection and reads the server greeting.
     *
     * Calling it while the stream is already open is a no-op that returns
     * the cached greeting.
     *
     * @return Greeting|null The parsed greeting. May be null when a persistent
     *                       stream that has already been read from is reused,
     *                       because no greeting is read in that case.
     *
     * @throws ConnectionFailed    When the socket cannot be established.
     * @throws CommunicationFailed When the greeting cannot be read.
     */
    public function open() : ?Greeting
    {
        if (\is_resource($this->stream)) {
            return $this->greeting;
        }

        $flags = $this->options['persistent']
            ? \STREAM_CLIENT_CONNECT | \STREAM_CLIENT_PERSISTENT
            : \STREAM_CLIENT_CONNECT;

        // The context argument is only passed when a context exists, so the
        // call is written twice rather than passing null.
        // Warnings are suppressed: failure is reported via the exception below.
        $stream = $this->streamContext ? @\stream_socket_client(
            $this->uri,
            $errorCode,
            $errorMessage,
            (float) $this->options['connect_timeout'],
            $flags,
            $this->streamContext
        ) : @\stream_socket_client(
            $this->uri,
            $errorCode,
            $errorMessage,
            (float) $this->options['connect_timeout'],
            $flags
        );

        if (false === $stream) {
            throw ConnectionFailed::fromUriAndReason($this->uri, $errorMessage);
        }

        $this->stream = $stream;

        // stream_set_timeout() takes integer seconds plus integer microseconds.
        // Splitting the value lets float timeouts (e.g. 2.5) work under
        // strict_types instead of raising a TypeError; integers behave as before.
        $socketTimeout = (float) $this->options['socket_timeout'];
        $seconds = (int) $socketTimeout;
        $microseconds = (int) (($socketTimeout - $seconds) * 1000000);
        \stream_set_timeout($this->stream, $seconds, $microseconds);

        // A non-zero position on a persistent stream means data has already
        // been read from it, so no greeting is read again.
        if ($this->options['persistent'] && \ftell($this->stream)) {
            return $this->greeting;
        }

        $greeting = $this->read(Greeting::SIZE_BYTES, 'Unable to read greeting.');

        return $this->greeting = Greeting::parse($greeting);
    }

    /**
     * Closes the stream (if open) and forgets the cached greeting.
     */
    public function close() : void
    {
        if (\is_resource($this->stream)) {
            \fclose($this->stream);
        }

        $this->stream = null;
        $this->greeting = null;
    }

    /**
     * Tells whether there is currently no open stream.
     */
    public function isClosed() : bool
    {
        return !\is_resource($this->stream);
    }

    /**
     * Sends a request and returns the raw response body.
     *
     * The response is read in two steps: a fixed-size length prefix
     * (PacketLength::SIZE_BYTES), then exactly as many bytes as it announces.
     *
     * @param string $data Raw request bytes.
     *
     * @return string Raw response bytes, without the length prefix.
     *
     * @throws CommunicationFailed When writing or reading fails or times out.
     */
    public function send(string $data) : string
    {
        $this->write($data);

        $length = $this->read(PacketLength::SIZE_BYTES, 'Unable to read response length.');
        $length = PacketLength::unpack($length);

        return $this->read($length, 'Unable to read response.');
    }

    /**
     * Writes the whole buffer to the stream.
     *
     * fwrite() on a network stream may accept only part of the buffer, so the
     * remainder is re-submitted until everything is written. A write that
     * makes no progress (false or 0, which includes an empty buffer) is
     * reported as a failure.
     *
     * @throws CommunicationFailed When the request cannot be written completely.
     */
    private function write(string $data) : void
    {
        $total = \strlen($data);
        $written = 0;

        do {
            $chunk = 0 === $written ? $data : \substr($data, $written);
            $bytes = \fwrite($this->stream, $chunk);

            if (!$bytes) {
                throw new CommunicationFailed('Unable to write request.');
            }

            $written += $bytes;
        } while ($written < $total);
    }

    /**
     * Reads exactly $length bytes from the stream.
     *
     * @param int    $length       Number of bytes expected.
     * @param string $errorMessage Message used when the failure is not a timeout.
     *
     * @throws CommunicationFailed When fewer than $length bytes (or none) could be
     *                             read; the message is "Read timed out." if the
     *                             stream reports a timeout.
     */
    private function read(int $length, string $errorMessage) : string
    {
        $data = \stream_get_contents($this->stream, $length);

        // Compare explicitly rather than by truthiness: the string "0" is valid
        // data, and a short read (EOF or timeout mid-packet) must not be handed
        // back as if it were a complete packet.
        if (false !== $data && '' !== $data && \strlen($data) === $length) {
            return $data;
        }

        $meta = \stream_get_meta_data($this->stream);
        throw new CommunicationFailed($meta['timed_out'] ? 'Read timed out.' : $errorMessage);
    }
}
144