Executor::query()   A
last analyzed

Complexity

Conditions 2
Paths 2

Size

Total Lines 10
Code Lines 5

Duplication

Lines 0
Ratio 0 %
Metric Value
dl 0
loc 10
rs 9.4285
cc 2
eloc 5
nc 2
nop 2
1
<?php
2
3
namespace Thruster\Component\Dns;
4
5
use Thruster\Component\Dns\Exception\BadServerException;
6
use Thruster\Component\Dns\Exception\TimeoutException;
7
use Thruster\Component\EventLoop\EventLoopInterface;
8
use Thruster\Component\Promise\Deferred;
9
use Thruster\Component\Promise\PromiseInterface;
10
use Thruster\Component\Socket\Connection;
11
use Thruster\Component\Socket\Exception\ConnectionException;
12
13
/**
14
 * Class Executor
15
 *
16
 * @package Thruster\Component\Dns
17
 * @author  Aurimas Niekis <[email protected]>
18
 */
19
class Executor implements ExecutorInterface
20
{
21
    /**
22
     * @var EventLoopInterface
23
     */
24
    private $loop;
25
26
    /**
27
     * @var Parser
28
     */
29
    private $parser;
30
31
    /**
32
     * @var Dumper
33
     */
34
    private $dumper;
35
36
    /**
37
     * @var int
38
     */
39
    private $timeout;
40
41
    public function __construct(EventLoopInterface $loop, int $timeout = 5)
42
    {
43
        $this->loop    = $loop;
44
        $this->parser  = new Parser();
45
        $this->dumper  = new Dumper();
46
        $this->timeout = $timeout;
47
    }
48
49
    public function query(string $nameServer, Query $query) : PromiseInterface
50
    {
51
        $request = $this->prepareRequest($query);
52
53
        $queryData = $this->dumper->toBinary($request);
54
55
        $transport = strlen($queryData) > 512 ? 'tcp' : 'udp';
56
57
        return $this->doQuery($nameServer, $transport, $queryData, $query->getName());
58
    }
59
60
    private function prepareRequest(Query $query)
61
    {
62
        $request = new Message();
63
64
        $request->header->set('id', $this->generateId());
65
        $request->header->set('rd', 1);
66
        $request->questions[] = $query;
67
        $request->prepare();
68
69
        return $request;
70
    }
71
72
    private function doQuery($nameServer, $transport, $queryData, $name) : PromiseInterface
73
    {
74
        $response = new Message();
75
        $deferred = new Deferred();
76
77
        $retryWithTcp = function () use ($nameServer, $queryData, $name) {
78
            return $this->doQuery($nameServer, 'tcp', $queryData, $name);
79
        };
80
81
        $connection = $this->createConnection($nameServer, $transport);
82
83
        $timer = $this->loop->addTimer($this->timeout, function () use ($connection, $name, $deferred) {
84
            $connection->close();
85
86
            $deferred->reject(new TimeoutException($name));
87
        });
88
89
90
        $connection->on(
91
            'data',
92
            function ($data) use ($retryWithTcp, $connection, &$response, $transport, $deferred, $timer) {
93
                $responseReady = $this->parser->parseChunk($data, $response);
94
95
                if (!$responseReady) {
96
                    return;
97
                }
98
99
                $timer->cancel();
100
101
                if ($response->header->isTruncated()) {
102
                    if ('tcp' === $transport) {
103
                        $deferred->reject(
104
                            new BadServerException(
105
                                'The server set the truncated bit although we issued a TCP request'
106
                            )
107
                        );
108
                    } else {
109
                        $connection->end();
110
                        $deferred->resolve($retryWithTcp());
111
                    }
112
113
                    return;
114
                }
115
116
                $connection->end();
117
                $deferred->resolve($response);
118
            }
119
        );
120
121
        $connection->write($queryData);
122
123
        return $deferred->promise();
124
    }
125
126
    private function generateId() : int
127
    {
128
        return mt_rand(0, 0xffff);
129
    }
130
131
    private function createConnection($nameServer, $transport) : Connection
132
    {
133
        $fd = stream_socket_client(
134
            $transport . '://' . $nameServer,
135
            $errNo,
136
            $errStr,
137
            0,
138
            STREAM_CLIENT_CONNECT | STREAM_CLIENT_ASYNC_CONNECT
139
        );
140
141
        if (false === $fd) {
142
            $message = sprintf(
143
                'Could not bind to %s://%s: %s',
144
                $transport,
145
                $nameServer,
146
                $errStr
147
            );
148
149
            throw new ConnectionException($message, $errNo);
150
        }
151
152
        stream_set_blocking($fd, 0);
153
154
        $connection = new Connection($fd, $this->loop);
155
156
        return $connection;
157
    }
158
}
159