Test Failed
Push — main ( a03b0b...4dacf5 )
by Jochen
04:44
created

syslog2irc.syslog._handle_received_message()   A

Complexity

Conditions 1

Size

Total Lines 13
Code Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.512

Importance

Changes 0
Metric Value
cc 1
eloc 10
nop 3
dl 0
loc 13
rs 9.9
c 0
b 0
f 0
ccs 1
cts 5
cp 0.2
crap 1.512
1
"""
2
syslog2irc.syslog
3
~~~~~~~~~~~~~~~~~
4
5
BSD syslog message reception and handling
6
7
:Copyright: 2007-2021 Jochen Kupperschmidt
8
:License: MIT, see LICENSE for details.
9
"""
10
11 1
from functools import partial
12 1
import logging
13 1
from socketserver import (
14 1
    BaseRequestHandler,
15 1
    StreamRequestHandler,
16
    ThreadingTCPServer,
17 1
    ThreadingUDPServer,
18 1
)
19
import sys
20 1
from typing import Iterable, Tuple, Union
21 1
22
import syslogmp
23
from syslogmp import Message as SyslogMessage
24 1
25
from .network import format_port, Port, TransportProtocol
26
from .signals import syslog_message_received
27 1
from .util import start_thread
28
29
30 1
logger = logging.getLogger(__name__)
31 1
32 1
33
class TCPHandler(StreamRequestHandler):
34 1
    """Handler for syslog messages arriving via TCP."""
35 1
36 1
    def __init__(self, port: Port, *args, **kwargs) -> None:
37 1
        self.port = port
38
        super().__init__(*args, **kwargs)
39
40
    def handle(self) -> None:
41
        for line in self.rfile:
42
            try:
43
                message = syslogmp.parse(line)
44 1
            except ValueError:
45
                logger.info(
46
                    'Invalid message received from %s:%d.', *self.client_address
47
                )
48
                return None
49
50
            _handle_received_message(self.client_address, self.port, message)
51
52 1
53
class UDPHandler(BaseRequestHandler):
54
    """Handler for syslog messages arriving via UDP."""
55
56
    def __init__(self, port: Port, *args, **kwargs) -> None:
57 1
        self.port = port
58
        super().__init__(*args, **kwargs)
59
60
    def handle(self) -> None:
61
        try:
62
            data = self.request[0]
63
            message = syslogmp.parse(data)
64 1
        except ValueError:
65
            logger.info(
66
                'Invalid message received from %s:%d.', *self.client_address
67
            )
68
            return None
69
70
        _handle_received_message(self.client_address, self.port, message)
71
72
73
def _handle_received_message(
74
    client_address: Tuple[str, int], port: Port, message: SyslogMessage
75
) -> None:
76
    logger.debug(
77
        'Received message from %s:%d on port %s -> %s',
78
        client_address[0],
79
        client_address[1],
80
        format_port(port),
81
        format_message_for_log(message),
82
    )
83
84 1
    syslog_message_received.send(
85
        port, source_address=client_address, message=message
86
    )
87
88
89
def create_server(port: Port) -> Union[ThreadingTCPServer, ThreadingUDPServer]:
90 1
    """Create a threading server to receive syslog messages."""
91
    address = ('', port.number)
92 1
93
    if port.transport_protocol == TransportProtocol.TCP:
94
        tcp_handler_class = partial(TCPHandler, port)
95
        return ThreadingTCPServer(address, tcp_handler_class)
96
    elif port.transport_protocol == TransportProtocol.UDP:
97
        udp_handler_class = partial(UDPHandler, port)
98
        return ThreadingUDPServer(address, udp_handler_class)
99
    else:
100
        raise ValueError(f'Unsupported transport protocol')
101
102
103
def start_server(port: Port) -> None:
104
    """Start a server, in a separate thread."""
105
    try:
106
        server = create_server(port)
107
    except OSError as e:
108
        sys.stderr.write(f'Error {e.errno:d}: {e.strerror}\n')
109
        sys.stderr.write(
110
            f'Cannot open port {format_port(port)}. Could be already in use. '
111
            f'Or permission is lacking; try a port number above 1,024 (or '
112
            'even 4,096) and up to 65,535.\n'
113
        )
114
        sys.exit(1)
115
116
    thread_name = f'{server.__class__.__name__}-port{port}'
117
    start_thread(server.serve_forever, thread_name)
118
    logger.info(
119
        'Listening for syslog messages on %s:%s.',
120
        server.server_address[0],
121
        format_port(port),
122
    )
123
124
125
def start_syslog_message_receivers(ports: Iterable[Port]) -> None:
126
    """Start one syslog message receiving server for each port."""
127
    for port in ports:
128
        start_server(port)
129
130
131
def format_message_for_log(message: SyslogMessage) -> str:
132
    """Format a syslog message to be logged."""
133
    return (
134
        f'facility={message.facility.name}, '
135
        f'severity={message.severity.name}, '
136
        f'timestamp={message.timestamp.isoformat()}, '
137
        f'hostname={message.hostname}, '
138
        f'message={message.message}'
139
    )
140