Socket::close()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 4
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 4
rs 10
c 0
b 0
f 0
cc 1
nc 1
nop 0
1
<?php
2
/**
3
 * Transport socket
4
 * User: moyo
5
 * Date: 2018/8/1
6
 * Time: 5:01 PM
7
 */
8
9
namespace Carno\HRPC\Accel\Transport;
10
11
use Carno\HRPC\Accel\Chips\Protocol;
12
use Carno\HRPC\Accel\Contracts\Config;
13
use Carno\HRPC\Accel\Exception\TransportNowClosedException;
14
use Carno\Net\Address;
15
use Carno\Net\Events;
16
use Carno\Pool\Managed;
17
use Carno\Pool\Poolable;
18
use Carno\Promise\Promise;
19
use Carno\Promise\Promised;
20
use Carno\Socket\Contracts\Stream;
21
use Carno\Socket\Contracts\TClient;
22
use Carno\Socket\Options;
23
use Carno\Socket\Socket as Sock;
24
25
/**
 * Poolable client-side transport socket.
 *
 * Opens a connection to the Address given at construction, writes packets
 * produced by Protocol::packing() and hands everything received (after
 * Protocol::unpacking()) to the Reconciled instance, from which send()
 * obtains the Packet it returns.
 */
class Socket implements Poolable
{
    use Managed, Protocol;

    /**
     * Remote endpoint passed to the constructor
     * @var Address
     */
    private $connect = null;

    /**
     * Deferred returned by connect(); within this class it is resolved only by the CONNECTED event
     * @var Promised
     */
    private $connected = null;

    /**
     * Receives unpacked inbound data via done() and supplies the Packet returned by send() via wait()
     * @var Reconciled
     */
    private $reconciled = null;

    /**
     * Underlying client; stays null until connect() has been called
     * @var TClient|null
     */
    private $socket = null;

    /**
     * Socket constructor.
     * @param Address $connect remote endpoint used later by connect()
     */
    public function __construct(Address $connect)
    {
        $this->connect = $connect;
        $this->connected = Promise::deferred();
        $this->reconciled = new Reconciled;

        // Tie the cleanup promise to closed(); presumably sync() propagates the
        // closed() outcome so the shutdown handler runs on close — confirm
        // against Promised::sync().
        $this->closed()->sync($this->cleanup());
    }

    /**
     * Start connecting and register the socket event handlers.
     *
     * NOTE(review): in this class the returned promise is only ever resolved by
     * the CONNECTED handler; an ERROR/CLOSED before CONNECTED settles closed()
     * but leaves this promise untouched — confirm callers apply a timeout.
     *
     * @return Promised the "connected" deferred created in the constructor
     */
    public function connect() : Promised
    {
        // CLOSED and ERROR are handled identically: both settle closed().
        $closing = function () {
            $this->closed()->resolve();
        };

        $events = (new Events)
            ->attach(Events\Socket::CONNECTED, function () {
                $this->connected->resolve();
            })
            ->attach(Events\Socket::RECEIVED, function (Stream $conn) {
                // NOTE(review): static analysis (Scrutinizer) reported that
                // unpacking() may also yield a string where Reconciled::done()
                // expects an int $seq — confirm the trait's return shape or add
                // a type check / "@scrutinizer ignore-type" annotation.
                $this->reconciled->done(...$this->unpacking($conn->recv()));
            })
            ->attach(Events\Socket::CLOSED, $closing)
            ->attach(Events\Socket::ERROR, $closing);

        $this->socket = Sock::connect(
            $this->connect,
            $events,
            new Options(Config::SW_PACKAGE, Config::SW_SOCKET)
        );

        return $this->connected;
    }

    /**
     * No-op heartbeat: always reports success immediately.
     * @return Promised an already resolved promise
     */
    public function heartbeat() : Promised
    {
        return Promise::resolved();
    }

    /**
     * Close the transport.
     *
     * When connect() was never called there is no socket object to close and no
     * handler that could settle closed(), so the promise is resolved here
     * instead of failing with a member call on null.
     *
     * @return Promised the closed() promise from the Managed trait
     */
    public function close() : Promised
    {
        if ($this->socket === null) {
            $this->closed()->resolve();
        } else {
            $this->socket->close();
        }

        return $this->closed();
    }

    /**
     * Build the promise whose fulfilment shuts down the reconciler with a
     * TransportNowClosedException.
     * @return Promised deferred that the constructor syncs with closed()
     */
    private function cleanup() : Promised
    {
        $waiter = Promise::deferred();

        $waiter->then(function () {
            $this->reconciled->shutdown(new TransportNowClosedException);
        });

        return $waiter;
    }

    /**
     * Write one packed message and return the reconciler's Packet for its sequence.
     *
     * Requires connect() to have been called first: the socket is null before that.
     *
     * @param int $seq sequence number packed with the message and passed to Reconciled::wait()
     * @param string $message payload handed to Protocol::packing()
     * @return Packet
     */
    public function send(int $seq, string $message) : Packet
    {
        $packed = $this->packing($seq, $message);
        $this->socket->write($packed);

        return $this->reconciled->wait($seq);
    }
}
128