Passed
Push — main ( 4dacf5...aae69a )
by Jochen
02:11
created

syslog2irc.processor.Processor.__init__()   A

Complexity

Conditions 2

Size

Total Lines 14
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 5
CRAP Score 2.0185

Importance

Changes 0
Metric Value
eloc 11
dl 0
loc 14
ccs 5
cts 6
cp 0.8333
rs 9.85
c 0
b 0
f 0
cc 2
nop 3
crap 2.0185
1
"""
2
syslog2irc.processor
3
~~~~~~~~~~~~~~~~~~~~
4
5
:Copyright: 2007-2021 Jochen Kupperschmidt
6
:License: MIT, see LICENSE for details.
7
"""
8
9 1
import logging
10 1
from time import sleep
11 1
from typing import Any, Callable, Iterator, Optional, Set, Tuple
12
13 1
from syslogmp import Message as SyslogMessage
14
15 1
from .network import Port
16 1
from .router import Router
17 1
from .signals import (
18
    irc_channel_joined,
19
    message_approved,
20
    message_received,
21
    syslog_message_received,
22
)
23
24
25 1
MESSAGE_TEXT_ENCODING = 'utf-8'
26
27
28 1
logger = logging.getLogger(__name__)
29
30
31 1
class Processor:
32 1
    def __init__(
33
        self,
34
        router: Router,
35
        syslog_message_formatter: Optional[
36
            Callable[[SyslogMessage], str]
37
        ] = None,
38
    ) -> None:
39 1
        super(Processor, self).__init__()
40 1
        self.router = router
41
42 1
        if syslog_message_formatter is not None:
43
            self.syslog_message_formatter = syslog_message_formatter
44
        else:
45 1
            self.syslog_message_formatter = format_syslog_message
46
47 1
    def connect_to_signals(self) -> None:
48 1
        irc_channel_joined.connect(self.router.enable_channel)
49 1
        syslog_message_received.connect(self.handle_syslog_message)
50 1
        message_received.connect(self.handle_message)
51
52 1
    def handle_syslog_message(
53
        self,
54
        port: Port,
55
        source_address: Optional[Tuple[str, int]] = None,
56
        message: Optional[SyslogMessage] = None,
57
    ) -> None:
58
        """Process an incoming syslog message."""
59
        channel_names = self.router.get_channel_names_for_port(port)
60
61
        formatted_source = f'{source_address[0]}:{source_address[1]:d}'
62
        formatted_message = self.syslog_message_formatter(message)
63
        text = f'{formatted_source} {formatted_message}'
64
65
        message_received.send(
66
            channel_names=channel_names,
67
            text=text,
68
            source_address=source_address,
69
        )
70
71 1
    def handle_message(
72
        self,
73
        sender: Any,
74
        *,
75
        channel_names: Optional[Set[str]] = None,
76
        text: Optional[str] = None,
77
        source_address: Optional[Tuple[str, int]] = None,
78
    ) -> None:
79
        """Process an incoming message."""
80
        for channel_name in channel_names:
81
            if self.router.is_channel_enabled(channel_name):
82
                message_approved.send(channel_name=channel_name, text=text)
83
84 1
    def run(self, seconds_to_sleep: float = 0.5) -> None:
85
        """Run the main loop."""
86
        try:
87
            while True:
88
                sleep(seconds_to_sleep)
89
        except KeyboardInterrupt:
90
            pass
91
92
        logger.info('Shutting down ...')
93
94
95 1
def format_syslog_message(message: SyslogMessage) -> str:
96
    """Format a syslog message to be displayed on IRC."""
97
98 1
    def _generate() -> Iterator[str]:
99 1
        if message.timestamp is not None:
100 1
            timestamp_format = '%Y-%m-%d %H:%M:%S'
101 1
            formatted_timestamp = message.timestamp.strftime(timestamp_format)
102 1
            yield f'[{formatted_timestamp}] '
103
104 1
        if message.hostname is not None:
105 1
            yield f'({message.hostname}) '
106
107 1
        severity_name = message.severity.name
108
109
        # Important: The message text is a byte string.
110 1
        message_text = message.message.decode(MESSAGE_TEXT_ENCODING)
111
112
        # Remove leading and trailing newlines. Those would result in
113
        # additional lines on IRC with the usual metadata but with an
114
        # empty message text.
115 1
        message_text = message_text.strip('\n')
116
117 1
        yield f'[{severity_name}]: {message_text}'
118
119
    return ''.join(_generate())
120