LightningdMessageWorker   A
last analyzed

Complexity

Total Complexity 11

Size/Duplication

Total Lines 100
Duplicated Lines 0 %

Test Coverage

Coverage 0%

Importance

Changes 1
Bugs 0 Features 0
Metric Value
eloc 53
c 1
b 0
f 0
dl 0
loc 100
ccs 0
cts 40
cp 0
rs 10
wmc 11

6 Methods

Rating   Name   Duplication   Size   Complexity  
A run() 0 16 2
A __construct() 0 10 1
A createSendpaySuccessMessage() 0 10 1
A createMessage() 0 14 3
A execute() 0 14 3
A createInvoicePaymentMessage() 0 10 1
1
<?php declare(strict_types=1);
2
/**
3
 * This file is part of the ngutech/lightningd-adapter project.
4
 *
5
 * For the full copyright and license information, please view the LICENSE
6
 * file that was distributed with this source code.
7
 */
8
9
namespace NGUtech\Lightningd\Message;
10
11
use Daikon\AsyncJob\Worker\WorkerInterface;
12
use Daikon\Boot\Service\Provisioner\MessageBusProvisioner;
13
use Daikon\Interop\Assertion;
14
use Daikon\Interop\RuntimeException;
15
use Daikon\MessageBus\MessageBusInterface;
16
use Daikon\RabbitMq3\Connector\RabbitMq3Connector;
17
use NGUtech\Bitcoin\Service\SatoshiCurrencies;
18
use NGUtech\Lightning\Message\LightningMessageInterface;
19
use PhpAmqpLib\Channel\AMQPChannel;
20
use PhpAmqpLib\Message\AMQPMessage;
21
use Psr\Log\LoggerInterface;
22
23
/**
 * Worker that consumes lightningd notifications from a RabbitMQ queue and
 * republishes the recognised ones on the message bus events channel.
 *
 * Messages are dispatched on their AMQP routing key; routing keys that are
 * not listed in the MESSAGE_* constants are acknowledged and ignored.
 */
final class LightningdMessageWorker implements WorkerInterface
{
    private const MESSAGE_INVOICE_PAYMENT = 'lightningd.message.invoice_payment';

    private const MESSAGE_SENDPAY_SUCCESS = 'lightningd.message.sendpay_success';

    private RabbitMq3Connector $connector;

    private MessageBusInterface $messageBus;

    private LoggerInterface $logger;

    /** Worker settings as injected; not read anywhere in this class at present. */
    private array $settings;

    /**
     * @param RabbitMq3Connector $connector Provides the AMQP connection to consume from.
     * @param MessageBusInterface $messageBus Bus on which converted messages are published.
     * @param LoggerInterface $logger Receives an error entry for every message that fails.
     * @param array $settings Optional worker settings.
     */
    public function __construct(
        RabbitMq3Connector $connector,
        MessageBusInterface $messageBus,
        LoggerInterface $logger,
        array $settings = []
    ) {
        $this->connector = $connector;
        $this->messageBus = $messageBus;
        $this->logger = $logger;
        $this->settings = $settings;
    }

    /**
     * Starts consuming the queue named in $parameters['queue'] and blocks
     * for as long as the channel has registered consumer callbacks.
     *
     * @param array $parameters Must contain a non-blank 'queue' entry.
     */
    public function run(array $parameters = []): void
    {
        // Fall back to null so that a missing key is rejected by the assertion
        // below instead of raising an undefined index notice first.
        $queue = $parameters['queue'] ?? null;
        Assertion::notBlank($queue);

        $messageHandler = function (AMQPMessage $amqpMessage): void {
            $this->execute($amqpMessage);
        };

        /** @var AMQPChannel $channel */
        $channel = $this->connector->getConnection()->channel();
        // Prefetch count of 1: the broker hands over one unacknowledged message at a time.
        $channel->basic_qos(0, 1, false);
        // Positional flags per php-amqplib: no_local=true, no_ack=false (manual ack/nack
        // happens in execute()), exclusive=false, nowait=false.
        $channel->basic_consume($queue, '', true, false, false, false, $messageHandler);

        while (count($channel->callbacks)) {
            $channel->wait();
        }
    }

