syslog2irc.syslog.format_message_for_log()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 2
CRAP Score 1

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 1
dl 0
loc 4
rs 10
c 0
b 0
f 0
ccs 2
cts 2
cp 1
crap 1
1
"""
2
syslog2irc.syslog
3
~~~~~~~~~~~~~~~~~
4
5
BSD syslog message reception and handling
6
7
:Copyright: 2007-2021 Jochen Kupperschmidt
8
:License: MIT, see LICENSE for details.
9
"""
10
11 1
from __future__ import annotations
12 1
from functools import partial
13 1
import logging
14 1
from socketserver import (
15
    BaseRequestHandler,
16
    StreamRequestHandler,
17
    ThreadingTCPServer,
18
    ThreadingUDPServer,
19
)
20 1
import sys
21 1
from typing import Iterable, Union
22
23 1
import syslogmp
24 1
from syslogmp import Message as SyslogMessage
25
26 1
from .network import format_port, Port, TransportProtocol
27 1
from .signals import syslog_message_received
28 1
from .util import start_thread
29
30
31 1
logger = logging.getLogger(__name__)
32
33
34 1
class TCPHandler(StreamRequestHandler):
    """Handler for syslog messages arriving via TCP.

    Every line read from the connection is parsed as one syslog message
    and passed on together with the client address and the port this
    handler was created for.
    """

    def __init__(self, port: Port, *args, **kwargs) -> None:
        """Remember the port, then initialize the base request handler.

        :param port: the port the enclosing server listens on
        :param args: positional arguments for the base class initializer
        :param kwargs: keyword arguments for the base class initializer
        """
        # Assign the port first: the `socketserver` base initializer
        # already runs `handle()`, which reads `self.port`.
        self.port = port
        super().__init__(*args, **kwargs)

    def handle(self) -> None:
        """Parse and dispatch messages until the stream ends.

        Reading stops at the first line the parser rejects with a
        `ValueError`; lines after it are not processed.
        """
        for line in self.rfile:
            try:
                message = syslogmp.parse(line)
            except ValueError:
                # Index host and port explicitly instead of
                # star-unpacking: address tuples can carry more than
                # two items (e.g. for IPv6), which would not match the
                # two placeholders.
                logger.info(
                    'Invalid message received from %s:%d.',
                    self.client_address[0],
                    self.client_address[1],
                )
                return

            _handle_received_message(self.client_address, self.port, message)
52
53
54 1
class UDPHandler(BaseRequestHandler):
    """Handler for syslog messages arriving via UDP.

    The payload of the received request is parsed as a single syslog
    message and passed on together with the client address and the port
    this handler was created for.
    """

    def __init__(self, port: Port, *args, **kwargs) -> None:
        """Remember the port, then initialize the base request handler.

        :param port: the port the enclosing server listens on
        :param args: positional arguments for the base class initializer
        :param kwargs: keyword arguments for the base class initializer
        """
        # Assign the port first: the `socketserver` base initializer
        # already runs `handle()`, which reads `self.port`.
        self.port = port
        super().__init__(*args, **kwargs)

    def handle(self) -> None:
        """Parse the received payload and dispatch the message.

        A payload the parser rejects with a `ValueError` is logged and
        dropped.
        """
        # The first item of the request is the data that was received.
        data = self.request[0]

        try:
            message = syslogmp.parse(data)
        except ValueError:
            # Index host and port explicitly instead of star-unpacking:
            # address tuples can carry more than two items (e.g. for
            # IPv6), which would not match the two placeholders.
            logger.info(
                'Invalid message received from %s:%d.',
                self.client_address[0],
                self.client_address[1],
            )
            return

        _handle_received_message(self.client_address, self.port, message)
72
73
74 1
def _handle_received_message(
    client_address: tuple[str, int], port: Port, message: SyslogMessage
) -> None:
    """Log a received syslog message and announce it via signal.

    :param client_address: host and port number of the sender
    :param port: the local port the message arrived on; used as the
        sender of the signal
    :param message: the parsed syslog message
    """
    host, port_number = client_address[0], client_address[1]

    logger.debug(
        'Received message from %s:%d on port %s -> %s',
        host,
        port_number,
        format_port(port),
        format_message_for_log(message),
    )

    # Receivers of the signal get the raw address tuple and the
    # unmodified parsed message.
    syslog_message_received.send(
        port, source_address=client_address, message=message
    )
88
89
90 1
def create_server(port: Port) -> Union[ThreadingTCPServer, ThreadingUDPServer]:
    """Create a threading server to receive syslog messages.

    The server is bound to the port's number on all interfaces (empty
    host string) and uses a handler that knows the port it serves.

    :param port: port number and transport protocol to listen on
    :returns: a TCP or UDP server matching the port's transport protocol
    :raises ValueError: if the transport protocol is neither TCP nor UDP
    """
    address = ('', port.number)
    protocol = port.transport_protocol

    # `partial` pre-binds the port so the server can instantiate the
    # handler with its usual (request, client_address, server) arguments.
    if protocol == TransportProtocol.TCP:
        return ThreadingTCPServer(address, partial(TCPHandler, port))

    if protocol == TransportProtocol.UDP:
        return ThreadingUDPServer(address, partial(UDPHandler, port))

    # Name the offending value so a misconfiguration can be diagnosed.
    raise ValueError(f'Unsupported transport protocol: {protocol!r}')
102
103
104 1
def start_server(port: Port) -> None:
    """Start a server, in a separate thread.

    If the server cannot be created because of an `OSError`, an
    explanation is written to standard error and the process exits with
    status 1.

    :param port: port number and transport protocol to listen on
    """
    try:
        server = create_server(port)
    except OSError as e:
        # `errno` is None when the exception was raised without an
        # error number; formatting None with `:d` would itself raise a
        # `TypeError` and hide the actual problem.
        if e.errno is not None:
            sys.stderr.write(f'Error {e.errno:d}: {e.strerror}\n')
        else:
            sys.stderr.write(f'Error: {e}\n')

        sys.stderr.write(
            f'Cannot open port {format_port(port)}. Could be already in use. '
            'Or permission is lacking; try a port number above 1,024 (or '
            'even 4,096) and up to 65,535.\n'
        )
        sys.exit(1)

    # NOTE(review): this interpolates the whole `port` object rather
    # than `format_port(port)` or `port.number`; confirm the resulting
    # thread name is the intended one.
    thread_name = f'{server.__class__.__name__}-port{port}'
    start_thread(server.serve_forever, thread_name)

    logger.info(
        'Listening for syslog messages on %s:%s.',
        server.server_address[0],
        format_port(port),
    )
124
125
126 1
def start_syslog_message_receivers(ports: Iterable[Port]) -> None:
    """Start one syslog message receiving server for each port.

    The ports are processed in iteration order, one `start_server`
    call per port.

    :param ports: the ports to start a server for
    """
    for port in ports:
        start_server(port)
130
131
132 1
def format_message_for_log(message: SyslogMessage) -> str:
    """Format a syslog message to be logged.

    :param message: the parsed syslog message
    :returns: a single line of comma-separated ``key=value`` pairs for
        facility, severity, timestamp (ISO 8601), hostname and message
    """
    fields = (
        ('facility', message.facility.name),
        ('severity', message.severity.name),
        ('timestamp', message.timestamp.isoformat()),
        ('hostname', message.hostname),
        ('message', message.message),
    )
    return ', '.join(f'{key}={value}' for key, value in fields)
141