Issues (27)

src/Client.php (1 issue)

1
<?php
2
/**
3
 * TCP accelerator [client]
4
 * User: moyo
5
 * Date: 2018/7/30
6
 * Time: 3:10 PM
7
 */
8
9
namespace Carno\HRPC\Accel;
10
11
use function Carno\Coroutine\race;
12
use function Carno\Coroutine\timeout;
13
use Carno\HRPC\Accel\Chips\Protocol;
14
use Carno\HRPC\Accel\Chips\Sequence;
15
use Carno\HRPC\Accel\Chips\Specification;
16
use Carno\HRPC\Accel\Contracts\Named;
17
use Carno\HRPC\Accel\Transport\Packet;
18
use Carno\HRPC\Accel\Transport\Socket;
19
use Carno\HTTP\Exception\RequestTimeoutException;
20
use Carno\HTTP\Options;
21
use Carno\Net\Address;
22
use Carno\Net\Endpoint;
23
use Carno\Pool\Pool;
24
use Carno\Pool\Wrapper\SAR;
25
use Carno\Promise\Promised;
26
use Psr\Http\Message\RequestInterface as Request;
27
use Psr\Http\Message\ResponseInterface as Response;
28
29
class Client
{
    use SAR;
    use Protocol, Sequence, Specification;

    /**
     * Remote TCP address, built in the constructor from the endpoint's host
     * and the endpoint option named by Named::VIA_TCP.
     *
     * @var Address
     */
    private $address = null;

    /**
     * Client options; perform() reads `ttWait` from it as the wait limit for a reply.
     *
     * @var Options
     */
    private $options = null;

    /**
     * Pool whose factory creates Socket instances bound to $address.
     *
     * @var Pool
     */
    private $sockets = null;

    /**
     * Client constructor.
     *
     * Resolves the target address from the endpoint and prepares the socket pool.
     *
     * @param Options $options
     * @param Endpoint $endpoint
     */
    public function __construct(Options $options, Endpoint $endpoint)
    {
        $this->address = $address = new Address($endpoint->address()->host(), $endpoint->option(Named::VIA_TCP));
        $this->options = $options;

        // The factory is static and captures only $address, so pooled sockets
        // do not hold a reference back to this client.
        $this->sockets = new Pool($options->pooling(), static function () use ($address) {
            return new Socket($address);
        }, $options->identify());
    }

    /**
     * Address this client is bound to.
     *
     * @return Address
     */
    public function restricted() : Address
    {
        return $this->address;
    }

    /**
     * Send one request through the socket pool and wait for its reply.
     *
     * This method contains `yield`, so calling it returns a Generator (a coroutine
     * that the caller has to drive) rather than a response object. The generator's
     * final return value is whatever s2response() builds from the received message
     * (presumably a Psr\Http\Message\ResponseInterface; confirm against the trait
     * that provides s2response()).
     *
     * NOTE(review): timeout() is given RequestTimeoutException::class, so a wait
     * longer than `ttWait` presumably surfaces as that exception; confirm against
     * Carno\Coroutine\timeout.
     *
     * @param Request $request
     * @return \Generator
     */
    public function perform(Request $request)
    {
        $seq = $this->seq();

        /**
         * @var Packet $packet
         * @see Socket::send
         */
        $packet = yield $this->sarRun(
            $this->sockets,
            'send',
            [$seq, $this->request2s($request)]
        );

        // Whichever settles first wins: the packet's message or the timeout.
        $message = yield race(
            $packet->message(),
            timeout(
                $this->options->ttWait,
                RequestTimeoutException::class,
                sprintf('SEQ=%d:WMS=%d:URI=%s', $seq, $this->options->ttWait, $request->getUri()->getPath())
            )
        );

        return $this->s2response($message);
    }

    /**
     * Shut down the socket pool.
     *
     * @return Promised result of Pool::shutdown()
     */
    public function close() : Promised
    {
        return $this->sockets->shutdown();
    }
}
111