Passed
Push — master ( de02d9...3b3597 )
by Eugene
03:38
created

StreamConnection   A

Complexity

Total Complexity 17

Size/Duplication

Total Lines 107
Duplicated Lines 0 %

Test Coverage

Coverage 92.45%

Importance

Changes 0
Metric Value
wmc 17
eloc 52
dl 0
loc 107
ccs 49
cts 53
cp 0.9245
rs 10
c 0
b 0
f 0

9 Methods

Rating   Name   Duplication   Size   Complexity  
A __construct() 0 4 1
A createUds() 0 3 1
A close() 0 5 2
A send() 0 10 2
A create() 0 5 2
A open() 0 28 3
A isClosed() 0 3 1
A read() 0 8 3
A createTcp() 0 9 2
1
<?php
2
3
/**
4
 * This file is part of the Tarantool Client package.
5
 *
6
 * (c) Eugene Leonovich <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
declare(strict_types=1);
13
14
namespace Tarantool\Client\Connection;
15
16
use Tarantool\Client\Exception\CommunicationFailed;
17
use Tarantool\Client\Exception\ConnectionFailed;
18
use Tarantool\Client\Greeting;
19
use Tarantool\Client\Packer\PackUtils;
20
use Tarantool\Client\Response;
21
22
/**
 * Connection implementation backed by a PHP stream socket.
 *
 * Instances are obtained through the static factories: create() dispatches on
 * the URI scheme, createTcp() additionally prepares a stream context with the
 * "tcp_nodelay" socket option, and createUds() is used for "unix://" URIs.
 *
 * Supported options (missing keys fall back to DEFAULT_OPTIONS):
 *  - connect_timeout: connection timeout in seconds, cast to float
 *  - socket_timeout:  read/write timeout in seconds, fractions are allowed
 *  - tcp_nodelay:     whether createTcp() enables the tcp_nodelay socket option
 */
final class StreamConnection implements Connection
{
    public const DEFAULT_URI = 'tcp://127.0.0.1:3301';

    private const DEFAULT_OPTIONS = [
        'connect_timeout' => 5,
        'socket_timeout' => 5,
        'tcp_nodelay' => true,
    ];

    /** @var resource|null Open socket stream, or null when not connected */
    private $stream;

    /** @var resource|null Stream context, only set by createTcp() when tcp_nodelay is enabled */
    private $streamContext;

    /** @var string */
    private $uri;

    /** @var array */
    private $options;

    /**
     * @param string $uri     Address handed to stream_socket_client()
     * @param array  $options User options; they take precedence over DEFAULT_OPTIONS
     */
    private function __construct(string $uri, array $options)
    {
        $this->uri = $uri;
        // Array union keeps the caller's values and only fills in the missing keys
        $this->options = $options + self::DEFAULT_OPTIONS;
    }

    /**
     * Creates a connection for a TCP URI.
     *
     * @param string $uri
     * @param array  $options
     *
     * @return self
     */
    public static function createTcp(string $uri = self::DEFAULT_URI, array $options = []) : self
    {
        $self = new self($uri, $options);

        if ($self->options['tcp_nodelay'] ?? false) {
            $self->streamContext = \stream_context_create(['socket' => ['tcp_nodelay' => true]]);
        }

        return $self;
    }

    /**
     * Creates a connection for a Unix domain socket URI (no stream context is set up).
     *
     * @param string $uri
     * @param array  $options
     *
     * @return self
     */
    public static function createUds(string $uri, array $options = []) : self
    {
        return new self($uri, $options);
    }

    /**
     * Creates a connection, choosing the factory by URI scheme:
     * URIs starting with "unix://" go to createUds(), everything else to createTcp().
     *
     * @param string $uri
     * @param array  $options
     *
     * @return self
     */
    public static function create(string $uri, array $options = []) : self
    {
        return 0 === \strpos($uri, 'unix://')
            ? self::createUds($uri, $options)
            : self::createTcp($uri, $options);
    }

