Issues (2)

src/Message/BitcoindMessageWorker.php (1 issue)

Labels
Severity
1
<?php declare(strict_types=1);
2
/**
3
 * This file is part of the ngutech/bitcoind-adapter project.
4
 *
5
 * For the full copyright and license information, please view the LICENSE
6
 * file that was distributed with this source code.
7
 */
8
9
namespace NGUtech\Bitcoind\Message;
10
11
use BitWasp\Buffertools\Buffer;
12
use Daikon\AsyncJob\Worker\WorkerInterface;
13
use Daikon\Boot\Service\Provisioner\MessageBusProvisioner;
14
use Daikon\Interop\Assertion;
15
use Daikon\Interop\RuntimeException;
16
use Daikon\MessageBus\MessageBusInterface;
17
use Daikon\RabbitMq3\Connector\RabbitMq3Connector;
18
use Daikon\ValueObject\Timestamp;
19
use NGUtech\Bitcoin\Message\BitcoinBlockHashReceived;
20
use NGUtech\Bitcoin\Message\BitcoinMessageInterface;
21
use NGUtech\Bitcoin\Message\BitcoinTransactionHashReceived;
22
use PhpAmqpLib\Channel\AMQPChannel;
23
use PhpAmqpLib\Message\AMQPMessage;
24
use Psr\Log\LoggerInterface;
25
26
/**
 * Worker that consumes bitcoind notifications from a RabbitMQ queue and republishes the
 * ones with a known routing key on the message bus events channel.
 */
final class BitcoindMessageWorker implements WorkerInterface
{
    /** Routing key that is mapped to a BitcoinBlockHashReceived message. */
    private const MESSAGE_BLOCK_HASH = 'bitcoind.message.hashblock';

    /** Routing key that is mapped to a BitcoinTransactionHashReceived message. */
    private const MESSAGE_TRANSACTION_HASH = 'bitcoind.message.hashtx';

    private RabbitMq3Connector $connector;

    private MessageBusInterface $messageBus;

    private LoggerInterface $logger;

    private array $settings;

    /**
     * @param RabbitMq3Connector  $connector  Provides the AMQP connection the worker consumes from.
     * @param MessageBusInterface $messageBus Bus that converted messages are published to.
     * @param LoggerInterface     $logger     Receives an error entry when handling a message fails.
     * @param array               $settings   Stored on the worker; not read by the methods of this class.
     */
    public function __construct(
        RabbitMq3Connector $connector,
        MessageBusInterface $messageBus,
        LoggerInterface $logger,
        array $settings = []
    ) {
        $this->connector = $connector;
        $this->messageBus = $messageBus;
        $this->logger = $logger;
        $this->settings = $settings;
    }

    /**
     * Consumes the given queue and blocks for as long as the channel has registered callbacks.
     *
     * @param array $parameters Must contain a non-blank 'queue' entry naming the queue to consume.
     */
    public function run(array $parameters = []): void
    {
        // Fall back to null so that a missing key is rejected by the assertion below
        // instead of first raising an "undefined array key" warning.
        $queue = $parameters['queue'] ?? null;
        Assertion::notBlank($queue);

        $messageHandler = function (AMQPMessage $amqpMessage): void {
            $this->execute($amqpMessage);
        };

        /** @var AMQPChannel $channel */
        $channel = $this->connector->getConnection()->channel();
        // Prefetch a single message at a time (prefetch size 0, prefetch count 1, not global).
        $channel->basic_qos(0, 1, false);
        // Positional flags: empty consumer tag, no_local = true, no_ack = false (messages are
        // acknowledged explicitly in execute()), not exclusive, nowait = false.
        $channel->basic_consume($queue, '', true, false, false, false, $messageHandler);

        while (count($channel->callbacks)) {
            $channel->wait();
        }
    }

    /**
     * Handles one delivery: converts it, publishes recognised messages and acknowledges it.
     *
     * Deliveries with an unknown routing key are acknowledged without being published. When a
     * Daikon RuntimeException is thrown the error is logged and the delivery is nack'ed with
     * nack()'s default arguments.
     */
    private function execute(AMQPMessage $amqpMessage): void
    {
        try {
            $message = $this->createMessage($amqpMessage);
            if ($message instanceof BitcoinMessageInterface) {
                $this->messageBus->publish($message, MessageBusProvisioner::EVENTS_CHANNEL);
            }
            $amqpMessage->ack();
        } catch (RuntimeException $error) {
            // PSR-3 reserves the 'exception' context key for the exception object itself;
            // passing only the trace array would drop the error message and class.
            $this->logger->error(
                "Error handling bitcoind message '{$amqpMessage->getRoutingKey()}'.",
                ['exception' => $error]
            );
            $amqpMessage->nack();
        }
    }

    /**
     * Builds the domain message that corresponds to the routing key of the given delivery.
     *
     * The payload holds the hex representation of the message body as 'hash' and the string
     * form of the message's 'timestamp' property as 'receivedAt'. It is built before the routing
     * key is inspected, so it is also evaluated for deliveries that end up being ignored.
     *
     * NOTE(review): the 'timestamp' property is read unconditionally; confirm that every
     * producer sets it.
     *
     * @return BitcoinMessageInterface|null The converted message, or null for an unknown routing key.
     */
    private function createMessage(AMQPMessage $amqpMessage): ?BitcoinMessageInterface
    {
        $payload = [
            'hash' => (new Buffer($amqpMessage->body))->getHex(),
            'receivedAt' => (string)Timestamp::fromTime(
                /** @scrutinizer ignore-type */ $amqpMessage->get('timestamp')
            ),
        ];

        switch ($amqpMessage->getRoutingKey()) {
            case self::MESSAGE_TRANSACTION_HASH:
                return BitcoinTransactionHashReceived::fromNative($payload);
            case self::MESSAGE_BLOCK_HASH:
                return BitcoinBlockHashReceived::fromNative($payload);
            default:
                // ignore unknown routing keys
                return null;
        }
    }
}
108