Completed
Pull Request — master (#85)
by thomas
03:18
created

Manager::connectToPeers()   A

Complexity

Conditions 2
Paths 2

Size

Total Lines 9
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 6
CRAP Score 2

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 9
ccs 6
cts 6
cp 1
rs 9.6666
cc 2
eloc 5
nc 2
nop 2
crap 2
1
<?php
2
3
namespace BitWasp\Bitcoin\Networking\Peer;
4
5
use BitWasp\Bitcoin\Networking\Settings\NetworkSettings;
6
use BitWasp\Bitcoin\Networking\Structure\NetworkAddressInterface;
7
use Evenement\EventEmitter;
8
use React\Promise\Deferred;
9
use React\Promise\RejectedPromise;
10
use React\Promise\Timer\TimeoutException;
11
12
class Manager extends EventEmitter
13
{
14
    /**
15
     * @var Connector
16
     */
17
    private $connector;
18
19
    /**
20
     * @var Peer[]
21
     */
22
    private $outPeers = [];
23
24
    /**
25
     * @var Peer[]
26
     */
27
    private $inPeers = [];
28
29
    /**
30
     * @var int
31
     */
32
    private $nOutPeers = 0;
33
34
    /**
35
     * @var int
36
     */
37
    private $nInPeers = 0;
38
39
    /**
40
     * @var NetworkSettings
41
     */
42
    private $settings;
43
44
    /**
45
     * Manager constructor.
46
     * @param Connector $connector
47
     * @param NetworkSettings $settings
48
     */
49 6
    public function __construct(Connector $connector, NetworkSettings $settings)
50
    {
51 6
        $this->connector = $connector;
52 6
        $this->settings = $settings;
53 6
    }
54
55
    /**
56
     * Store the newly connected peer, and trigger a new connection if they go away.
57
     *
58
     * @param Peer $peer
59
     * @return Peer
60
     */
61 2
    public function registerOutboundPeer(Peer $peer)
62
    {
63 2
        $next = $this->nOutPeers++;
64
        $peer->on('close', function ($peer) use ($next) {
65 2
            $this->emit('disconnect', [$peer]);
66 2
            unset($this->outPeers[$next]);
67 2
        });
68
69 2
        $this->outPeers[$next] = $peer;
70 2
        $this->emit('outbound', [$peer]);
71 2
        return $peer;
72
    }
73
74
    /**
75
     * @param Peer $peer
76
     */
77 3
    public function registerInboundPeer(Peer $peer)
78
    {
79 3
        $next = $this->nInPeers++;
80 3
        $this->inPeers[$next] = $peer;
81
        $peer->on('close', function () use ($next) {
82
            unset($this->inPeers[$next]);
83 3
        });
84 3
        $this->emit('inbound', [$peer]);
85 3
    }
86
87
    /**
88
     * @param Listener $listener
89
     * @return $this
90
     */
91 3
    public function registerListener(Listener $listener)
92
    {
93
        $listener->on('connection', function (Peer $peer) {
94 3
            $this->registerInboundPeer($peer);
95 3
        });
96
97 3
        return $this;
98
    }
99
100
    /**
101
     * @param NetworkAddressInterface $address
102
     * @return \React\Promise\PromiseInterface
103
     * @throws \Exception
104
     */
105 3
    public function connect(NetworkAddressInterface $address)
106
    {
107 3
        return $this->connector->connect($address);
108
    }
109
110
    /**
111
     * @param Locator $locator
112
     * @return \React\Promise\Promise|\React\Promise\PromiseInterface
113
     */
114 3
    public function getAnotherPeer(Locator $locator)
115
    {
116 3
        $deferred = new Deferred();
117
118
        // Otherwise, rely on the Locator.
119
        try {
120 3
            $deferred->resolve($locator->popAddress());
121 1
        } catch (\Exception $e) {
122
            $locator->queryDnsSeeds()->then(
123
                function () use ($deferred, $locator) {
124
                    $deferred->resolve($locator->popAddress());
125
                },
126
                function ($error) use ($deferred) {
127
                    return new RejectedPromise($error);
128
                }
129
            );
130
        }
131
132 3
        return $deferred->promise();
133
    }
134
135
    /**
136
     * @param Locator $locator
137
     * @return \React\Promise\Promise|\React\Promise\PromiseInterface
138
     */
139 3
    public function attemptNextPeer(Locator $locator)
140
    {
141 3
        $attempt = new Deferred();
142
143 1
        $this
144 3
            ->getAnotherPeer($locator)
145
            ->then(function (NetworkAddressInterface $address) use ($attempt, $locator) {
146 3
                return $this->connect($address)->then(
147
                    function (Peer $peer) use ($attempt) {
148 2
                        $this->registerOutboundPeer($peer);
149 2
                        $attempt->resolve($peer);
150 2
                        return $peer;
151 3
                    },
152
                    function (\Exception $error) use ($attempt) {
153 2
                        $attempt->reject($error);
154 3
                    }
155 1
                );
156
            }, function ($error) use ($attempt) {
157
                $attempt->reject($error);
158 3
            });
159
160 3
        return $attempt->promise();
161
    }
162
163
    /**
164
     * @param Locator $locator
165
     * @param int $retries
166
     * @return \React\Promise\PromiseInterface
167
     */
168 3
    public function connectNextPeer(Locator $locator, $retries = null)
169
    {
170 3
        if ($retries === null) {
171 3
            $retries = $this->settings->getMaxConnectRetries();
172 1
        }
173
174 3
        if (!(is_integer($retries) && $retries >= 0)) {
175
            throw new \InvalidArgumentException("Invalid retry count, must be an integer greater than zero");
176
        }
177
178 3
        $errorBack = function ($error) use ($locator, $retries) {
179 2
            $allowContinue = false;
180 2
            if ($error instanceof \RuntimeException && $error->getMessage() === "Connection refused") {
181
                $allowContinue = true;
182
            }
183
184 2
            if ($error instanceof TimeoutException) {
185 2
                $allowContinue = true;
186 1
            }
187
188 2
            if (!$allowContinue) {
189
                throw $error;
190
            }
191
192 2
            if (0 >= $retries) {
193 1
                throw new \RuntimeException("Connection to peers failed: too many attempts");
194
            }
195
196 2
            return $this->connectNextPeer($locator, $retries - 1);
197 3
        };
198
199 1
        return $this
200 3
            ->attemptNextPeer($locator)
201 3
            ->then(null, $errorBack);
202
    }
203
204
    /**
205
     * @param Locator $locator
206
     * @param int $n
207
     * @return \React\Promise\Promise
208
     */
209 3
    public function connectToPeers(Locator $locator, $n)
210
    {
211 3
        $peers = [];
212 3
        for ($i = 0; $i < $n; $i++) {
213 3
            $peers[$i] = $this->connectNextPeer($locator);
214 1
        }
215
216 3
        return \React\Promise\all($peers);
217
    }
218
}
219