Issues (32)

src/Client.php (1 issue)

1
<?php
2
/**
3
 * HTTP Client
4
 * User: moyo
5
 * Date: 23/08/2017
6
 * Time: 10:52 PM
7
 */
8
9
namespace Carno\HTTP;
10
11
use function Carno\Coroutine\async;
12
use Carno\DNS\DNS;
13
use Carno\DNS\Result;
14
use Carno\HTTP\Client\Methods;
15
use Carno\HTTP\Contracts\Client as API;
16
use Carno\HTTP\Exception\ClientBindingException;
17
use Carno\HTTP\Powered\Swoole\Client as SWClient;
18
use Carno\Net\Address;
19
use Carno\Pool\Pool;
20
use Carno\Pool\Wrapper\SAR;
21
use Carno\Promise\Promise;
22
use Carno\Promise\Promised;
23
use Psr\Http\Message\RequestInterface as Request;
24
use Psr\Http\Message\UriInterface as Uri;
25
use Closure;
26
27
class Client implements API
28
{
29
    use SAR, Methods;
30
31
    /**
32
     * @var Options
33
     */
34
    private $options = null;
35
36
    /**
37
     * @var SWClient
38
     */
39
    private $session = null;
40
41
    /**
42
     * @var Address
43
     */
44
    private $restrict = null;
45
46
    /**
47
     * @var Promised
48
     */
49
    private $closed = null;
50
51
    /**
52
     * @var Pool
53
     */
54
    private $pool = null;
55
56
    /**
57
     * @var Result[]
58
     */
59
    private $dns = [];
60
61
    /**
62
     * @var Promised
63
     */
64
    private $ka = null;
65
66
    /**
67
     * Client constructor.
68
     * @param Options $options
69
     * @param Address $limited
70
     */
71
    public function __construct(Options $options, Address $limited = null)
72
    {
73
        $this->options = $options;
74
        $this->restrict = $limited;
75
    }
76
77
    /**
78
     * @return Address|null
79
     */
80
    public function restricted() : ?Address
81
    {
82
        return $this->restrict;
83
    }
84
85
    /**
86
     * @param Request $request
87
     * @param Promised|null $canceller
88
     * @return Promised
89
     */
90
    public function perform(Request $request, Promised $canceller = null) : Promised
91
    {
92
        if ($this->options->pooled()) {
93
            return async(function () use ($request, $canceller) {
94
                if ($canceller) {
95
                    $interrupt = Promise::deferred();
96
                    $canceller->then(static function () use ($interrupt) {
97
                        $interrupt->then(static function (SWClient $client) {
98
                            return $client->close();
99
                        });
100
                        $interrupt->resolve();
101
                    });
102
                }
103
104
                return yield $this->sarRun(
105
                    $this->pool($this->remote($request->getUri())),
106
                    'execute',
107
                    [$request, $this->options],
108
                    $interrupt ?? null
109
                );
110
            });
111
        }
112
113
        return async(function () use ($request, $canceller) {
114
            /**
115
             * @var SWClient $client
116
             */
117
            $this->session = $client = yield $this->http($this->remote($request->getUri()), false);
118
119
            $canceller && $canceller->then(static function () use ($client) {
120
                return $client->close();
121
            });
122
123
            return yield $client->execute($request, $this->options);
124
        });
125
    }
126
127
    /**
128
     * close pool connections or session client
129
     * @return Promised
130
     */
131
    public function close() : Promised
132
    {
133
        return
134
            ($this->pool
135
                ? $this->pool->shutdown()
136
                : (
137
                    $this->session
138
                        ? $this->session->close()
139
                        : Promise::resolved()
140
                )
141
            )->sync($this->closed())
142
        ;
143
    }
144
145
    /**
146
     * @return Promised
147
     */
148
    public function closed() : Promised
149
    {
150
        return $this->closed ?? $this->closed = Promise::deferred();
151
    }
152
153
    /**
154
     * @param Closure $initialize
155
     */
156
    public function keepalived(Closure $initialize) : void
157
    {
158
        ($this->ka ?? $this->ka = Promise::deferred())->then(function () use ($initialize) {
159
            $initialize($this->pool);
160
        });
161
    }
162
163
    /**
164
     * @param Address $remote
165
     * @return Pool
166
     */
167
    private function pool(Address $remote) : Pool
168
    {
169
        if ($this->pool) {
170
            return $this->pool;
171
        }
172
173
        $this->pool = new Pool($this->options->pooling(), function () use ($remote) {
174
            return $this->http($remote, true);
175
        }, $this->options->identify());
176
177
        $this->pool->closed()->then(function () {
178
            $this->pool = null;
179
        });
180
181
        $this->ka && $this->ka->resolve();
182
183
        return $this->pool;
184
    }
185
186
    /**
187
     * @param Address $remote
188
     * @param bool $pooled
189
     * @return SWClient
190
     */
191
    private function http(Address $remote, bool $pooled)
192
    {
193
        $host = $remote->host();
194
        $port = $remote->port();
195
196
        if ($this->restrict && ($this->restrict->host() !== $host || $this->restrict->port() !== $port)) {
197
            throw new ClientBindingException(
198
                sprintf(
199
                    'Expected "%s:%d" got "%s:%d"',
200
                    $this->restrict->host(),
201
                    $this->restrict->port(),
202
                    $host,
203
                    $port
204
                )
205
            );
206
        }
207
208
        $this->restrict = $remote;
209
210
        $dns = $this->dns[$host] ?? $this->dns[$host] = yield DNS::resolve($host, $this->options->ttLookup);
0 ignored issues
show
Bug Best Practice introduced by
The expression yield Carno\DNS\DNS::res...his->options->ttLookup) returns the type Generator which is incompatible with the documented return type Carno\HTTP\Powered\Swoole\Client.
Loading history...
211
212
        return new SWClient(new Address($dns->random(), $port), $pooled);
213
    }
214
215
    /**
216
     * @param Uri $uri
217
     * @return Address
218
     */
219
    private function remote(Uri $uri) : Address
220
    {
221
        return $this->restrict ?? new Address($uri->getHost(), $uri->getPort());
222
    }
223
}
224