TcpWorker::stop()   A
last analyzed

Complexity

Conditions 3
Paths 4

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
nc 4
nop 0
dl 0
loc 10
rs 9.9332
c 0
b 0
f 0
1
<?php
2
3
/*
4
 * This file is part of the slince/spike package.
5
 *
6
 * (c) Slince <[email protected]>
7
 *
8
 * For the full copyright and license information, please view the LICENSE
9
 * file that was distributed with this source code.
10
 */
11
12
namespace Spike\Client\Worker;
13
14
use React\Socket\ConnectionInterface;
15
use React\Socket\Connector;
16
use function Slince\Common\jsonBuffer;
17
use Spike\Client\Client;
18
use Spike\Common\Protocol\Spike;
19
use Spike\Common\Protocol\StreamingJsonParser;
20
use Spike\Common\Tunnel\TcpTunnel;
21
use Spike\Common\Tunnel\TunnelInterface;
22
23
/**
 * Client-side worker that bridges one proxy connection to the server with one
 * connection to the local target described by a tunnel.
 *
 * Flow as implemented below: start() connects to the server address,
 * handleProxyConnection() registers the proxy and waits for a "start_proxy"
 * message, connectLocalHost() dials the local target, and
 * handleLocalConnection() pipes the two connections into each other.
 */
class TcpWorker implements WorkerInterface
{
    /**
     * Tunnel served by this worker; its host is used as the local target address.
     *
     * @var TcpTunnel
     */
    protected $tunnel;

    /**
     * Owning client; supplies the event loop, the configuration and the worker collection.
     *
     * @var Client
     */
    protected $client;

    /**
     * Connection to the server; unset until handleProxyConnection() has run.
     *
     * @var ConnectionInterface
     */
    protected $proxyConnection;

    /**
     * Connection to the local target; unset until handleLocalConnection() has run.
     *
     * @var ConnectionInterface
     */
    protected $localConnection;

    /**
     * Identifier sent under the "public-connection-id" key when registering the proxy.
     *
     * @var string
     */
    protected $publicConnectionId;

    /**
     * Remaining chunk taken from the stream parser when "start_proxy" arrives;
     * written to the local connection once it is established.
     *
     * @var string
     */
    protected $initBuffer;

    /**
     * @param Client          $client             Client that owns this worker.
     * @param TunnelInterface $tunnel             Tunnel to serve. It is stored in a property documented
     *                                            as TcpTunnel without an instanceof check; callers are
     *                                            presumably expected to consult support() first
     *                                            (NOTE(review): confirm against the code creating workers).
     * @param string          $publicConnectionId Identifier of the public connection being proxied.
     */
    public function __construct(Client $client, TunnelInterface $tunnel, $publicConnectionId)
    {
        $this->client = $client;
        $this->tunnel = $tunnel;
        $this->publicConnectionId = $publicConnectionId;
    }

    /**
     * Opens the proxy connection to the configured server address and hands it
     * to handleProxyConnection() once established.
     *
     * No rejection handler is attached here, so a failed connect is not handled
     * by this method.
     *
     * @codeCoverageIgnore
     */
    public function start()
    {
        $connector = new Connector($this->client->getEventLoop());
        $connector->connect($this->client->getConfiguration()->getServerAddress())
            ->then([$this, 'handleProxyConnection']);
    }

    /**
     * {@inheritdoc}
     *
     * Returns the host of the tunnel, which connectLocalHost() uses as the
     * address to dial.
     */
    public function resolveTargetHost()
    {
        return $this->tunnel->getHost();
    }

    /**
     * Handles the proxy connection.
     *
     * Sends a "register_proxy" message and then inspects incoming JSON messages
     * until a "start_proxy" action is seen; at that point the local target is
     * dialled.
     *
     * @param ConnectionInterface $connection
     * @codeCoverageIgnore
     */
    public function handleProxyConnection(ConnectionInterface $connection)
    {
        $this->proxyConnection = $connection;
        //Register proxy connection
        $connection->write(new Spike('register_proxy', $this->tunnel->toArray(), [
            'public-connection-id' => $this->publicConnectionId,
        ]));
        $streamParser = new StreamingJsonParser();
        jsonBuffer($connection, function($messages) use ($connection, $streamParser){
            if (!$messages) {
                return;
            }
            // Only the first message of the batch is examined.
            $message = reset($messages);
            $message = Spike::fromArray($message);
            if ('start_proxy' === $message->getAction()) {
                // Keep whatever the parser has not consumed so it can be forwarded
                // to the local connection later.
                $this->initBuffer = $streamParser->getRemainingChunk();
                // Drop every 'data' listener on the proxy connection before it is
                // piped to the local connection in handleLocalConnection().
                $connection->removeAllListeners('data');
                $localAddress = $this->resolveTargetHost();
                $this->connectLocalHost($localAddress);
            }
        }, null, $streamParser);
    }

    /**
     * Connect the local server.
     *
     * Success is routed to handleLocalConnection(), failure to
     * handleConnectLocalError().
     *
     * @param string $address
     * @codeCoverageIgnore
     */
    protected function connectLocalHost($address)
    {
        $localConnector = new Connector($this->client->getEventLoop());
        $localConnector->connect($address)->then([$this, 'handleLocalConnection'],
            [$this, 'handleConnectLocalError']
        );
    }

    /**
     * {@inheritdoc}
     *
     * Pipes the local and proxy connections into each other, forwards the
     * buffered initial chunk to the local side, and stops the worker when
     * either side closes or errors.
     *
     * @codeCoverageIgnore
     */
    public function handleLocalConnection(ConnectionInterface $localConnection)
    {
        // Remember the connection so that stop() is able to end it; without this
        // assignment the localConnection branch in stop() can never run.
        $this->localConnection = $localConnection;

        $localConnection->pipe($this->proxyConnection);
        $this->proxyConnection->pipe($localConnection);
        $localConnection->write($this->initBuffer);

        //Handles the local connection close
        $handleLocalConnectionClose = function(){
            $this->stop();
        };
        $localConnection->on('close', $handleLocalConnectionClose);
        $localConnection->on('error', $handleLocalConnectionClose);

        //Handles the proxy connection close
        $handleProxyConnectionClose = function(){
            $this->stop();
        };
        $this->proxyConnection->on('close', $handleProxyConnectionClose);
        $this->proxyConnection->on('error', $handleProxyConnectionClose);
    }

    /**
     * {@inheritdoc}
     *
     * Ends the proxy connection with the exception message as its final data,
     * then stops the worker.
     */
    public function handleConnectLocalError(\Exception $exception)
    {
        $this->proxyConnection->end($exception->getMessage());
        $this->stop();
    }

    /**
     * {@inheritdoc}
     *
     * Ends whichever connections have been set and removes this worker from
     * the client's worker collection. The close/error handlers registered in
     * handleLocalConnection() call this method as well, so it may run more
     * than once for the same worker.
     */
    public function stop()
    {
        if ($this->localConnection) {
            $this->localConnection->end();
        }
        if ($this->proxyConnection) {
            $this->proxyConnection->end();
        }
        $this->client->getWorkers()->removeElement($this);
    }

    /**
     * {@inheritdoc}
     */
    public function getTunnel()
    {
        return $this->tunnel;
    }

    /**
     * {@inheritdoc}
     *
     * This worker supports a tunnel exactly when it is a TcpTunnel instance.
     */
    public static function support(TunnelInterface $tunnel)
    {
        return $tunnel instanceof TcpTunnel;
    }
}