LndMessageWorker::execute()   A
last analyzed

Complexity

Conditions 3
Paths 6

Size

Total Lines 14
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 12

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 3
eloc 10
c 2
b 0
f 0
nc 6
nop 1
dl 0
loc 14
ccs 0
cts 9
cp 0
crap 12
rs 9.9332
1
<?php declare(strict_types=1);
2
/**
3
 * This file is part of the ngutech/lnd-adapter project.
4
 *
5
 * For the full copyright and license information, please view the LICENSE
6
 * file that was distributed with this source code.
7
 */
8
9
namespace NGUtech\Lnd\Message;
10
11
use Daikon\AsyncJob\Worker\WorkerInterface;
12
use Daikon\Boot\Service\Provisioner\MessageBusProvisioner;
13
use Daikon\Interop\Assertion;
14
use Daikon\Interop\RuntimeException;
15
use Daikon\MessageBus\MessageBusInterface;
16
use Daikon\RabbitMq3\Connector\RabbitMq3Connector;
17
use Daikon\ValueObject\Timestamp;
18
use NGUtech\Bitcoin\Service\SatoshiCurrencies;
19
use NGUtech\Lightning\Message\LightningInvoiceMessageInterface;
20
use NGUtech\Lightning\Message\LightningMessageInterface;
21
use PhpAmqpLib\Channel\AMQPChannel;
22
use PhpAmqpLib\Message\AMQPMessage;
23
use Psr\Log\LoggerInterface;
24
25
/**
 * Consumes LND messages from a RabbitMQ queue and republishes them on the
 * message bus events channel as lightning messages.
 *
 * Deliveries are dispatched by routing key: invoice payloads become one of the
 * LndInvoice* messages, HTLC event payloads become one of the LndHtlc*
 * messages, and any other routing key is acknowledged without publishing.
 */
final class LndMessageWorker implements WorkerInterface
{
    /** Routing keys handled by this worker. */
    private const MESSAGE_INVOICE = 'lnd.message.invoice';
    private const MESSAGE_HTLC_EVENT = 'lnd.message.htlc_event';

    /**
     * Values of the "state" field of an invoice payload.
     * Presumably these mirror LND's invoice state enum; verify against the publisher.
     */
    private const STATE_INVOICE_OPEN = 0;
    private const STATE_INVOICE_SETTLED = 1;
    private const STATE_INVOICE_CANCELLED = 2;
    private const STATE_INVOICE_ACCEPTED = 3;

    /**
     * Values of the "event" field of an HTLC event payload.
     * Presumably assigned by the publisher; verify against it.
     */
    private const EVENT_HTLC_FORWARD = 7;
    private const EVENT_HTLC_FORWARD_FAIL = 8;
    private const EVENT_HTLC_SETTLE = 9;
    private const EVENT_HTLC_LINK_FAIL = 10;

    private RabbitMq3Connector $connector;

    private MessageBusInterface $messageBus;

    private LoggerInterface $logger;

    private array $settings;

    public function __construct(
        RabbitMq3Connector $connector,
        MessageBusInterface $messageBus,
        LoggerInterface $logger,
        array $settings = []
    ) {
        $this->connector = $connector;
        $this->messageBus = $messageBus;
        $this->logger = $logger;
        $this->settings = $settings;
    }

    /**
     * Starts consuming the given queue and blocks while the channel has
     * registered consumer callbacks.
     *
     * @param array $parameters Must contain a non-blank "queue" entry naming the queue to consume.
     */
    public function run(array $parameters = []): void
    {
        // Default to null so a missing key is reported by the assertion
        // instead of raising an "undefined index" notice first.
        $queue = $parameters['queue'] ?? null;
        Assertion::notBlank($queue);

        $messageHandler = function (AMQPMessage $amqpMessage): void {
            $this->execute($amqpMessage);
        };

        /** @var AMQPChannel $channel */
        $channel = $this->connector->getConnection()->channel();
        // Prefetch a single delivery at a time; deliveries are acked manually in execute().
        $channel->basic_qos(0, 1, false);
        $channel->basic_consume($queue, '', true, false, false, false, $messageHandler);

        while (count($channel->callbacks)) {
            $channel->wait();
        }
    }

    /**
     * Handles one delivery: builds the lightning message, publishes it and
     * acks the delivery. A RuntimeException raised along the way is logged
     * and the delivery is nacked instead.
     */
    private function execute(AMQPMessage $amqpMessage): void
    {
        try {
            $message = $this->createMessage($amqpMessage);
            if ($message instanceof LightningMessageInterface) {
                $this->messageBus->publish($message, MessageBusProvisioner::EVENTS_CHANNEL);
            }
            $amqpMessage->ack();
        } catch (RuntimeException $error) {
            // Hand the exception itself to the logger (PSR-3 "exception" context
            // key) so that its message and class are recorded along with the trace.
            $this->logger->error(
                "Error handling lnd message '{$amqpMessage->getRoutingKey()}'.",
                ['exception' => $error]
            );
            $amqpMessage->nack();
        }
    }

    /**
     * Maps a delivery to a lightning message based on its routing key.
     *
     * @return LightningMessageInterface|null Null when the routing key is not handled.
     */
    private function createMessage(AMQPMessage $amqpMessage): ?LightningMessageInterface
    {
        switch ($amqpMessage->getRoutingKey()) {
            case self::MESSAGE_INVOICE:
                return $this->createInvoiceMessage($amqpMessage);
            case self::MESSAGE_HTLC_EVENT:
                return $this->createHtlcMessage($amqpMessage);
            default:
                // ignore unknown routing keys
                return null;
        }
    }

