weitersager.processor.Processor.enable_channel()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 1

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 3
ccs 3
cts 3
cp 1
rs 10
c 0
b 0
f 0
cc 1
nop 4
crap 1
1
"""
2
weitersager.processor
3
~~~~~~~~~~~~~~~~~~~~~
4
5
Connect HTTP server and IRC bot.
6
7
:Copyright: 2007-2025 Jochen Kupperschmidt
8
:License: MIT, see LICENSE for details.
9
"""
10
11 1
from __future__ import annotations
12 1
import logging
13 1
from queue import SimpleQueue
14 1
from typing import Any
15
16 1
from .config import Config
17 1
from .http import start_receive_server
18 1
from .irc import create_announcer
19 1
from .signals import irc_channel_joined, message_received
20
21
22 1
logger = logging.getLogger(__name__)
23
24
25 1
class Processor:
    """Connect the HTTP receive server with the IRC announcer.

    Messages delivered through the ``message_received`` signal are put
    on an internal queue. The main loop takes them off the queue and
    hands them to the announcer, but only for channels that have been
    reported via the ``irc_channel_joined`` signal.
    """

    def __init__(self, config: Config) -> None:
        """Create the announcer and subscribe to the signals."""
        self.config = config
        self.announcer = create_announcer(config.irc)
        # Names of the channels for which a join has been signalled.
        self.enabled_channel_names: set[str] = set()
        # Pending ``(channel_name, text)`` pairs awaiting announcement.
        self.message_queue: SimpleQueue[tuple[str, str]] = SimpleQueue()

        # Up to this point, no signals must have been sent.
        self.connect_to_signals()
        # Signals are allowed to be sent from here on.

    def connect_to_signals(self) -> None:
        """Subscribe the handler methods to their signals."""
        irc_channel_joined.connect(self.enable_channel)
        message_received.connect(self.handle_message)

    def enable_channel(
        self, sender: Any | None, *, channel_name: str | None = None
    ) -> None:
        """Allow messages to be forwarded to the given channel.

        A signal that carries no channel name is ignored (with a
        warning) so that ``None`` never ends up among the enabled
        channel names.
        """
        if channel_name is None:
            logger.warning(
                'Ignoring channel join signal without channel name.'
            )
            return

        logger.info('Enabled forwarding to channel %s.', channel_name)
        self.enabled_channel_names.add(channel_name)

    def handle_message(
        self,
        sender: Any | None,
        *,
        channel_name: str,
        text: str,
        source_ip_address: str | None = None,
    ) -> None:
        """Log an incoming message and queue it for announcement."""
        logger.debug(
            'Received message from %s for channel %s with text "%s".',
            source_ip_address or 'unknown address',
            channel_name,
            text,
        )

        self.message_queue.put((channel_name, text))

    def announce_message(self, channel_name: str, text: str) -> None:
        """Announce message on IRC.

        The message is dropped (with a warning) if the channel has not
        been enabled via `enable_channel`.
        """
        if channel_name not in self.enabled_channel_names:
            logger.warning(
                'Could not send message to channel %s, not joined.',
                channel_name,
            )
            return

        self.announcer.announce(channel_name, text)

    def process_queue(self, timeout_seconds: int | None = None) -> None:
        """Process a message from the queue.

        Block until a message is available. If `timeout_seconds` is
        given and no message arrives in time, `queue.Empty` is raised.
        """
        channel_name, text = self.message_queue.get(timeout=timeout_seconds)
        self.announce_message(channel_name, text)

    def run(self) -> None:
        """Run the main loop.

        Start the announcer and the receive server, then process queued
        messages until interrupted from the keyboard. The announcer is
        shut down on the way out even if an unexpected error occurs.
        """
        self.announcer.start()
        try:
            start_receive_server(self.config.http)

            logger.info('Starting to process queue ...')
            while True:
                self.process_queue()
        except KeyboardInterrupt:
            # Regular way to stop; fall through to the shutdown below.
            pass
        finally:
            # NOTE(review): assumes `shutdown()` is safe to call whenever
            # `start()` has returned -- confirm against the announcer.
            logger.info('Shutting down ...')
            self.announcer.shutdown()
92
93
94 1
def start(config: Config) -> None:
    """Create a processor for the given configuration and run it."""
    Processor(config).run()
98