    /**
     * (Re)opens the connection and reads the server greeting.
     *
     * Any previously opened stream is closed first.
     *
     * @throws ConnectionFailed    If the socket cannot be connected
     * @throws CommunicationFailed If the greeting cannot be read completely
     *
     * @return string The result of Greeting::parse() applied to the raw greeting
     */
    public function open() : string
    {
        $this->close();

        $connectTimeout = (float) $this->options['connect_timeout'];

        // The context argument is only passed when createTcp() actually built one.
        // Warnings are suppressed on purpose: failures are reported via the exception below.
        $stream = $this->streamContext ? @\stream_socket_client(
            $this->uri,
            $errorCode,
            $errorMessage,
            $connectTimeout,
            \STREAM_CLIENT_CONNECT,
            $this->streamContext
        ) : @\stream_socket_client(
            $this->uri,
            $errorCode,
            $errorMessage,
            $connectTimeout
        );

        if (false === $stream) {
            throw ConnectionFailed::fromUriAndReason($this->uri, $errorMessage);
        }

        $this->stream = $stream;

        // stream_set_timeout() takes integer seconds and microseconds; split the option
        // so that fractional timeouts (e.g. 0.5) work under strict_types instead of
        // raising a TypeError. Integer values behave exactly as before.
        $socketTimeout = (float) $this->options['socket_timeout'];
        $seconds = (int) $socketTimeout;
        $microseconds = (int) \round(($socketTimeout - $seconds) * 1000000);
        \stream_set_timeout($this->stream, $seconds, $microseconds);

        $greeting = $this->read(Greeting::SIZE_BYTES, 'Unable to read greeting.');

        return Greeting::parse($greeting);
    }

    /**
     * Closes the underlying stream if one is set; does nothing otherwise.
     */
    public function close() : void
    {
        if ($this->stream) {
            \fclose($this->stream);
            $this->stream = null;
        }
    }

    /**
     * @return bool True when there is no usable stream resource
     */
    public function isClosed() : bool
    {
        return !\is_resource($this->stream);
    }

    /**
     * Writes a request and returns the raw response body.
     *
     * The response is read in two steps: a fixed-size length prefix
     * (Response::LENGTH_SIZE_BYTES), then as many bytes as that prefix announces.
     *
     * @param string $data Request bytes to write
     *
     * @throws CommunicationFailed If the connection is not open, the request cannot
     *                             be written, or the response cannot be read completely
     *
     * @return string
     */
    public function send(string $data) : string
    {
        // Guard against a never-opened/closed connection: fwrite() on a non-resource
        // would raise a TypeError rather than the exception callers expect.
        if ($this->isClosed() || !\fwrite($this->stream, $data)) {
            throw new CommunicationFailed('Unable to write request.');
        }

        $length = $this->read(Response::LENGTH_SIZE_BYTES, 'Unable to read response length.');
        $length = PackUtils::unpackLength($length);

        return $this->read($length, 'Unable to read response.');
    }

    /**
     * Reads exactly $length bytes from the stream.
     *
     * @param int    $length       Number of bytes expected
     * @param string $errorMessage Message used when the read fails for a reason other than a timeout
     *
     * @throws CommunicationFailed If fewer than $length bytes (or nothing) could be read
     *
     * @return string
     */
    private function read(int $length, string $errorMessage) : string
    {
        $data = \stream_get_contents($this->stream, $length);

        // stream_get_contents() may return fewer bytes than requested (timeout, peer
        // closed the socket); a truncated frame must not be treated as a valid one.
        // An empty result is still a failure, as it was before.
        if (\is_string($data) && '' !== $data && \strlen($data) === $length) {
            return $data;
        }

        $meta = \stream_get_meta_data($this->stream);
        throw new CommunicationFailed($meta['timed_out'] ? 'Read timed out.' : $errorMessage);
    }
}
131