Client::transmit()   B
last analyzed

Complexity

Conditions 10
Paths 7

Size

Total Lines 37
Code Lines 24

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 23
CRAP Score 10.0071

Importance

Changes 0
Metric Value
eloc 24
dl 0
loc 37
ccs 23
cts 24
cp 0.9583
rs 7.6666
c 0
b 0
f 0
cc 10
nc 7
nop 2
crap 10.0071

How to fix   Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

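Commonly applied refactorings include Extract Method: moving a cohesive group of statements out of the long method into a separate, well-named method.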

1
<?php
2
/**
3
 * Sandro Keil (https://sandro-keil.de)
4
 *
5
 * @link      http://github.com/sandrokeil/arangodb-php-client for the canonical source repository
6
 * @copyright Copyright (c) 2018-2020 Sandro Keil
7
 * @license   http://github.com/sandrokeil/arangodb-php-client/blob/master/LICENSE.md New BSD License
8
 */
9
10
declare(strict_types=1);
11
12
namespace ArangoDb\Http;
13
14
use ArangoDb\Exception\ConnectionException;
15
use ArangoDb\Exception\NetworkException;
16
use ArangoDb\Exception\RequestFailedException;
17
use ArangoDb\Exception\TimeoutException;
18
use ArangoDb\Type\BatchType;
19
use ArangoDb\Type\GuardSupport;
20
use ArangoDb\Type\Type;
21
use Fig\Http\Message\StatusCodeInterface;
22
use Psr\Http\Message\RequestFactoryInterface;
23
use Psr\Http\Message\RequestInterface;
24
use Psr\Http\Message\ResponseFactoryInterface;
25
use Psr\Http\Message\ResponseInterface;
26
use Psr\Http\Message\StreamFactoryInterface;
27
28
final class Client implements TypeSupport
29
{
30
    /**
31
     * Chunk size in bytes
32
     */
33
    private const CHUNK_SIZE = 8192;
34
35
    /**
36
     * End of line mark used in HTTP
37
     */
38
    private const EOL = "\r\n";
39
40
    /**
41
     * Connection handle
42
     *
43
     * @var resource
44
     */
45
    private $handle;
46
47
    /**
48
     * @var bool
49
     */
50
    private $useKeepAlive;
51
52
    /**
53
     * @var ClientOptions
54
     */
55
    private $options;
56
57
    /**
58
     * @var string
59
     */
60
    private $baseUrl = '';
61
62
    /**
63
     * Default headers for all requests
64
     *
65
     * @var string
66
     */
67
    private $headerLines = '';
68
69
    /**
70
     * @var string
71
     */
72
    private $database = '';
73
74
    /**
75
     * @var RequestFactoryInterface
76
     */
77
    private $requestFactory;
78
79
    /**
80
     * @var ResponseFactoryInterface
81
     */
82
    private $responseFactory;
83
84
    /**
85
     * @var StreamFactoryInterface
86
     */
87
    private $streamFactory;
88
89
    /**
     * Creates the client from its connection options and the PSR-17 factories it works with.
     *
     * Options that are not already a ClientOptions instance are wrapped in one. Afterwards the
     * keep-alive flag is derived from the connection option and the header lines shared by all
     * requests are prepared.
     *
     * @param array|ClientOptions $options
     * @param RequestFactoryInterface $requestFactory
     * @param ResponseFactoryInterface $responseFactory
     * @param StreamFactoryInterface $streamFactory
     */
    public function __construct(
        $options,
        RequestFactoryInterface $requestFactory,
        ResponseFactoryInterface $responseFactory,
        StreamFactoryInterface $streamFactory
    ) {
        $this->requestFactory = $requestFactory;
        $this->responseFactory = $responseFactory;
        $this->streamFactory = $streamFactory;

        if (! $options instanceof ClientOptions) {
            $options = new ClientOptions($options);
        }
        $this->options = $options;

        // The connection is only reused when the option is exactly "Keep-Alive".
        $this->useKeepAlive = ('Keep-Alive' === $this->options[ClientOptions::OPTION_CONNECTION]);

        // Must run after the options are set, because the header lines are built from them.
        $this->updateCommonHttpHeaders();
    }
108
109
    /**
     * Makes the client callable: forwards the request to sendRequest() and returns its response.
     *
     * @param RequestInterface $request
     * @return ResponseInterface
     * @throws \Psr\Http\Client\ClientExceptionInterface
     */
    public function __invoke(RequestInterface $request): ResponseInterface
    {
        $response = $this->sendRequest($request);

        return $response;
    }
120
121 23
    /**
     * Sends the request built from the given type and runs the guards attached to it.
     *
     * The guard of a GuardSupport type is fetched before the request is built and sent and,
     * if one exists, invoked with the response. For a BatchType that provides guards, the
     * response is additionally turned into a BatchResult and validated against those guards.
     * The response body is rewound after each of these checks.
     *
     * @param Type $type
     * @return ResponseInterface
     */
    public function sendType(Type $type): ResponseInterface
    {
        // Fetch the guard first; the request is created from the type only afterwards.
        $guard = $type instanceof GuardSupport ? $type->guard() : null;

        $request = $type->toRequest($this->requestFactory, $this->streamFactory);
        $response = $this->sendRequest($request);

        if (null !== $guard) {
            $guard($response);

            $response->getBody()->rewind();
        }

        if (! $type instanceof BatchType) {
            return $response;
        }

        $guards = $type->guards();
        if (null === $guards) {
            return $response;
        }

        $batchResult = BatchResult::fromResponse(
            $response,
            $this->responseFactory,
            $this->streamFactory
        );
        $batchResult->validate(...$guards);

        $response->getBody()->rewind();

        return $response;
    }
151
152 49
    /**
     * Sends a PSR-7 request over the socket connection and builds a PSR-7 response from the raw reply.
     *
     * "Connection" and "Content-Length" headers of the request are not forwarded (names are
     * compared case-insensitively): the connection header is part of the common header lines,
     * and the content length is always derived from the body that is actually transmitted.
     * "Content-Type" defaults to "application/json" when the request does not define one.
     *
     * @param RequestInterface $request
     * @return ResponseInterface
     * @throws RequestFailedException If the body, method, or headers of the request cannot be read
     * @throws NetworkException If connecting, transmitting, or parsing the reply fails; a timeout
     *                          or connection error is passed to it as the causing exception
     */
    public function sendRequest(RequestInterface $request): ResponseInterface
    {
        try {
            $body = $request->getBody()->getContents();
            $method = $request->getMethod();

            $customHeader = '';
            foreach ($request->getHeaders() as $headerKey => $headerValues) {
                // getHeaders() keeps the case the header was set with, so compare a normalized name.
                $normalizedKey = strtolower((string) $headerKey);
                if ('connection' === $normalizedKey || 'content-length' === $normalizedKey) {
                    continue;
                }

                foreach ($headerValues as $headerValue) {
                    $customHeader .= $headerKey . ': ' . $headerValue . self::EOL;
                }
            }

            // hasHeader() is case-insensitive, so "content-type" does not get a second default.
            if (! $request->hasHeader('Content-Type')) {
                $customHeader .= 'Content-Type: application/json' . self::EOL;
            }
        } catch (\Throwable $e) {
            throw RequestFailedException::ofRequest($request, $e);
        }

        // Use the length of the bytes that are sent: the stream size may be unknown (null) or
        // larger than what getContents() returned, e.g. for a stream that was partially read.
        $customHeader .= 'Content-Length: ' . strlen($body) . self::EOL;

        $url = $this->baseUrl . $request->getUri();

        try {
            $this->open($request);

            // The common header lines start with an EOL, which terminates the request line.
            $result = $this->transmit(
                $method . ' ' . $url . ' HTTP/1.1' .
                $this->headerLines .
                $customHeader . self::EOL .
                $body,
                $method
            );
            // TODO https://docs.arangodb.com/3.4/Manual/Architecture/DeploymentModes/ActiveFailover/Architecture.html
            $status = stream_get_meta_data($this->handle);

            if (true === $status['timed_out']) {
                throw TimeoutException::ofRequest($request);
            }
            if (! $this->useKeepAlive) {
                $this->close();
            }

            [$httpCode, $headers, $body] = HttpHelper::parseMessage($result);
        } catch (\Throwable $e) {
            throw NetworkException::with($request, $e);
        }

        $response = $this->responseFactory->createResponse($httpCode);

        foreach ($headers as $headerName => $header) {
            $response = $response->withAddedHeader($headerName, $header);
        }

        return $response->withBody($this->streamFactory->createStream($body));
    }
212
213
    /**
     * Sends request to server and reads response.
     *
     * The request is written completely, including after short writes. The response is then
     * read in chunks until the header block plus the number of body bytes announced by its
     * Content-Length header has been received, the peer signals end of file, or a read yields
     * no data. Responses without a Content-Length header are considered complete as soon as
     * the header block has been received.
     *
     * @param string $request Complete raw HTTP request (request line, headers, and body)
     * @param string $method HTTP method of the request; for "HEAD" no body is awaited
     * @return string Raw response as read from the connection; empty or incomplete if reading stopped early
     * @throws \RuntimeException If the request cannot be written to the connection
     */
    private function transmit(string $request, string $method): string
    {
        // Writing to a network stream may stop before the whole string is written, so continue
        // from the offset that was reached until everything is sent.
        $requestLength = strlen($request);
        $offset = 0;
        while ($offset < $requestLength) {
            $written = fwrite($this->handle, 0 === $offset ? $request : substr($request, $offset));
            if (false === $written || 0 === $written) {
                throw new \RuntimeException('Could not write request to connection.');
            }
            $offset += $written;
        }
        fflush($this->handle);

        $message = '';
        // Total number of bytes to expect (header block + body); unknown until the headers are complete.
        $expectedLength = null;

        do {
            $read = fread($this->handle, self::CHUNK_SIZE);
            if (false === $read || '' === $read) {
                break;
            }
            $message .= $read;

            if (null === $expectedLength) {
                $headerEnd = strpos($message, "\r\n\r\n");

                if (false !== $headerEnd) {
                    // Look for Content-Length in the complete header block only, so body content
                    // or a value cut off at a chunk boundary can never be picked up.
                    $contentLength = 0;
                    $matches = [];
                    if ('HEAD' !== $method
                        && 1 === preg_match(
                            '/^content-length:[ \t]*(\d+)/mi',
                            substr($message, 0, $headerEnd),
                            $matches
                        )
                    ) {
                        $contentLength = (int) $matches[1];
                    }

                    // 4 = length of the "\r\n\r\n" separator between headers and body
                    $expectedLength = $headerEnd + 4 + $contentLength;
                }
            }
            // Keep reading while the header block is still incomplete or body bytes are missing.
        } while ((null === $expectedLength || strlen($message) < $expectedLength) && ! feof($this->handle));

        return $message;
    }
258
259
    /**
     * Rebuilds the header lines shared by all HTTP requests and the database base URL.
     *
     * The header lines start with an EOL, followed by a Host header (skipped for unix://
     * endpoints), an Authorization header (only if auth type and user are set), and a
     * Connection header (only if the connection option is set).
     */
    private function updateCommonHttpHeaders(): void
    {
        $lines = self::EOL;

        $endpoint = $this->options[ClientOptions::OPTION_ENDPOINT];
        $isUnixSocket = (1 === preg_match('/^unix:\/\/.+/', $endpoint));

        if (! $isUnixSocket) {
            // Reduce "tcp://host:port" or "ssl://host:port" to the host part.
            $host = preg_replace('/^(tcp|ssl):\/\/(.+?):(\d+)\/?$/', '\\2', $endpoint);
            $lines .= 'Host: ' . $host . self::EOL;
        }

        $hasCredentials = isset(
            $this->options[ClientOptions::OPTION_AUTH_TYPE],
            $this->options[ClientOptions::OPTION_AUTH_USER]
        );

        if ($hasCredentials) {
            // The credentials are base64("user:password") whatever auth type is configured.
            $authType = $this->options[ClientOptions::OPTION_AUTH_TYPE];
            $credentials = base64_encode(
                $this->options[ClientOptions::OPTION_AUTH_USER] . ':' .
                $this->options[ClientOptions::OPTION_AUTH_PASSWD]
            );

            $lines .= sprintf('Authorization: %s %s%s', $authType, $credentials, self::EOL);
        }

        if (isset($this->options[ClientOptions::OPTION_CONNECTION])) {
            $lines .= 'Connection: ' . $this->options[ClientOptions::OPTION_CONNECTION] . self::EOL;
        }

        $this->headerLines = $lines;

        $this->database = $this->options[ClientOptions::OPTION_DATABASE];
        $this->baseUrl = '/_db/' . urlencode($this->database);
    }
295
296
    /**
     * Opens the connection to the configured endpoint, or keeps the current one.
     *
     * With keep-alive enabled, an existing connection that has not reached end of file is
     * reused. If it has reached end of file it is closed, and a new connection is only opened
     * when the reconnect option is not false. For ssl:// endpoints the certificate and cipher
     * options are applied to the stream context.
     *
     * @param RequestInterface $request Request the connection is opened for; passed to thrown exceptions
     * @throws ConnectionException If the server closed a kept-alive connection and reconnecting
     *                             is disabled, or if the endpoint cannot be connected to
     */
    private function open(RequestInterface $request): void
    {
        $hasOpenHandle = $this->handle !== null && is_resource($this->handle); // @phpstan-ignore-line

        if ($this->useKeepAlive && $hasOpenHandle) {
            $closedByServer = feof($this->handle);
            if (! $closedByServer) {
                return;
            }

            $this->close();

            if (false === $this->options[ClientOptions::OPTION_RECONNECT]) {
                throw ConnectionException::forRequest(
                    $request,
                    'Server has closed the connection already.',
                    StatusCodeInterface::STATUS_REQUEST_TIMEOUT
                );
            }
        }

        $endpoint = $this->options[ClientOptions::OPTION_ENDPOINT];

        $contextOptions = [];
        if (1 === preg_match('/^ssl:\/\/.+/', $endpoint)) {
            $contextOptions['ssl'] = [
                'verify_peer' => $this->options[ClientOptions::OPTION_VERIFY_CERT],
                'verify_peer_name' => $this->options[ClientOptions::OPTION_VERIFY_CERT_NAME],
                'allow_self_signed' => $this->options[ClientOptions::OPTION_ALLOW_SELF_SIGNED],
                'ciphers' => $this->options[ClientOptions::OPTION_CIPHERS],
            ];
        }
        $context = stream_context_create($contextOptions);

        // The same timeout option is used for connecting and, below, for reads and writes.
        $socket = stream_socket_client(
            $endpoint,
            $errorCode,
            $errorMessage,
            $this->options[ClientOptions::OPTION_TIMEOUT],
            STREAM_CLIENT_CONNECT,
            $context
        );

        if (false === $socket) {
            throw ConnectionException::forRequest(
                $request,
                sprintf('Cannot connect to endpoint "%s". Message: %s', $endpoint, $errorMessage),
                $errorCode
            );
        }

        $this->handle = $socket;
        stream_set_timeout($this->handle, $this->options[ClientOptions::OPTION_TIMEOUT]);
    }
355
356
    /**
     * Closes the connection and forgets the handle.
     *
     * Expects a connection handle to be set; open() relies on the handle being null afterwards.
     */
    private function close(): void
    {
        fclose($this->handle);

        // Reset the handle so that the next open() establishes a new connection.
        $this->handle = null; // @phpstan-ignore-line
    }
364
}
365