carno-php /
http
| 1 | <?php |
||
| 2 | /** |
||
| 3 | * HTTP Client |
||
| 4 | * User: moyo |
||
| 5 | * Date: 23/08/2017 |
||
| 6 | * Time: 10:52 PM |
||
| 7 | */ |
||
| 8 | |||
| 9 | namespace Carno\HTTP; |
||
| 10 | |||
| 11 | use function Carno\Coroutine\async; |
||
| 12 | use Carno\DNS\DNS; |
||
| 13 | use Carno\DNS\Result; |
||
| 14 | use Carno\HTTP\Client\Methods; |
||
| 15 | use Carno\HTTP\Contracts\Client as API; |
||
| 16 | use Carno\HTTP\Exception\ClientBindingException; |
||
| 17 | use Carno\HTTP\Powered\Swoole\Client as SWClient; |
||
| 18 | use Carno\Net\Address; |
||
| 19 | use Carno\Pool\Pool; |
||
| 20 | use Carno\Pool\Wrapper\SAR; |
||
| 21 | use Carno\Promise\Promise; |
||
| 22 | use Carno\Promise\Promised; |
||
| 23 | use Psr\Http\Message\RequestInterface as Request; |
||
| 24 | use Psr\Http\Message\UriInterface as Uri; |
||
| 25 | use Closure; |
||
| 26 | |||
/**
 * HTTP client executing PSR-7 requests either through a connection pool or
 * through a one-off session client, depending on Options::pooled().
 */
class Client implements API
{
    use SAR, Methods;

    /**
     * Options handed to the constructor; consulted for pooling and DNS lookup
     * @var Options
     */
    private $options = null;

    /**
     * Most recent client created by a non-pooled perform()
     * @var SWClient
     */
    private $session = null;

    /**
     * Address this client is bound to; given at construction or
     * remembered from the first remote passed to http()
     * @var Address
     */
    private $restrict = null;

    /**
     * Lazily created promise returned by closed()
     * @var Promised
     */
    private $closed = null;

    /**
     * Lazily created connection pool; reset to null once the pool reports closed
     * @var Pool
     */
    private $pool = null;

    /**
     * DNS results cached by host name
     * @var Result[]
     */
    private $dns = [];

    /**
     * Deferred that keepalived() callbacks are attached to; resolved in pool()
     * @var Promised
     */
    private $ka = null;

    /**
     * Client constructor.
     * @param Options $options
     * @param Address|null $limited optional address to bind this client to
     */
    public function __construct(Options $options, ?Address $limited = null)
    {
        $this->options = $options;
        $this->restrict = $limited;
    }

    /**
     * @return Address|null the bound address, or null when nothing is bound yet
     */
    public function restricted() : ?Address
    {
        return $this->restrict;
    }

    /**
     * Execute the request, via the pool when options say "pooled",
     * otherwise via a freshly created session client.
     *
     * @param Request $request
     * @param Promised|null $canceller when it resolves, the client in use gets closed
     * @return Promised
     */
    public function perform(Request $request, ?Promised $canceller = null) : Promised
    {
        if ($this->options->pooled()) {
            return async(function () use ($request, $canceller) {
                if ($canceller) {
                    $interrupt = Promise::deferred();
                    $canceller->then(static function () use ($interrupt) {
                        // NOTE(review): the callback expects an SWClient while resolve() is called
                        // without a value here — presumably SAR::sarRun wires the client in; confirm
                        $interrupt->then(static function (SWClient $client) {
                            return $client->close();
                        });
                        $interrupt->resolve();
                    });
                }

                // $interrupt only exists when a canceller was given, hence the null fallback
                return yield $this->sarRun(
                    $this->pool($this->remote($request->getUri())),
                    'execute',
                    [$request, $this->options],
                    $interrupt ?? null
                );
            });
        }

        return async(function () use ($request, $canceller) {
            /**
             * @var SWClient $client
             */
            $this->session = $client = yield $this->http($this->remote($request->getUri()), false);

            $canceller && $canceller->then(static function () use ($client) {
                return $client->close();
            });

            return yield $client->execute($request, $this->options);
        });
    }

    /**
     * close pool connections or session client
     *
     * Picks the pool shutdown when a pool exists, else the session close,
     * else an already resolved promise, and syncs the result with closed().
     *
     * @return Promised
     */
    public function close() : Promised
    {
        return
            ($this->pool
                ? $this->pool->shutdown()
                : (
                    $this->session
                        ? $this->session->close()
                        : Promise::resolved()
                )
            )->sync($this->closed())
        ;
    }

    /**
     * @return Promised the (lazily created) promise that close() syncs to
     */
    public function closed() : Promised
    {
        return $this->closed ?? $this->closed = Promise::deferred();
    }

    /**
     * Register a callback that receives the pool once pool() has resolved the keepalive deferred.
     *
     * NOTE(review): pool() resolves $this->ka only if it already exists when the pool is
     * built, so a callback registered after pool creation looks like it never fires — confirm.
     *
     * @param Closure $initialize invoked with the current pool
     */
    public function keepalived(Closure $initialize) : void
    {
        ($this->ka ?? $this->ka = Promise::deferred())->then(function () use ($initialize) {
            $initialize($this->pool);
        });
    }

    /**
     * Return the existing pool or build one whose connections target $remote.
     *
     * @param Address $remote
     * @return Pool
     */
    private function pool(Address $remote) : Pool
    {
        if ($this->pool) {
            return $this->pool;
        }

        $this->pool = new Pool($this->options->pooling(), function () use ($remote) {
            return $this->http($remote, true);
        }, $this->options->identify());

        // forget the pool once it is closed so the next call builds a new one
        $this->pool->closed()->then(function () {
            $this->pool = null;
        });

        // let keepalived() callbacks run now that a pool exists
        $this->ka && $this->ka->resolve();

        return $this->pool;
    }

    /**
     * Create a client for $remote after checking the address binding and resolving its host.
     *
     * This method contains "yield", so calling it returns a generator; callers in this
     * class yield it inside async() or return it from the pool's connection factory.
     *
     * @param Address $remote
     * @param bool $pooled
     * @return \Generator whose return value is the SWClient
     * @throws ClientBindingException when $remote differs from the bound address
     */
    private function http(Address $remote, bool $pooled)
    {
        $host = $remote->host();
        $port = $remote->port();

        if ($this->restrict && ($this->restrict->host() !== $host || $this->restrict->port() !== $port)) {
            throw new ClientBindingException(
                sprintf(
                    'Expected "%s:%d" got "%s:%d"',
                    $this->restrict->host(),
                    $this->restrict->port(),
                    $host,
                    $port
                )
            );
        }

        // bind the client to this remote; later calls must use the same host and port
        $this->restrict = $remote;

        // resolve once per host, then reuse the cached result
        $dns = $this->dns[$host] ?? $this->dns[$host] = yield DNS::resolve($host, $this->options->ttLookup);

        return new SWClient(new Address($dns->random(), $port), $pooled);
    }

    /**
     * Target address for a request: the bound address when set, else host and port of the URI.
     *
     * NOTE(review): PSR-7 allows getPort() to return null when the URI carries no explicit
     * port — confirm Address copes with a null port.
     *
     * @param Uri $uri
     * @return Address
     */
    private function remote(Uri $uri) : Address
    {
        return $this->restrict ?? new Address($uri->getHost(), $uri->getPort());
    }
}
||
| 224 |