Passed
Push — master ( 1b986a...ef0e11 )
by Vinicius
03:22 queued 14s
created

kytos.core.logs.LogManager.decorate_logger_class()   A

Complexity

Conditions 2

Size

Total Lines 19
Code Lines 12

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 12
CRAP Score 2

Importance

Changes 0
Metric Value
cc 2
eloc 12
nop 1
dl 0
loc 19
ccs 12
cts 12
cp 1
crap 2
rs 9.8
c 0
b 0
f 0
1
"""Handle logs displayed by Kytos SDN Platform."""
2 1
import inspect
3 1
import logging
4 1
import re
5 1
from configparser import RawConfigParser
6
# noqa so it does not conflict with grouped imports
7
# pylint: disable=ungrouped-imports
8 1
from logging import Formatter, config, getLogger
9
# pylint: enable=ungrouped-imports
10 1
from pathlib import Path
11
12 1
from kytos.core.logger_decorators import root_decorator
13 1
from kytos.core.websocket import WebSocketHandler
14
15 1
__all__ = ('LogManager', 'NAppLog')
16 1
LOG = getLogger(__name__)
17
18
19 1
class LogManager:
20
    """Manage handlers for all loggers."""
21
22 1
    _PARSER = RawConfigParser()
23 1
    _DEFAULT_FMT = 'formatter_console'
24
25 1
    @classmethod
26 1
    def load_config_file(cls, config_file, debug='False'):
27
        """Load log configuration file.
28
29
        Check whether file exists and if there's an OSError, try removing
30
        syslog handler.
31
32
        Args:
33
            config_file (:class:`str`, :class:`pathlib.Path`):
34
                Configuration file path.
35
        """
36 1
        if Path(config_file).exists():
37 1
            cls._PARSER.read(config_file)
38 1
            cls._set_debug_mode(debug)
39 1
            cls._use_config_file(config_file)
40
        else:
41 1
            LOG.warning('Log config file "%s" does not exist. Using default '
42
                        'Python logging configuration.',
43
                        config_file)
44
45 1
    @classmethod
46 1
    def _set_debug_mode(cls, debug=False):
47 1
        if debug is True:
48 1
            cls._PARSER.set('logger_root', 'level', 'DEBUG')
49 1
            cls._PARSER.set('logger_kytos', 'level', 'DEBUG')
50 1
            cls._PARSER.set('logger_api_server', 'level', 'DEBUG')
51 1
            LOG.info('Setting log configuration with debug mode.')
52
53 1
    @classmethod
54 1
    def _use_config_file(cls, config_file):
55
        """Use parsed logging configuration."""
56 1
        try:
57 1
            config.fileConfig(cls._PARSER, disable_existing_loggers=False)
58 1
            LOG.info('Logging config file "%s" loaded successfully.',
59
                     config_file)
60 1
        except OSError:
61 1
            cls._catch_config_file_exception(config_file)
62
63 1
    @classmethod
64 1
    def _catch_config_file_exception(cls, config_file):
65
        """Try not using syslog handler (for when it is not installed)."""
66 1
        if 'handler_syslog' in cls._PARSER:
67 1
            LOG.warning('Failed to load "%s". Trying to disable syslog '
68
                        'handler.', config_file)
69 1
            cls._PARSER.remove_section('handler_syslog')
70 1
            cls._use_config_file(config_file)
71
        else:
72 1
            LOG.warning('Failed to load "%s". Using default Python '
73
                        'logging configuration.', config_file)
74
75 1
    @classmethod
76 1
    def enable_websocket(cls, socket):
77
        """Output logs to a web socket.
78
79
        Args:
80
            socket: socketio's socket.
81
82
        Returns:
83
            logging.StreamHandler: Handler with the socket as stream.
84
85
        """
86 1
        handler = WebSocketHandler.get_handler(socket)
87 1
        cls.add_handler(handler)
88 1
        return handler
89
90 1
    @classmethod
91 1
    def add_handler(cls, handler):
