TcpConnector::waitForStreamOnce()   A
last analyzed

Complexity

Conditions 1
Paths 1

Size

Total Lines 12
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 7
CRAP Score 1
Metric Value
dl 0
loc 12
ccs 7
cts 7
cp 1
rs 9.4285
cc 1
eloc 6
nc 1
nop 1
crap 1
1
<?php
2
3
namespace Thruster\Component\SocketClient;
4
5
use Thruster\Component\EventLoop\EventLoopInterface;
6
use Thruster\Component\Promise\Deferred;
7
use Thruster\Component\Promise\FulfilledPromise;
8
use Thruster\Component\Promise\PromiseInterface;
9
use Thruster\Component\Promise\RejectedPromise;
10
use Thruster\Component\SocketClient\Exception\ConnectionException;
11
use Thruster\Component\Stream\Stream;
12
13
/**
14
 * Class TcpConnector
15
 *
16
 * @package Thruster\Component\SocketClient
17
 * @author  Aurimas Niekis <[email protected]>
18
 */
19
class TcpConnector implements ConnectorInterface
20
{
21
    /**
22
     * @var EventLoopInterface
23
     */
24
    private $loop;
25
26 8
    public function __construct(EventLoopInterface $loop)
27
    {
28 8
        $this->loop = $loop;
29 8
    }
30
31 8
    public function create(string $ip, int $port) : PromiseInterface
32
    {
33 8
        if (false === filter_var($ip, FILTER_VALIDATE_IP)) {
34 1
            return new RejectedPromise(
35 1
                new \InvalidArgumentException(
36
                    sprintf(
37 1
                        'Given parameter "%s" is not a valid IP',
38
                        $ip
39
                    )
40
                )
41
            );
42
        }
43
44 7
        $url = $this->getSocketUrl($ip, $port);
45
46 7
        $socket = @stream_socket_client($url, $errNo, $errStr, 0, STREAM_CLIENT_CONNECT | STREAM_CLIENT_ASYNC_CONNECT);
47
48 7
        if (false === $socket) {
49 1
            return new RejectedPromise(
50 1
                new \RuntimeException(
51 1
                    sprintf("Connection to %s:%d failed: %s", $ip, $port, $errStr),
52
                    $errNo
53
                )
54
            );
55
        }
56
57 6
        stream_set_blocking($socket, 0);
58
59
        // wait for connection
60
61
        return $this
62 6
            ->waitForStreamOnce($socket)
63 6
            ->then([$this, 'checkConnectedSocket'])
64 6
            ->then([$this, 'handleConnectedSocket']);
65
    }
66
67 6
    private function waitForStreamOnce($stream) : PromiseInterface
68
    {
69 6
        $deferred = new Deferred();
70
71 6
        $this->loop->addWriteStream($stream, function ($stream) use ($deferred) {
72 6
            $this->loop->removeWriteStream($stream);
73
74 6
            $deferred->resolve($stream);
75 6
        });
76
77 6
        return $deferred->promise();
78
    }
79
80 6
    public function checkConnectedSocket($socket) : PromiseInterface
81
    {
82
        // The following hack looks like the only way to
83
        // detect connection refused errors with PHP's stream sockets.
84 6
        if (false === stream_socket_get_name($socket, true)) {
85 2
            return new RejectedPromise(
86 2
                new ConnectionException('Connection refused')
87
            );
88
        }
89
90 4
        return new FulfilledPromise($socket);
91
    }
92
93 4
    public function handleConnectedSocket($socket) : Stream
94
    {
95 4
        return new Stream($socket, $this->loop);
96
    }
97
98 7
    private function getSocketUrl(string $ip, int $port) : string
99
    {
100 7
        if (false !== strpos($ip, ':')) {
101
            // enclose IPv6 addresses in square brackets before appending port
102 2
            $ip = '[' . $ip . ']';
103
        }
104
105 7
        return 'tcp://' . $ip . ':' . $port;
106
    }
107
}
108