Completed
Pull Request — master (#199)
by Björn
03:00
created

get_logging_level()   B

Complexity

Conditions 6

Size

Total Lines 26

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 6
dl 0
loc 26
rs 7.5384
1
#!/usr/bin/env python
2
3
"""\
4
Logging Statement Modifier - replace logging calls with pass (or vice versa)
5
Author: David Underhill <[email protected]>
6
Version: 1.00 (06-Feb-2010)
7
8
This script parses a Python file and comments out logging statements, replacing
9
them with a pass statement (or vice versa).  The purpose of commenting out these
10
statements is to improve performance.  Even if logging is disabled, arguments to
11
logging method calls must still be evaluated, which can be expensive.
12
13
This tool handles most common cases:
14
  * Log statements may span multiple lines.
15
  * Custom logging levels may be added (LEVELS, LEVEL_VALUES).
16
  * Integral logging levels & named logging levels (DEBUG, etc.) are recognized.
17
  * Logging statements log(), debug(), ..., critical() are all recognized.
18
  * Statements with unrecognized logging levels will be left as-is.
19
  * 'logging' is the assumed logging module name (LOGGING_MODULE_NAME).
20
21
However, its ability to parse files is limited:
22
  * It only operates on logging statements in the form logging.log(<level>, ...)
23
    and logging.<level>(...).
24
  * The <level> must either be an integral constant or contain one of the names
25
    from the LEVELS constant below.
26
  * If a logging statement is made, it is assumed that no other statement is
27
    made on the same line as logging statement (except for statements made in
28
    between the open and close parenthesis of the logging call).  For example,
29
    a semi-colon and then a second statement on the same line as a logging call
30
    will not be handled properly.
31
  * Logging methods must be called through SOME module, e.g., logging.log(), not
32
    just log().
33
  * For simplicity, undoing the commenting process relies on a comment left by
34
    the program on the pass statements it adds when commenting out logging
35
    statements.  (So don't change the comment it outputs by the pass statement).
36
37
To run this command on all of the Python files in a particular folder and its
38
sub-folders at once, try this (replace '/path/to' as appropriate):
39
    find . -name '*.py' | xargs -i{} /path/to/logging_statement_modifier.py {}
40
"""
41
42
import logging
43
from optparse import OptionParser
44
import re
45
import sys
46
47
# logging level names and values
48
LEVELS = ['DEBUG', 'INFO', 'WARN', 'WARNING', 'ERROR', 'CRITICAL']
49
LEVEL_VALUES = [logging.DEBUG, logging.INFO, logging.WARN, logging.WARNING, logging.ERROR, logging.CRITICAL]
50
LEVELS_DICT = dict(zip(LEVELS, LEVEL_VALUES))
51
52
# names of methods in the logging module which perform logging
53
LOGGING_METHODS_OF_INTEREST = ['log', 'debug', 'info', 'warn', 'warning', 'error', 'critical']
54
55
# name of the logging module
56
LOGGING_MODULE_NAME = 'logging'
57
58
# this matches logging.<method>([<first_arg>,]
59
# STR_RE_LOGGING_CALL = r'%s.(\w+)[(](([^,\r\n]+),)?' % LOGGING_MODULE_NAME
60
STR_RE_LOGGING_CALL = r'\b(' + '|'.join(LOGGING_METHODS_OF_INTEREST) + r')[(](([^,\r\n]+),)?'
61
62
# contents of a pass line (not including prefixed whitespace)
63
PASS_LINE_CONTENTS = 'pass # replaces next logging statement\n'
64
65
# Match a logging call (must only be prefixed with whitespace).  Capture groups
66
# include the whitespace, the logging method called, and the first argument if
67
# possible
68
RE_LOGGING_START = re.compile(r'^(\s+)' + STR_RE_LOGGING_CALL)
69
RE_LOGGING_START_IN_COMMENT = re.compile(r'^(\s+)#' + STR_RE_LOGGING_CALL)
70
71
def main(argv=sys.argv[1:]):
72
    """Parses the command line comments."""
73
    usage = 'usage: %prog [options] FILE\n\n' + __doc__
74
    parser = OptionParser(usage)
75
76
    # options
77
    parser.add_option("-f", "--force",
78
                      action='store_true', default=False,
79
                      help="make changes even if they cannot undone before saving the new file")
80
    parser.add_option("-m", "--min_level",
81
                      default='NONE',
82
                      help="minimum level of logging statements to modify [default: no minimum]")
83
    parser.add_option("-M", "--max_level",
84
                      default='NONE',
85
                      help="maximum level of logging statements to modify [default: no maximum]")
86
    parser.add_option("-o", "--output-file",
87
                      default=None,
88
                      help="where to output the result [default: overwrite the input file]")
89
    parser.add_option("-r", "--restore",
90
                      action='store_true', default=False,
91
                      help="restore logging statements previously commented out and replaced with pass statements")
92
    parser.add_option("-v", "--verbose",
93
                      action='store_true', default=False,
94
                      help="print informational messages about changes made")
95
96
    (options, args) = parser.parse_args(argv)
97
    if len(args) != 1:
98
        parser.error("expected 1 argument but got %d arguments: %s" % (len(args), ' '.join(args)))
99
    input_fn = args[0]
100
    if not options.output_file:
101
        options.output_file = input_fn
102
103
    # validate min/max level
104
    LEVEL_CHOICES = LEVELS + ['NONE']
105
    min_level_value = 0 if options.min_level == 'NONE' else get_level_value(options.min_level)
106
    if options.min_level is None:
107
        parser.error("min level must be an integer or one of these values: %s" % ', '.join(LEVEL_CHOICES))
108
    max_level_value = sys.maxint if options.max_level == 'NONE' else get_level_value(options.max_level)
109
    if options.max_level is None:
110
        parser.error("max level must be an integer or one of these values: %s" % ', '.join(LEVEL_CHOICES))
111
112
    if options.verbose:
113
        logging.getLogger().setLevel(logging.INFO)
114
115
    try:
116
        return modify_logging(input_fn, options.output_file,
117
                              min_level_value, max_level_value,
118
                              options.restore, options.force)
119
    except IOError as e:
120
        logging.error(str(e))
121
        return -1
122
123
# matches two main groups: 1) leading whitespace and 2) all following text
124
RE_LINE_SPLITTER_COMMENT = re.compile(r'^(\s*)((.|\n)*)$')
125
def comment_lines(lines):
126
    """Comment out the given list of lines and return them.  The hash mark will
127
    be inserted before the first non-whitespace character on each line."""
128
    ret = []
129
    for line in lines:
130
        ws_prefix, rest, ignore = RE_LINE_SPLITTER_COMMENT.match(line).groups()
131
        ret.append(ws_prefix + '#' + rest)
132
    return ''.join(ret)
133
134
# matches two main groups: 1) leading whitespace and 2) all following text
135
RE_LINE_SPLITTER_UNCOMMENT = re.compile(r'^(\s*)#((.|\n)*)$')
136
def uncomment_lines(lines):
137
    """Uncomment the given list of lines and return them.  The first hash mark
138
    following any amount of whitespace will be removed on each line."""
139
    ret = []
140
    for line in lines:
141
        ws_prefix, rest, ignore = RE_LINE_SPLITTER_UNCOMMENT.match(line).groups()
142
        ret.append(ws_prefix + rest)
143
    return ''.join(ret)
144
145
def first_arg_to_level_name(arg):
146
    """Decide what level the argument specifies and return it.  The argument
147
    must contain (case-insensitive) one of the values in LEVELS or be an integer
148
    constant.  Otherwise None will be returned."""
149
    try:
150
        return int(arg)
151
    except ValueError:
152
        arg = arg.upper()
153
        for level in LEVELS:
154
            if level in arg:
155
                return level
156
        return None
157
158
def get_level_value(level):
159
    """Returns the logging value associated with a particular level name.  The
160
    argument must be present in LEVELS_DICT or be an integer constant.
161
    Otherwise None will be returned."""
162
    try:
163
        # integral constants also work: they are the level value
164
        return int(level)
165
    except ValueError:
166
        try:
167
            return LEVELS_DICT[level.upper()]
168
        except KeyError:
169
            logging.warning("level '%s' cannot be translated to a level value (not present in LEVELS_DICT)" % level)
170
            return None
171
172
def get_logging_level(logging_stmt, commented_out=False):
173
    """Determines the level of logging in a given logging statement.  The string
174
    representing this level is returned.  False is returned if the method is
175
    not a logging statement and thus has no level.  None is returned if a level
176
    should have been found but wasn't."""
177
    regexp = RE_LOGGING_START_IN_COMMENT if commented_out else RE_LOGGING_START
178
    ret = regexp.match(logging_stmt)
179
    _, method_name, _, first_arg = ret.groups()
180
    if method_name not in LOGGING_METHODS_OF_INTEREST:
181
        logging.debug('skipping uninteresting logging call: %s' % method_name)
182
        return False
183
184
    if method_name != 'log':
185
        return method_name
186
187
    # if the method name did not specify the level, we must have a first_arg to extract the level from
188
    if not first_arg:
189
        logging.warning("logging.log statement found but we couldn't extract the first argument")
190
        return None
191
192
    # extract the level of logging from the first argument to the log() call
193
    level = first_arg_to_level_name(first_arg)
194
    if level is None:
195
        logging.warning("arg does not contain any known level '%s'\n" % first_arg)
196
        return None
197
    return level
198
199
def level_is_between(level, min_level_value, max_level_value):
200
    """Returns True if level is between the specified min or max, inclusive."""
201
    level_value = get_level_value(level)
202
    if level_value is None:
203
        # unknown level value
204
        return False
205
    return level_value >= min_level_value and level_value <= max_level_value
206
207
def split_call(lines, open_paren_line=0):
208
    """Returns a 2-tuple where the first element is the list of lines from the
209
    first open paren in lines to the matching closed paren.  The second element
210
    is all remaining lines in a list."""
211
    num_open = 0
212
    num_closed = 0
213
    for i, line in enumerate(lines):
214
        c = line.count('(')
215
        num_open += c
216
        if not c and i==open_paren_line:
217
            raise Exception('Exception open parenthesis in line %d but there is not one there: %s' % (i, str(lines)))
218
        num_closed += line.count(')')
219
220
        if num_open == num_closed:
221
            return (lines[:i+1], lines[i+1:])
222
223
    print ''.join(lines)
224
    raise Exception('parenthesis are mismatched (%d open, %d closed found)' % (num_open, num_closed))
225
226
def modify_logging(input_fn, output_fn, min_level_value, max_level_value, restore, force):
227
    """Modifies logging statements in the specified file."""
228
    # read in all the lines
229
    logging.info('reading in %s' % input_fn)
230
    fh = open(input_fn, 'r')
231
    lines = fh.readlines()
232
    fh.close()
233
    original_contents = ''.join(lines)
234
235
    if restore:
236
        forwards = restore_logging
237
        backwards = disable_logging
238
    else:
239
        forwards = disable_logging
240
        backwards = restore_logging
241
242
    # apply the requested action
243
    new_contents = forwards(lines, min_level_value, max_level_value)
244
245
    # quietly check to see if we can undo what we just did (if not, the text
246
    # contains something we cannot translate [bug or limitation with this code])
247
    logging.disable(logging.CRITICAL)
248
    new_contents_undone = backwards(new_contents.splitlines(True), min_level_value, max_level_value)
249
    logging.disable(logging.DEBUG)
250
    if original_contents != new_contents_undone:
251
        base_str = 'We are unable to revert this action as expected'
252
        if force:
253
            logging.warning(base_str + " but -f was specified so we'll do it anyway.")
254
        else:
255
            logging.error(base_str + ', so we will not do it in the first place.  Pass -f to override this and make the change anyway.')
256
            return -1
257
258
    logging.info('writing the new contents to %s' % output_fn)
259
    fh = open(output_fn, 'w')
260
    fh.write(new_contents)
261
    fh.close()
262
    logging.info('done!')
263
    return 0
264
265
def check_level(logging_stmt, logging_stmt_is_commented_out, min_level_value, max_level_value):
266
    """Extracts the level of the logging statement and returns True if the
267
    level falls betwen min and max_level_value.  If the level cannot be
268
    extracted, then a warning is logged."""
269
    level = get_logging_level(logging_stmt, logging_stmt_is_commented_out)
270
    if level is None:
271
        logging.warning('skipping logging statement because the level could not be extracted: %s' % logging_stmt.strip())
272
        return False
273
    elif level is False:
274
        return False
275
    elif level_is_between(level, min_level_value, max_level_value):
276
        return True
277
    else:
278
        logging.debug('keep this one as is (not in the specified level range): %s' % logging_stmt.strip())
279
        return False
280
281
def disable_logging(lines, min_level_value, max_level_value):
282
    """Disables logging statements in these lines whose logging level falls
283
    between the specified minimum and maximum levels."""
284
    output = ''
285
    while lines:
286
        line = lines[0]
287
        ret = RE_LOGGING_START.match(line)
288
        if not ret:
289
            # no logging statement here, so just leave the line as-is and keep going
290
            output += line
291
            lines = lines[1:]
292
        else:
293
            # a logging call has started: find all the lines it includes and those it does not
294
            logging_lines, remaining_lines = split_call(lines)
295
            lines = remaining_lines
296
            logging_stmt = ''.join(logging_lines)
297
298
            # replace the logging statement if its level falls b/w min and max
299
            if not check_level(logging_stmt, False, min_level_value, max_level_value):
300
                output += logging_stmt
301
            else:
302
                # comment out this logging statement and replace it with pass
303
                prefix_ws = ret.group(1)
304
                pass_stmt = prefix_ws + PASS_LINE_CONTENTS
305
                commented_out_logging_lines = comment_lines(logging_lines)
306
                new_lines = pass_stmt + commented_out_logging_lines
307
                logging.info('replacing:\n%s\nwith this:\n%s' % (logging_stmt.rstrip(), new_lines.rstrip()))
308
                output += new_lines
309
    return output
310
311
def restore_logging(lines, min_level_value, max_level_value):
312
    """Re-enables logging statements in these lines whose logging level falls
313
    between the specified minimum and maximum levels and which were disabled
314
    by disable_logging() before."""
315
    output = ''
316
    while lines:
317
        line = lines[0]
318
        if line.lstrip() != PASS_LINE_CONTENTS:
319
            # not our pass statement here, so just leave the line as-is and keep going
320
            output += line
321
            lines = lines[1:]
322
        else:
323
            # a logging call will start on the next line: find all the lines it includes and those it does not
324
            logging_lines, remaining_lines = split_call(lines[1:])
325
            lines = remaining_lines
326
            logging_stmt = ''.join(logging_lines)
327
            original_lines = line + logging_stmt
328
329
            # replace the logging statement if its level falls b/w min and max
330
            if not check_level(logging_stmt, True, min_level_value, max_level_value):
331
                output += logging_stmt
332
            else:
333
                # uncomment_lines of this logging statement and remove the pass line
334
                uncommented_logging_lines = uncomment_lines(logging_lines)
335
                logging.info('replacing:\n%s\nwith this:\n%s' % (original_lines.rstrip(), uncommented_logging_lines.rstrip()))
336
                output += uncommented_logging_lines
337
    return output
338
339
if __name__ == "__main__":
340
    logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.WARN)
341
    sys.exit(main())
342