Completed
Push — master ( 4d00a0...e4a998 )
by thomas
98:35 queued 58:37
created

Manager::connectNextPeer()   D

Complexity

Conditions 9
Paths 4

Size

Total Lines 37
Code Lines 20

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 90

Importance

Changes 1
Bugs 0 Features 1
Metric Value
dl 0
loc 37
ccs 0
cts 0
cp 0
rs 4.909
c 1
b 0
f 1
cc 9
eloc 20
nc 4
nop 2
crap 90
1
<?php
2
3
namespace BitWasp\Bitcoin\Networking\Peer;
4
5
use BitWasp\Bitcoin\Networking\Settings\NetworkSettings;
6
use BitWasp\Bitcoin\Networking\Structure\NetworkAddressInterface;
7
use Evenement\EventEmitter;
8
use React\Promise\Deferred;
9
use React\Promise\RejectedPromise;
10
use React\Promise\Timer\TimeoutException;
11
12
class Manager extends EventEmitter
13
{
14
    /**
15
     * @var Connector
16
     */
17
    private $connector;
18
19
    /**
20
     * @var Peer[]
21
     */
22
    private $outPeers = [];
23
24
    /**
25
     * @var Peer[]
26
     */
27
    private $inPeers = [];
28
29
    /**
30
     * @var int
31
     */
32
    private $nOutPeers = 0;
33
34
    /**
35
     * @var int
36
     */
37
    private $nInPeers = 0;
38
39
    /**
40 6
     * @var NetworkSettings
41
     */
42 6
    private $settings;
43 6
44
    /**
45
     * Manager constructor.
46
     * @param Connector $connector
47
     * @param NetworkSettings $settings
48
     */
49
    public function __construct(Connector $connector, NetworkSettings $settings)
50
    {
51 3
        $this->connector = $connector;
52
        $this->settings = $settings;
53 3
    }
54
55 3
    /**
56 3
     * Store the newly connected peer, and trigger a new connection if they go away.
57 3
     *
58
     * @param Peer $peer
59 3
     * @return Peer
60 3
     */
61 3
    public function registerOutboundPeer(Peer $peer)
62
    {
63
        $next = $this->nOutPeers++;
64
        $peer->on('close', function ($peer) use ($next) {
65
            $this->emit('disconnect', [$peer]);
66
            unset($this->outPeers[$next]);
67 3
        });
68
69 3
        $this->outPeers[$next] = $peer;
70 3
        $this->emit('outbound', [$peer]);
71
        return $peer;
72
    }
73 3
74 3
    /**
75 3
     * @param Peer $peer
76
     */
77
    public function registerInboundPeer(Peer $peer)
78
    {
79
        $next = $this->nInPeers++;
80
        $this->inPeers[$next] = $peer;
81 3
        $peer->on('close', function () use ($next) {
82
            unset($this->inPeers[$next]);
83
        });
84 3
        $this->emit('inbound', [$peer]);
85 3
    }
86
87 3
    /**
88
     * @param Listener $listener
89
     * @return $this
90
     */
91
    public function registerListener(Listener $listener)
92
    {
93
        $listener->on('connection', function (Peer $peer) {
94
            $this->registerInboundPeer($peer);
95 3
        });
96
97 3
        return $this;
98
    }
99
100
    /**
101
     * @param NetworkAddressInterface $address
102
     * @return \React\Promise\PromiseInterface
103
     * @throws \Exception
104 3
     */
105
    public function connect(NetworkAddressInterface $address)
106 3
    {
107
        return $this->connector->connect($address);
108
    }
109
110 3
    /**
111 2
     * @param Locator $locator
112
     * @return \React\Promise\Promise|\React\Promise\PromiseInterface
113
     */
114
    public function getAnotherPeer(Locator $locator)
115
    {
116
        $deferred = new Deferred();
117
118
        // Otherwise, rely on the Locator.
119
        try {
120 3
            $deferred->resolve($locator->popAddress());
121 3
        } catch (\Exception $e) {
122
            $locator->queryDnsSeeds()->then(
123 3
                function () use ($deferred, $locator) {
124
                    $deferred->resolve($locator->popAddress());
125 3
                },
126 3
                function ($error) use ($deferred) {
127 1
                    return new RejectedPromise($error);
128 2
                }
129 1
            );
130 2
        }
131 3
132 1
        return $deferred->promise();
133 3
    }
134
135
    /**
136
     * @param Locator $locator
137
     * @return \React\Promise\Promise|\React\Promise\PromiseInterface
138
     */
139
    public function attemptNextPeer(Locator $locator)
140
    {
141 3
        $attempt = new Deferred();
142
143 3
        $this
144 3
            ->getAnotherPeer($locator)
145 3
            ->then(function (NetworkAddressInterface $address) use ($attempt, $locator) {
146 2
                return $this->connect($address)->then(
147
                    function (Peer $peer) use ($attempt) {
148 3
                        $this->registerOutboundPeer($peer);
149
                        $attempt->resolve($peer);
150
                        return $peer;
151
                    },
152
                    function (\Exception $error) use ($attempt) {
153
                        $attempt->reject($error);
154
                    }
155
                );
156
            }, function ($error) use ($attempt) {
157
                $attempt->reject($error);
158
            });
159
160
        return $attempt->promise();
161
    }
162
163
    /**
164
     * @param Locator $locator
165
     * @param int $retries
166
     * @return \React\Promise\PromiseInterface
167
     */
168
    public function connectNextPeer(Locator $locator, $retries = null)
169
    {
170
        if ($retries === null) {
171
            $retries = $this->settings->getMaxConnectRetries();
172
        }
173
174
        if (!(is_integer($retries) && $retries >= 0)) {
175
            throw new \InvalidArgumentException("Invalid retry count, must be an integer greater than zero");
176
        }
177
178
        $errorBack = function ($error) use ($locator, $retries) {
179
            $allowContinue = false;
180
            if ($error instanceof \RuntimeException) {
181
                if ($error->getMessage() === "Connection refused") {
182
                    $allowContinue = true;
183
                }
184
            }
185
186
            if ($error instanceof TimeoutException) {
187
                $allowContinue = true;
188
            }
189
190
            if (!$allowContinue) {
191
                throw $error;
192
            }
193
194
            if (0 >= $retries) {
195
                throw new \RuntimeException("Connection to peers failed: too many attempts");
196
            }
197
198
            return $this->connectNextPeer($locator, $retries - 1);
199
        };
200
201
        return $this
202
            ->attemptNextPeer($locator)
203
            ->then(null, $errorBack);
204
    }
205
206
    /**
207
     * @param Locator $locator
208
     * @param int $n
209
     * @return \React\Promise\Promise
210
     */
211
    public function connectToPeers(Locator $locator, $n)
212
    {
213
        $peers = [];
214
        for ($i = 0; $i < $n; $i++) {
215
            $peers[$i] = $this->connectNextPeer($locator);
216
        }
217
218
        return \React\Promise\all($peers);
219
    }
220
}
221