92
        """Add handler to loggers.
93
94
        Use formatter_console if it exists.
95
96
        Args:
97
            handler (:mod:`logging.handlers`): Handle to be added.
98
        """
99 1
        if cls._PARSER.has_section(cls._DEFAULT_FMT):
100 1
            fmt_conf = cls._PARSER[cls._DEFAULT_FMT]
101 1
            fmt = Formatter(fmt_conf.get('format', None),
102
                            fmt_conf.get('datefmt', None))
103 1
            handler.setFormatter(fmt)
104 1
        handler.addFilter(cls.filter_session_disconnected)
105 1
        getLogger().addHandler(handler)
106
107 1
    @staticmethod
108 1
    def filter_session_disconnected(record):
109
        """Remove harmless session disconnected error.
110
111
        Despite this error, everything seems to be working. As we can't catch
112
        it anywhere, we filter it.
113
        """
114 1
        msg_end = "KeyError: 'Session is disconnected'"
115 1
        return not (record.name == 'werkzeug' and record.levelname == 'ERROR'
116
                    and record.args and isinstance(record.args[0], str)
117
                    and record.args[0].endswith(msg_end))
118
119 1
    @staticmethod
120 1
    def decorate_logger_class(*decorators):
121
        """Decorates the logger class with the given decorator.
122
        Assumes no loggers have been retrieved."""
123 1
        klass = logging.getLoggerClass()
124 1
        for decorator in decorators:
125 1
            klass = decorator(klass)
126
127 1
        old_root = logging.root
128
        # Update root
129 1
        new_root = root_decorator(klass)(old_root.level)
130 1
        logging.root = new_root
131 1
        logging.Logger.root = new_root
132
133
        # Update Manager
134
        # Force manager to forget all previous loggers
135 1
        new_manager = logging.Manager(new_root)
136 1
        new_manager.loggerClass = klass
137 1
        logging.Logger.manager = new_manager
138
139
140
# Add filter to all pre-existing handlers
141 1
HANDLER_FILTER = LogManager.filter_session_disconnected
142 1
for root_handler in logging.root.handlers:
143 1
    if HANDLER_FILTER not in root_handler.filters:
144 1
        root_handler.addFilter(HANDLER_FILTER)
145
146
147 1
class NAppLog:
148
    """High-level logger for NApp devs.
149
150
    From NApp dev's point of view:
151
    - No need for instantiation
152
    - Logger name is automatically assigned
153
154
    Redirect all calls to a logger with the correct name (NApp ID).
155
156
    The appropriate logger is a logging.Logger with NApp ID as its name. If no
157
    NApp is found, use the root logger.
158
159
    As any NApp can use this logger, its name is detected in every call by
160
    inspecting the caller's stack. If no NApp is found, use the root logger.
161
    """
162
163 1
    def __getattribute__(self, name):
164
        """Detect NApp ID and use its logger."""
165 1
        return get_napp_logger().__getattribute__(name)
166
167
168 1
def get_napp_logger():
169
    """Detect NApp ID and get its logger."""
170 1
    napp_id = _detect_napp_id()
171 1
    logger = getLogger("kytos.napps")
172
173
    # if napp_id is detected, get the napp logger.
174 1
    if napp_id:
175 1
        logger = logger.getChild(napp_id)
176
177 1
    return logger
178
179
180
#: Detect NApp ID from filename
181 1
NAPP_ID_RE = re.compile(r'.*napps/(.*?)/(.*?)/')
182
183
184 1
def _detect_napp_id():
185
    """Get the last called NApp in caller's stack.
186
187
    We use the last innermost NApp because a NApp *A* may call a NApp *B* and,
188
    when *B* uses the logger, the logger's name should be *B*.
189
190
    Returns:
191
        str, None: NApp ID or None if no NApp is found in the caller's stack.
192
193
    """
194 1
    for frame in inspect.stack():
195 1
        if not frame.filename == __file__:
196 1
            match = NAPP_ID_RE.match(frame.filename)
197 1
            if match:
198 1
                return '/'.join(match.groups())
199
    return None
200