    /**
     * Builds the invoice message matching the "state" of the JSON payload.
     *
     * @throws RuntimeException When the payload cannot be decoded or carries an unhandled state.
     */
    private function createInvoiceMessage(AMQPMessage $amqpMessage): LightningInvoiceMessageInterface
    {
        $invoice = $this->decodeBody($amqpMessage);

        // Compare strictly against a normalised integer: a loose switch would
        // let a missing/null state match STATE_INVOICE_OPEN (null == 0).
        $state = $this->readCode($invoice, 'state');
        if ($state === self::STATE_INVOICE_OPEN) {
            $messageFqcn = LndInvoiceRequested::class;
            $timestamp = Timestamp::fromTime($invoice['creationDate']);
        } elseif ($state === self::STATE_INVOICE_SETTLED) {
            $messageFqcn = LndInvoiceSettled::class;
            $timestamp = Timestamp::fromTime($invoice['settleDate']);
        } elseif ($state === self::STATE_INVOICE_CANCELLED) {
            $messageFqcn = LndInvoiceCancelled::class;
            // NOTE(review): the AMQP "timestamp" property is used as-is here, while the
            // branches above go through Timestamp::fromTime - confirm fromNative accepts it.
            $timestamp = $amqpMessage->get('timestamp');
        } elseif ($state === self::STATE_INVOICE_ACCEPTED) {
            $messageFqcn = LndInvoiceAccepted::class;
            $timestamp = $amqpMessage->get('timestamp');
        } else {
            throw new RuntimeException(
                "Unhandled LND invoice state '".$this->describe($invoice['state'] ?? null)."'."
            );
        }

        return $messageFqcn::fromNative([
            'preimageHash' => $invoice['rHash'],
            'preimage' => $invoice['rPreimage'] ?? null,
            'request' => $invoice['paymentRequest'],
            'amount' => $invoice['valueMsat'].SatoshiCurrencies::MSAT,
            'amountPaid' => $invoice['amtPaidMsat'].SatoshiCurrencies::MSAT,
            'timestamp' => (string)$timestamp,
            'cltvExpiry' => $invoice['cltvExpiry']
        ]);
    }

    /**
     * Builds the HTLC message matching the "event" of the JSON payload.
     *
     * @throws RuntimeException When the payload cannot be decoded, carries an unhandled
     *                          event or has an unusable "timestampNs".
     */
    private function createHtlcMessage(AMQPMessage $amqpMessage): LightningMessageInterface
    {
        $event = $this->decodeBody($amqpMessage);

        $messageClasses = [
            self::EVENT_HTLC_FORWARD => LndHtlcForwarded::class,
            self::EVENT_HTLC_FORWARD_FAIL => LndHtlcForwardFailed::class,
            self::EVENT_HTLC_SETTLE => LndHtlcSettled::class,
            self::EVENT_HTLC_LINK_FAIL => LndHtlcLinkFailed::class,
        ];

        $type = $this->readCode($event, 'event');
        if ($type === null || !isset($messageClasses[$type])) {
            throw new RuntimeException("Unhandled LND event '".$this->describe($event['event'] ?? null)."'.");
        }
        $messageFqcn = $messageClasses[$type];

        // The value may be decoded as an integer; substr() under strict_types
        // only accepts strings, so normalise before slicing.
        $timestampNs = $event['timestampNs'] ?? null;
        if (is_int($timestampNs)) {
            $timestampNs = (string)$timestampNs;
        }
        // More than three digits are required so that the trailing three can be dropped.
        if (!is_string($timestampNs) || preg_match('/^\d{4,}$/D', $timestampNs) !== 1) {
            throw new RuntimeException("Invalid LND event timestamp '".$this->describe($timestampNs)."'.");
        }

        return $messageFqcn::fromNative([
            'incomingChannelId' => $event['incomingChannelId'],
            'outgoingChannelId' => $event['outgoingChannelId'],
            'incomingHtlcId' => $event['incomingHtlcId'],
            'outgoingHtlcId' => $event['outgoingHtlcId'],
            // Drop the last three digits, then insert a '.' before the last six remaining digits.
            'timestamp' => substr_replace(substr($timestampNs, 0, -3), '.', -6, 0),
            'eventType' => $event['eventType']
        ]);
    }

    /**
     * Decodes the JSON body of a delivery into an associative array.
     *
     * @throws RuntimeException When the body does not decode to an array.
     */
    private function decodeBody(AMQPMessage $amqpMessage): array
    {
        $payload = json_decode($amqpMessage->body, true);
        if (!is_array($payload)) {
            throw new RuntimeException(
                "Payload of lnd message '{$amqpMessage->getRoutingKey()}' is not a JSON object or array."
            );
        }

        return $payload;
    }

    /**
     * Reads an integer code from the payload, accepting integers and
     * digit-only strings.
     *
     * @return int|null Null when the key is absent or the value is not an integer code.
     */
    private function readCode(array $payload, string $key): ?int
    {
        $value = $payload[$key] ?? null;
        if (is_int($value)) {
            return $value;
        }
        if (is_string($value) && preg_match('/^\d+$/D', $value) === 1) {
            return (int)$value;
        }

        return null;
    }

    /**
     * Renders a payload value for use in an error message.
     *
     * @param mixed $value
     */
    private function describe($value): string
    {
        return is_scalar($value) ? (string)$value : gettype($value);
    }
}
177