Passed
Push — main ( 287fd9...ba5fce )
by Jochen
30:44
created

weitersager.processor.Processor.announce_message()   A

Complexity

Conditions 2

Size

Total Lines 10
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 3
CRAP Score 2.2559

Importance

Changes 0
Metric Value
cc 2
eloc 7
nop 3
dl 0
loc 10
ccs 3
cts 5
cp 0.6
crap 2.2559
rs 10
c 0
b 0
f 0
1
"""
2
weitersager.processor
3
~~~~~~~~~~~~~~~~~~~~~
4
5
Connect HTTP server and IRC bot.
6
7
:Copyright: 2007-2021 Jochen Kupperschmidt
8
:License: MIT, see LICENSE for details.
9
"""
10
11 1
import logging
12 1
from queue import SimpleQueue
13 1
from typing import Any, Optional, Set, Tuple
14
15 1
from .config import Config
16 1
from .http import start_receive_server
17 1
from .irc import create_bot
18 1
from .signals import irc_channel_joined, message_received
19
20
21 1
logger = logging.getLogger(__name__)
22
23
24 1
class Processor:
    """Queue incoming messages and announce them on IRC.

    Messages delivered through the ``message_received`` signal are put
    on an internal queue. The main loop in :meth:`run` takes them off
    the queue one by one and hands them to the IRC bot, but only for
    channels that were reported through the ``irc_channel_joined``
    signal.
    """

    def __init__(self, config: Config) -> None:
        """Create the IRC bot and subscribe to the signals.

        :param config: application configuration; its ``irc`` part is
            used to create the bot, its ``http`` part is used later by
            :meth:`run`.
        """
        self.config = config
        self.irc_bot = create_bot(config.irc)
        # Names of the channels reported via `irc_channel_joined`.
        self.enabled_channel_names: Set[str] = set()
        # Holds `(channel_name, text)` tuples awaiting announcement.
        self.message_queue: SimpleQueue = SimpleQueue()

        # Up to this point, no signals must have been sent.
        self.connect_to_signals()
        # Signals are allowed to be sent from here on.

    def connect_to_signals(self) -> None:
        """Register the handlers for the signals this class consumes."""
        irc_channel_joined.connect(self.enable_channel)
        message_received.connect(self.handle_message)

    def enable_channel(self, sender, *, channel_name=None) -> None:
        """Remember that messages may be forwarded to the channel.

        :param sender: the signal sender (not used).
        :param channel_name: name of the channel to enable.
        """
        logger.info('Enabled forwarding to channel %s.', channel_name)
        self.enabled_channel_names.add(channel_name)

    def handle_message(
        self,
        sender: Optional[Any],
        *,
        channel_name: str,
        text: str,
        source_address: Tuple[str, int],
    ) -> None:
        """Log an incoming message and enqueue it for announcement.

        :param sender: the signal sender (not used).
        :param channel_name: channel the message is meant for.
        :param text: the message text.
        :param source_address: ``(host, port)`` pair the message was
            received from; only used for the log entry.
        """
        logger.debug(
            'Received message from %s:%d for channel %s with text "%s".',
            source_address[0],
            source_address[1],
            channel_name,
            text,
        )

        self.message_queue.put((channel_name, text))

    def announce_message(self, channel_name: str, text: str) -> None:
        """Announce the message on IRC.

        If the channel has not been enabled via :meth:`enable_channel`,
        a warning is logged and the message is dropped.
        """
        if channel_name not in self.enabled_channel_names:
            logger.warning(
                'Could not send message to channel %s, not joined.',
                channel_name,
            )
            return

        self.irc_bot.say(channel_name, text)

    def process_queue(self, timeout_seconds: Optional[int] = None) -> None:
        """Take one message from the queue and announce it.

        :param timeout_seconds: how long to wait for a message; `None`
            blocks until one is available.
        :raises queue.Empty: if no message arrived within the timeout.
        """
        channel_name, text = self.message_queue.get(timeout=timeout_seconds)
        self.announce_message(channel_name, text)

    def run(self) -> None:
        """Run the main loop until interrupted, then shut down the bot.

        A `KeyboardInterrupt` raised while processing the queue ends
        the loop silently. Any other exception is propagated to the
        caller, but only after the IRC bot has been disconnected.
        """
        self.irc_bot.start()

        # The bot has been started at this point, so from here on every
        # way out of this method has to disconnect it again. Without
        # the `finally`, a failure to start the receive server or an
        # error while announcing a message would skip the shutdown.
        try:
            start_receive_server(self.config.http)

            try:
                while True:
                    self.process_queue()
            except KeyboardInterrupt:
                pass
        finally:
            logger.info('Shutting down ...')
            self.irc_bot.disconnect('Bye.')  # Joins bot thread.
91
92
93 1
def start(config: Config) -> None:
    """Create a processor for the given configuration and run it.

    Running the processor starts the IRC bot and the HTTP receive
    server and then blocks in the processor's main loop.
    """
    Processor(config).run()
97