Completed
Pull Request — master (#7)
by Sandro
02:42
created

Client::transmit()   B

Complexity

Conditions 10
Paths 7

Size

Total Lines 37
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 23
CRAP Score 10.0071

Importance

Changes 0
Metric Value
eloc 24
dl 0
loc 37
ccs 23
cts 24
cp 0.9583
rs 7.6666
c 0
b 0
f 0
cc 10
nc 7
nop 2
crap 10.0071

How to fix   Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

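Commonly applied refactorings include Extract Method and, when a method has many parameters or temporary variables, Replace Temp with Query, Introduce Parameter Object, and Preserve Whole Object.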

1
<?php
2
/**
3
 * Sandro Keil (https://sandro-keil.de)
4
 *
5
 * @link      http://github.com/sandrokeil/arangodb-php-client for the canonical source repository
6
 * @copyright Copyright (c) 2018-2019 Sandro Keil
7
 * @license   http://github.com/sandrokeil/arangodb-php-client/blob/master/LICENSE.md New BSD License
8
 */
9
10
declare(strict_types=1);
11
12
namespace ArangoDb;
13
14
use ArangoDb\Exception\ConnectionException;
15
use ArangoDb\Exception\NetworkException;
16
use ArangoDb\Exception\RequestFailedException;
17
use ArangoDb\Exception\TimeoutException;
18
use Fig\Http\Message\StatusCodeInterface;
19
use Psr\Http\Client\ClientInterface;
20
use Psr\Http\Message\RequestInterface;
21
use Psr\Http\Message\ResponseFactoryInterface;
22
use Psr\Http\Message\ResponseInterface;
23
use Psr\Http\Message\StreamFactoryInterface;
24
25
/**
 * PSR-18 HTTP client that talks to the configured endpoint over a raw stream socket.
 *
 * The request is serialised to an HTTP/1.1 message by hand, written to the socket and the
 * raw response is handed to HttpHelper::parseMessage(). Only responses delimited by a
 * Content-Length header are read completely; other framings are not interpreted here.
 */
final class Client implements ClientInterface
{
    /**
     * Chunk size in bytes used for every read from the socket
     */
    private const CHUNK_SIZE = 8192;

    /**
     * End of line mark used in HTTP
     */
    private const EOL = "\r\n";

    /**
     * Connection handle, null while no connection is open
     *
     * @var resource|null
     */
    private $handle;

    /**
     * True if the configured connection option is exactly "Keep-Alive"
     *
     * @var bool
     */
    private $useKeepAlive;

    /**
     * @var ClientOptions
     */
    private $options;

    /**
     * URL prefix (database path) prepended to every request URI
     *
     * @var string
     */
    private $baseUrl = '';

    /**
     * Default headers for all requests, starts with an EOL because it is appended
     * directly after the request line
     *
     * @var string
     */
    private $headerLines = '';

    /**
     * @var string
     */
    private $database = '';

    /**
     * @var ResponseFactoryInterface
     */
    private $responseFactory;

    /**
     * @var StreamFactoryInterface
     */
    private $streamFactory;

    /**
     * @param array|ClientOptions $options Anything that is not a ClientOptions instance is passed to
     *                                     the ClientOptions constructor
     * @param ResponseFactoryInterface $responseFactory Creates the response returned by sendRequest()
     * @param StreamFactoryInterface $streamFactory Creates the body stream of the response
     */
    public function __construct(
        $options,
        ResponseFactoryInterface $responseFactory,
        StreamFactoryInterface $streamFactory
    ) {
        if (! $options instanceof ClientOptions) {
            $options = new ClientOptions($options);
        }

        $this->options = $options;
        $this->useKeepAlive = ($this->options[ClientOptions::OPTION_CONNECTION] === 'Keep-Alive');
        $this->responseFactory = $responseFactory;
        $this->streamFactory = $streamFactory;
        $this->updateCommonHttpHeaders();
    }

    /**
     * Delegate / shorthand method for sendRequest()
     *
     * @param RequestInterface $request
     * @return ResponseInterface
     * @throws \Psr\Http\Client\ClientExceptionInterface
     */
    public function __invoke(RequestInterface $request): ResponseInterface
    {
        return $this->sendRequest($request);
    }

    /**
     * Serialises the request, sends it over the socket and builds a response from the raw answer.
     *
     * Connection and Content-Length headers of the request are ignored: the former is taken from the
     * client options, the latter is computed from the body that is actually sent. Content-Type
     * defaults to application/json if the request does not provide one.
     *
     * @param RequestInterface $request
     * @return ResponseInterface
     * @throws RequestFailedException If the request body or headers could not be read
     * @throws NetworkException If connecting, transmitting or parsing the response failed
     */
    public function sendRequest(RequestInterface $request): ResponseInterface
    {
        try {
            $stream = $request->getBody();
            $body = $stream->getContents();
            $method = $request->getMethod();

            $customHeader = '';
            foreach ($request->getHeaders() as $headerKey => $headerValues) {
                // header names are case-insensitive, so compare a normalised name
                $normalizedKey = strtolower((string) $headerKey);

                if ($normalizedKey === 'connection' || $normalizedKey === 'content-length') {
                    continue;
                }
                foreach ($headerValues as $headerValue) {
                    $customHeader .= $headerKey . ': ' . $headerValue . self::EOL;
                }
            }

            if (! $request->hasHeader('Content-Type')) {
                $customHeader .= 'Content-Type: application/json' . self::EOL;
            }
        } catch (\Throwable $e) {
            throw RequestFailedException::ofRequest($request, $e);
        }

        // use the length of the bytes that are really sent, the stream size may be unknown (null)
        $customHeader .= 'Content-Length: ' . strlen($body) . self::EOL;

        $url = $this->baseUrl . $request->getUri();

        try {
            $this->open($request);

            $result = $this->transmit(
                $method . ' ' . $url . ' HTTP/1.1' .
                $this->headerLines .
                $customHeader . self::EOL .
                $body,
                $method
            );
            // TODO https://docs.arangodb.com/3.4/Manual/Architecture/DeploymentModes/ActiveFailover/Architecture.html
            $status = stream_get_meta_data($this->handle);

            if (true === $status['timed_out']) {
                throw TimeoutException::ofRequest($request);
            }
            if (! $this->useKeepAlive) {
                $this->close();
            }

            [$httpCode, $headers, $body] = HttpHelper::parseMessage($result);
        } catch (\Throwable $e) {
            // The connection is in an unknown state after a failure (e.g. a late response may still
            // arrive), so it must not be reused by the next request.
            $this->close();

            // NOTE: this also wraps the TimeoutException thrown above.
            throw NetworkException::with($request, $e);
        }

        $response = $this->responseFactory->createResponse($httpCode);

        foreach ($headers as $headerName => $header) {
            $response = $response->withAddedHeader($headerName, $header);
        }

        return $response->withBody($this->streamFactory->createStream($body));
    }

    /**
     * Sends request to server and reads response.
     *
     * Reading continues until the header section is complete and as many body bytes as announced
     * by the Content-Length header have arrived, or until the server closes the connection or a
     * read returns no data (e.g. on timeout). For HEAD requests no body is expected.
     *
     * @param string $request Complete raw HTTP request message
     * @param string $method HTTP method of the request
     * @return string Raw HTTP response message, may be incomplete if reading stopped early
     * @throws \RuntimeException If the request could not be written completely
     */
    private function transmit(string $request, string $method): string
    {
        $this->write($request);

        $message = '';
        // total number of bytes expected for header section plus body, unknown until the
        // header section has been received completely
        $expectedLength = null;

        do {
            $read = fread($this->handle, self::CHUNK_SIZE);
            if (false === $read || $read === '') {
                break;
            }
            $message .= $read;

            if (null === $expectedLength) {
                $headerEnd = strpos($message, self::EOL . self::EOL);

                if (false !== $headerEnd) {
                    $contentLength = $method !== 'HEAD'
                        ? $this->parseContentLength(substr($message, 0, $headerEnd))
                        : 0;
                    $expectedLength = $headerEnd + 4 + $contentLength;
                }
            }
        } while ((null === $expectedLength || strlen($message) < $expectedLength) && ! feof($this->handle));

        return $message;
    }

    /**
     * Writes all bytes to the connection, a single fwrite() on a socket may write only a part.
     *
     * @param string $data
     * @throws \RuntimeException If writing fails before all bytes are sent
     */
    private function write(string $data): void
    {
        $total = strlen($data);
        $offset = 0;

        while ($offset < $total) {
            $written = fwrite($this->handle, $offset === 0 ? $data : substr($data, $offset));

            // 0 is treated as failure too, otherwise the loop would never terminate
            if (false === $written || 0 === $written) {
                throw new \RuntimeException(
                    sprintf('Could not write request to server, %d of %d bytes sent.', $offset, $total)
                );
            }
            $offset += $written;
        }
        fflush($this->handle);
    }

    /**
     * Extracts the Content-Length value from a complete header section.
     *
     * @param string $headerSection Status line and header lines without the terminating empty line
     * @return int Announced body length, 0 if no Content-Length header is present
     */
    private function parseContentLength(string $headerSection): int
    {
        $matches = [];

        if (1 === preg_match('/^content-length:[ \t]*(\d+)/mi', $headerSection, $matches)) {
            return (int) $matches[1];
        }

        return 0;
    }

    /**
     * Update common HTTP headers for all HTTP requests
     *
     * Rebuilds the Host, Authorization and Connection header lines from the client options and
     * derives the database URL prefix. No Host header is added for unix:// endpoints.
     */
    private function updateCommonHttpHeaders(): void
    {
        $this->headerLines = self::EOL;

        $endpoint = $this->options[ClientOptions::OPTION_ENDPOINT];
        if (1 !== preg_match('/^unix:\/\/.+/', $endpoint)) {
            // reduce tcp://host:port or ssl://host:port to the host part
            $host = preg_replace('/^(tcp|ssl):\/\/(.+?):(\d+)\/?$/', '\\2', $endpoint);
            $this->headerLines .= 'Host: ' . $host . self::EOL;
        }

        // add basic auth header
        if (isset(
            $this->options[ClientOptions::OPTION_AUTH_TYPE],
            $this->options[ClientOptions::OPTION_AUTH_USER]
        )) {
            $credentials = base64_encode(
                $this->options[ClientOptions::OPTION_AUTH_USER] . ':' .
                $this->options[ClientOptions::OPTION_AUTH_PASSWD]
            );
            $this->headerLines .= sprintf(
                'Authorization: %s %s%s',
                $this->options[ClientOptions::OPTION_AUTH_TYPE],
                $credentials,
                self::EOL
            );
        }

        if (isset($this->options[ClientOptions::OPTION_CONNECTION])) {
            $this->headerLines .= 'Connection: ' . $this->options[ClientOptions::OPTION_CONNECTION] . self::EOL;
        }

        $this->database = $this->options[ClientOptions::OPTION_DATABASE];
        $this->baseUrl = '/_db/' . urlencode($this->database);
    }

    /**
     * Opens connection depending on options.
     *
     * An existing keep-alive connection is reused as long as the server has not closed it. If it
     * was closed, a new connection is established unless the reconnect option is false.
     *
     * @param RequestInterface $request Only used to create exceptions
     * @throws ConnectionException If the server closed the connection and reconnecting is disabled,
     *                             or if the endpoint is not reachable
     */
    private function open(RequestInterface $request): void
    {
        if ($this->useKeepAlive && is_resource($this->handle)) {
            if (! feof($this->handle)) {
                return;
            }

            $this->close();

            if (false === $this->options[ClientOptions::OPTION_RECONNECT]) {
                throw ConnectionException::forRequest(
                    $request,
                    'Server has closed the connection already.',
                    StatusCodeInterface::STATUS_REQUEST_TIMEOUT
                );
            }
        }

        $endpoint = $this->options[ClientOptions::OPTION_ENDPOINT];
        $contextOptions = [];

        if (1 === preg_match('/^ssl:\/\/.+/', $endpoint)) {
            $contextOptions['ssl'] = [
                'verify_peer' => $this->options[ClientOptions::OPTION_VERIFY_CERT],
                'verify_peer_name' => $this->options[ClientOptions::OPTION_VERIFY_CERT_NAME],
                'allow_self_signed' => $this->options[ClientOptions::OPTION_ALLOW_SELF_SIGNED],
                'ciphers' => $this->options[ClientOptions::OPTION_CIPHERS],
            ];
        }

        $context = stream_context_create($contextOptions);

        $handle = stream_socket_client(
            $endpoint,
            $errorCode,
            $errorMessage,
            $this->options[ClientOptions::OPTION_TIMEOUT],
            STREAM_CLIENT_CONNECT,
            $context
        );

        if (false === $handle) {
            throw ConnectionException::forRequest(
                $request,
                sprintf('Cannot connect to endpoint "%s". Message: %s', $endpoint, $errorMessage),
                $errorCode
            );
        }
        $this->handle = $handle;
        // the same timeout is applied to subsequent reads and writes on the connection
        stream_set_timeout($this->handle, $this->options[ClientOptions::OPTION_TIMEOUT]);
    }

    /**
     * Closes connection
     *
     * Safe to call when no connection is open. The property is reset to null instead of being
     * unset, so it stays defined for later checks.
     */
    private function close(): void
    {
        if (is_resource($this->handle)) {
            fclose($this->handle);
        }
        $this->handle = null;
    }
}
323