    /**
     * Converts a single AMQP delivery, publishes the result on the events
     * channel and acknowledges the delivery.
     *
     * A RuntimeException raised along the way is logged and the delivery is
     * negatively acknowledged (nack() is called with its default arguments).
     */
    private function execute(AMQPMessage $amqpMessage): void
    {
        try {
            $message = $this->createMessage($amqpMessage);
            if ($message instanceof LightningMessageInterface) {
                $this->messageBus->publish($message, MessageBusProvisioner::EVENTS_CHANNEL);
            }
            $amqpMessage->ack();
        } catch (RuntimeException $error) {
            // Pass the exception object itself (PSR-3 'exception' context key) so the
            // logger keeps the message and class, not only the stack trace frames.
            $this->logger->error(
                "Error handling lightningd message '{$amqpMessage->getRoutingKey()}'.",
                ['exception' => $error]
            );
            $amqpMessage->nack();
        }
    }

    /**
     * Maps the delivery onto a lightning message based on its routing key.
     *
     * @return LightningMessageInterface|null Null when the routing key is not handled.
     *
     * @throws RuntimeException When the payload of a handled routing key is malformed.
     */
    private function createMessage(AMQPMessage $amqpMessage): ?LightningMessageInterface
    {
        switch ($amqpMessage->getRoutingKey()) {
            case self::MESSAGE_INVOICE_PAYMENT:
                return $this->createInvoicePaymentMessage($amqpMessage);
            case self::MESSAGE_SENDPAY_SUCCESS:
                return $this->createSendpaySuccessMessage($amqpMessage);
            default:
                // ignore unknown routing keys
                return null;
        }
    }

    /**
     * Decodes the JSON body of the delivery and returns the object stored under $key.
     *
     * @throws RuntimeException When the body is not a JSON object or $key does not hold one.
     */
    private function decodePayload(AMQPMessage $amqpMessage, string $key): array
    {
        $payload = json_decode($amqpMessage->body, true);
        if (!is_array($payload) || !is_array($payload[$key] ?? null)) {
            throw new RuntimeException("Malformed lightningd message payload, missing '{$key}' object.");
        }

        return $payload[$key];
    }

    /**
     * Builds the invoice settled message from an 'invoice_payment' notification.
     *
     * The preimage hash is the SHA-256 digest of the binary preimage.
     *
     * @throws RuntimeException When the payload, the preimage or the amount is malformed.
     */
    private function createInvoicePaymentMessage(AMQPMessage $amqpMessage): LightningdInvoiceSettled
    {
        $invoice = $this->decodePayload($amqpMessage, 'invoice_payment');

        $preimage = $invoice['preimage'] ?? null;
        // An even-length hex string is exactly what hex2bin() accepts; checking up front
        // avoids a TypeError from hash() when hex2bin() would return false.
        if (!is_string($preimage) || preg_match('/\A(?:[0-9a-fA-F]{2})+\z/', $preimage) !== 1) {
            throw new RuntimeException("Malformed lightningd invoice payment, invalid 'preimage'.");
        }

        $amountPaid = $invoice['msat'] ?? null;
        if (!is_string($amountPaid)) {
            throw new RuntimeException("Malformed lightningd invoice payment, invalid 'msat'.");
        }

        return LightningdInvoiceSettled::fromNative([
            'preimageHash' => hash('sha256', hex2bin($preimage)),
            'preimage' => $preimage,
            'amountPaid' => strtoupper($amountPaid),
            'label' => $invoice['label'] ?? null,
            'timestamp' => (string)$amqpMessage->get('timestamp')
        ]);
    }

    /**
     * Builds the payment succeeded message from a 'sendpay_success' notification.
     *
     * Amounts are taken from the 'msatoshi' fields and suffixed with the MSAT currency code.
     *
     * @throws RuntimeException When the payload is malformed.
     */
    private function createSendpaySuccessMessage(AMQPMessage $amqpMessage): LightningdPaymentSucceeded
    {
        $payment = $this->decodePayload($amqpMessage, 'sendpay_success');

        return LightningdPaymentSucceeded::fromNative([
            'preimage' => $payment['payment_preimage'],
            'preimageHash' => $payment['payment_hash'],
            'amount' => $payment['msatoshi'].SatoshiCurrencies::MSAT,
            'amountPaid' => $payment['msatoshi_sent'].SatoshiCurrencies::MSAT,
            'timestamp' => (string)$amqpMessage->get('timestamp')
        ]);
    }
}
126