Passed
Push — main ( ce0359...5ac02b )
by Jochen
01:56
created

syslog2irc.main.Processor.run()   A

Complexity

Conditions 3

Size

Total Lines 14
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 9.762

Importance

Changes 0
Metric Value
cc 3
eloc 11
nop 1
dl 0
loc 14
ccs 1
cts 11
cp 0.0909
crap 9.762
rs 9.85
c 0
b 0
f 0
1
"""
2
syslog2irc.main
3
~~~~~~~~~~~~~~~
4
5
Orchestration, application entry point
6
7
:Copyright: 2007-2021 Jochen Kupperschmidt
8
:License: MIT, see LICENSE for details.
9
"""
10
11 1
import logging
12 1
from queue import SimpleQueue
13 1
from typing import Callable, Optional, Tuple
14
15 1
from syslogmp import Message as SyslogMessage
16
17 1
from .cli import parse_args
18 1
from .config import Config, load_config
19 1
from .formatting import format_message
20 1
from .irc import create_bot
21 1
from .network import Port
22 1
from .routing import Router
23 1
from .signals import irc_channel_joined, syslog_message_received
24 1
from .syslog import start_syslog_message_receivers
25 1
from .util import configure_logging
26
27
28 1
# Module-level logger, named after this module via `__name__`.
logger = logging.getLogger(__name__)
29
30
31 1
# Signature of a message formatter: takes the `(str, int)` source address
# tuple and the syslog message, and returns the text to announce.
FormatMessageCallable = Callable[[Tuple[str, int], SyslogMessage], str]
32
33
34
# A note on threads (implementation detail):
#
# This application uses threads. Besides the main thread there is one
# thread for *each* syslog message receiver (which itself is a threading
# server!) and one thread for the (actual) IRC bot. (The dummy bot does
# not run in a separate thread.)

# Those threads are configured to be daemon threads. A Python
# application exits if no more non-daemon threads are running.
#
# For details, consult the documentation on the `threading` module that
# is part of Python's standard library.
46
47
48 1
class Processor:
    """Receive syslog messages and announce them on IRC channels.

    Incoming syslog messages are delivered through the
    ``syslog_message_received`` signal and put on an internal queue.
    :meth:`run` takes them off the queue one by one and says the
    formatted text on every enabled channel the router returns for the
    port the message was received on.
    """

    def __init__(
        self,
        config: Config,
        *,
        custom_format_message: Optional[FormatMessageCallable] = None,
    ) -> None:
        """Create bot, router, and message queue; connect to signals.

        :param config: Application configuration; its ``irc`` and
            ``routes`` attributes are read.
        :param custom_format_message: Callable to use instead of the
            default ``format_message`` to turn a source address and a
            syslog message into the text to announce.
        """
        self.irc_bot = create_bot(config.irc)
        self.syslog_ports = {route.syslog_port for route in config.routes}
        self.router = Router(config.routes)
        self.message_queue: SimpleQueue = SimpleQueue()

        # Fall back to the default formatter unless one was passed in.
        self.format_message = (
            custom_format_message
            if custom_format_message is not None
            else format_message
        )

        # Up to this point, no signals must have been sent.
        self.connect_to_signals()
        # Signals are allowed to be sent from here on.

    def connect_to_signals(self) -> None:
        """Register this processor's handlers with the signals."""
        irc_channel_joined.connect(self.router.enable_channel)
        syslog_message_received.connect(self.handle_syslog_message)

    def handle_syslog_message(
        self,
        port: Port,
        *,
        source_address: Optional[Tuple[str, int]] = None,
        message: Optional[SyslogMessage] = None,
    ) -> None:
        """Process an incoming syslog message.

        The message is only put on the queue here, as a
        ``(port, source_address, message)`` tuple; announcing it is
        left to the main loop in :meth:`run`.
        """
        self.message_queue.put((port, source_address, message))

    def announce_message(
        self,
        port: Port,
        source_address: Tuple[str, int],
        message: SyslogMessage,
    ) -> None:
        """Announce message on IRC.

        The text is formatted once and then said on each channel that
        is routed to from ``port`` and that the router reports as
        enabled; other channels are skipped.
        """
        channel_names = self.router.get_channel_names_for_port(port)
        text = self.format_message(source_address, message)

        for channel_name in channel_names:
            if self.router.is_channel_enabled(channel_name):
                self.irc_bot.say(channel_name, text)

    def run(self) -> None:
        """Start network-based components, run main loop.

        Blocks while announcing queued messages until a
        ``KeyboardInterrupt`` is raised, then disconnects the IRC bot.
        """
        self.irc_bot.start()
        start_syslog_message_receivers(self.syslog_ports)

        try:
            while True:
                port, source_address, message = self.message_queue.get()
                self.announce_message(port, source_address, message)
        except KeyboardInterrupt:
            # Deliberately swallowed: an interrupt ends the main loop
            # and leads to the regular shutdown below.
            pass

        logger.info('Shutting down ...')
        self.irc_bot.disconnect('Bye.')  # Joins bot thread.
111
112
113 1
def main(
    *, custom_format_message: Optional[FormatMessageCallable] = None
) -> None:
    """Parse arguments, load configuration, and start the application.

    :param custom_format_message: Optional callable that is passed on
        to :class:`Processor` to format messages instead of the default
        formatter.
    """
    args = parse_args()
    config = load_config(args.config_filename)
    configure_logging(config.log_level)

    processor = Processor(config, custom_format_message=custom_format_message)
    processor.run()
123
124
125 1
# Run the application only when executed as a script, not on import.
if __name__ == '__main__':
    main()
127