_reduce_logging_level()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 0
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
# Copyright 2014 Diamond Light Source Ltd.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
"""
16
.. module:: savu_config
17
   :platform: Unix
18
   :synopsis: A command line tool for creating Savu plugin lists
19
20
.. moduleauthor:: Nicola Wadeson <[email protected]>
21
22
"""
23
24
import re
25
import sys
26
import logging
27
from pathlib import Path
28
from . import hdf_utils as hu
29
30
from colorama import Fore
31
32
logger = logging.getLogger('documentationLog')
33
logger_rst = logging.getLogger('documentationRst')
34
35
import warnings
36
with warnings.catch_warnings():
37
    warnings.simplefilter("ignore")
38
    from .content import Content
39
    from .completer import Completer
40
    from . import display_formatter
41
    from .display_formatter import ListDisplay, DispDisplay, \
42
        CiteDisplay
43
    from . import arg_parsers as parsers
44
    from savu.plugins import utils as pu
45
    from . import config_utils as utils
46
    from .config_utils import parse_args
47
    from .config_utils import error_catcher
48
49
50
def _help(content, args):
    """ Display the help information"""
    # The docstring above is user-facing: this function prints the __doc__
    # of every entry in ``commands`` (itself included), so it is left as is.
    # ``args`` is accepted for a uniform command signature but is not read.
    term_width = display_formatter.get_terminal_width()

    # Banner: the heading with an equal run of asterisks on either side.
    heading = " Savu configurator commands "
    stars = "*" * ((term_width - len(heading)) // 2)
    print(f"{stars}{heading}{stars}")

    # One line per documented command, ordered case-insensitively.
    for name in sorted(commands, key=str.lower):
        description = commands[name].__doc__
        if not description:
            # Commands without a docstring are not listed
            continue
        print(Fore.GREEN + f"{name:>8}" + Fore.RESET + f" : {description}")

    # Footer: a hint padded with spaces, framed by two full-width rules.
    footer = "* For more information about individual commands type " \
             "'<command> -h' *"
    padding = " " * ((term_width - len(footer)) // 2)
    rule = "*" * term_width
    print(f"\n{rule}")
    print(f"{padding}{footer}{padding}")
    print(rule)
    return content
69
70
71
@parse_args
@error_catcher
def _open(content, args):
    """ Open an existing process list."""
    # Read the requested file into the content object
    content.fopen(args.file, skip=args.skip, update=True)
    # Show the loaded list ('-q' is handed to the disp command parser)
    _disp(content, '-q')
    # Refresh every position ('*'); '-n' is presumably the "no display"
    # switch of the ref command - confirm against its argument parser
    _ref(content, '* -n')
    return content
79
80
81
@parse_args
@error_catcher
def _disp(content, args):
    """ Display the plugins in the current list."""
    # Keyword arguments describing which part of the list to show
    selection = content.split_plugin_string(args.start, args.stop,
                                            subelem_view=True)
    display = DispDisplay(content.plugin_list)
    verbose = parsers._get_verbosity(args)

    # The "all" flag overrides the level stored on the content object
    if args.all:
        current_level = 'advanced'
    else:
        current_level = content.disp_level

    show_datasets = bool(args.datasets)
    content.display(display, current_level=current_level, verbose=verbose,
                    datasets=show_datasets, disp_level=args.level,
                    **selection)
    return content
94
95
96
@parse_args
@error_catcher
def _list(content, args):
    """ List the available plugins. """
    # Work on a scratch Content so the user's current list is untouched
    scratch = Content()
    # Reuse the known failures rather than reloading every plugin
    scratch.failed = content.failed
    utils._populate_plugin_list(scratch, pfilter=args.string)

    if len(scratch.plugin_list.plugin_list) == 0:
        print("No result found.")
        return content

    def _plugin_name(entry):
        # Each entry is a dictionary; order them by their 'name' value
        return entry['name']

    scratch.plugin_list.plugin_list = sorted(
        scratch.plugin_list.plugin_list, key=_plugin_name)

    display = ListDisplay(scratch.plugin_list)
    scratch.display(display, verbose=parsers._get_verbosity(args))
    return content
118
119
120
@parse_args
@error_catcher
def _save(content, args):
    """ Save the current process list to file."""
    # Write back to the opened file when the input flag is set, otherwise
    # to the path given on the command line
    if args.input:
        out_file = content.filename
    else:
        out_file = args.filepath
    content.check_file(out_file)

    print()
    DispDisplay(content.plugin_list)._notices()

    # Ask for confirmation; the raw answer is interpreted by content.save
    answer = input("Are you sure you want to save the "
                   "current data to %s [y/N]" % out_file)
    content.save(out_file, check=answer, template=args.template)
    return content
132
133
134
@parse_args
@error_catcher
def _mod(content, args):
    """ Modify plugin parameters."""
    # The docstring above is shown verbatim by the help command.
    #
    # Bind every name up front: on the global path
    # separate_plugin_subelem() is never called, so these would otherwise
    # be unbound locals for any later statement that touches them.
    pos_str = subelem = dims = None
    if args.globalpar:
        # deal with the global parameter
        command = 'global'
    else:
        pos_str, subelem, dims, command = \
            content.separate_plugin_subelem(args.param, False)

    if 'expand' in command:
        # Run the start stop step view for that dimension alone
        _expand(content, f"{pos_str} {dims} {True}")

    if 'global' in command:
        # Apply the change at every position. The list is built eagerly so
        # modify_global() runs for all positions, not just up to the first
        # success.
        outcomes = [content.modify_global(int(pos_no), args)
                    for pos_no in content.get_positions()]
        if any(outcomes):
            print("Parameter '{}' has been changed globally to '{}'".format(args.param, args.value[0]))
        else:
            print("Parameter '{}' has not been found in the process list".format(args.param))
        return content

    # Single-plugin modification
    if not args.default:
        content.check_required_args(args.value, True)
    # Get the parameter name for the display later
    args.param = content.get_param_arg_str(pos_str, subelem)
    content_modified = content.modify(pos_str, subelem,
                                      ' '.join(args.value),
                                      default=args.default,
                                      dim=command)
    if content_modified:
        # Display the selected parameter only
        _disp(content, str(args.param))
    return content
171
172
@parse_args
@error_catcher
def _expand(content, args):
    """ Expand the plugin preview parameter. """
    content.set_preview_display(args.off, args.dim, args.dim_view,
                                args.plugin_pos)
    # Nothing to show when no expand dimension is set on the content
    if content.expand_dim is None:
        return content

    changing_dimensions = content.expand_dim == "all" \
        and args.dim is not None
    if not changing_dimensions:
        _disp(content, f"{args.plugin_pos}.preview")
        return content

    # Changing the number of dimensions needs an explicit confirmation
    prompt = f"Are you sure you want to alter the number of "\
             f"dimensions to {args.dim}? [y/N]"
    altered = content.modify_dimensions(args.plugin_pos, args.dim,
                                        check=input(prompt))
    if altered:
        _disp(content, f"{args.plugin_pos}.preview")
    return content
190
191
192
@parse_args
@error_catcher
def _set(content, args):
    """ Set the status of the plugin to be on or off. """
    # Apply the upper-cased status to each requested position in turn
    for position in args.plugin_pos:
        content.on_and_off(position, args.status.upper())
    # Show the resulting list
    _disp(content, '-q')
    return content
200
201
202
@parse_args
@error_catcher
def _add(content, args):
    """ Add a plugin to the list. """
    positions = content.get_positions()
    if positions:
        # Default slot: one past the first number found in the last position
        last_number = int(re.findall(r'\d+', positions[-1])[0])
        next_position = str(last_number + 1)
    else:
        # Empty list: start at position 1
        next_position = '1'

    # An explicitly requested position wins over the computed default
    target = args.pos if args.pos else next_position
    content.add(args.name, target)
    content.check_iterative_loops([int(target)], 1)
    _disp(content, '-q')
    return content
212
213
214
@parse_args
@error_catcher
def _iterate(content, args):
    """ Set a plugin (or group of plugins) to run iteratively. """
    # TODO: the read-only branch does not go through _disp(); that may be
    # needed to show the iterative loops visually in the terminal window
    if args.remove is not None or args.set is not None:
        content.iterate(args)
        # show the process list, including the visual markers of where the
        # iterative loops are
        _disp(content, '-q')
    else:
        # neither option given: show only the loops, not the process list
        content.display_iterative_loops()
    return content
229
    # TODO: the commented-out code below is associated with the TODO in
230
    # arg_parsers._iterate_arg_parser()
231
#    plugin_indices = args.indices
232
#    iterations = args.iterations
233
#
234
#    start = plugin_indices[0]
235
#    if len(plugin_indices) == 1:
236
#        # only the start index has been given, so there is only one element in
237
#        # the list
238
#        end = plugin_indices[0]
239
#    else:
240
#        # both the start and end index has been given, so there are two elements
241
#        # in the list
242
#        end = plugin_indices[1]
243
#
244
#    content.add_iterate_plugin_group(start, end, iterations[0])
245
246
247
@parse_args
@error_catcher
def _dupl(content, args):
    """ Duplicate a plugin in the list"""
    positions = content.get_positions()
    if positions:
        # Default slot: one past the first number found in the last position
        last_number = int(re.findall(r'\d+', positions[-1])[0])
        next_position = str(last_number + 1)
    else:
        next_position = '1'

    # Use the requested destination when given, else append at the end
    destination = args.new_pos if args.new_pos else next_position
    content.duplicate(args.orig_pos, destination)
    _disp(content, '-q')
    return content
256
257
258
@parse_args
@error_catcher
def _ref(content, args):
    """ Refresh a plugin (update it). """
    # A lone '*' selects every position currently in the list
    if args.pos == ['*']:
        targets = content.get_positions()
    else:
        targets = args.pos

    for target in targets:
        content.refresh(target, defaults=args.defaults)
        if args.nodisp:
            continue
        # Show each plugin straight after it has been refreshed
        _disp(content, target)
    return content
268
269
270
@parse_args
@error_catcher
def _cite(content, args):
    """ Display plugin citations."""
    # Keyword arguments describing which part of the list to show
    selection = content.split_plugin_string(args.start, args.stop)
    content.display(CiteDisplay(content.plugin_list), **selection)
    return content
278
279
280
@parse_args
@error_catcher
def _rem(content, args):
    """ Remove plugin(s) from the list. """
    # Work through the positions in ascending numeric order
    ordered = sorted(int(raw) for raw in args.pos)
    for removed_so_far, position in enumerate(ordered):
        # Shift the position down by the number of removals already made
        # (the first removal, and a position of 0 or below, are untouched)
        if removed_so_far > 0 and position > 0:
            position -= removed_so_far
        content.remove(str(position))
        content.check_iterative_loops([position], -1)
    _disp(content, '-q')
    return content
297
298
299
@parse_args
@error_catcher
def _move(content, args):
    """ Move a plugin to a different position in the list."""
    source, destination = args.orig_pos, args.new_pos
    content.move(source, destination)
    # Show the reordered list
    _disp(content, '-q')
    return content
306
307
308
@parse_args
@error_catcher
def _coll(content, arg):
    """ List all plugin collections. """
    # ``arg`` is part of the common command signature and is not read here
    rule = '-' * 40
    print('\n')
    # Each collection name sits under its own 40-character rule
    for collection in Completer([])._get_collections():
        print(f"{rule}\n {collection!s}")
    print(rule, '\n')
    return content
318
319
@parse_args
@error_catcher
def _clear(content, arg):
    """ Clear the current plugin list."""
    # The raw answer is passed on; content.clear decides what it means
    answer = input("Are you sure you want to clear the current "
                   "plugin list? [y/N]")
    content.clear(check=answer)
    return content
326
327
@parse_args
@error_catcher
def _exit(content, arg):
    """ Close the program."""
    # The raw answer is passed on; content.set_finished decides what it means
    answer = input("Are you sure? [y/N]")
    content.set_finished(check=answer)
    return content
333
334
@parse_args
@error_catcher
def _level(content, args):
    """ Set a visibility level for the parameters."""
    requested_level = args.level
    content.level(requested_level)
    return content
340
341
@parse_args
@error_catcher
def _history(content, arg):
    """ View the history of previous commands """
    # ``arg`` is part of the common command signature and is not read here.
    # readline numbers history items from 1, so iterate 1..length inclusive;
    # starting at 0 would print an empty first entry and omit the latest one.
    # NOTE(review): assumes utils.readline is the standard readline module
    # (or an API-compatible replacement) - confirm in config_utils.
    history_length = utils.readline.get_current_history_length()
    for index in range(1, history_length + 1):
        print("%5i : %s" % (index, utils.readline.get_history_item(index)))
    return content
349
350
@parse_args
@error_catcher
def _replace(content, args):
    """ Replace a plugin with another """
    outgoing, incoming = args.old, args.new_plugin
    content.replace(outgoing, incoming)
    # Show the updated list
    _disp(content, '-q')
    return content
357
358
# Dispatch table: command word typed at the prompt -> handler function.
# Handlers are called as handler(content, argument_string) and return the
# content object; their docstrings are what the help command prints.
commands = {
    'open': _open,
    'help': _help,
    'disp': _disp,
    'list': _list,
    'save': _save,
    'mod': _mod,
    'set': _set,
    'add': _add,
    'dupl': _dupl,
    'rem': _rem,
    'move': _move,
    'ref': _ref,
    'level': _level,
    'expand': _expand,
    'cite': _cite,
    'coll': _coll,
    'clear': _clear,
    'exit': _exit,
    'history': _history,
    'iterate': _iterate,
    'replace': _replace,
}
379
380
def get_description():
    """ Collect the docstring of every registered command.

    :return: dictionary mapping each key of ``commands`` to the ``__doc__``
             of its handler function
    """
    descriptions = {}
    for name, handler in commands.items():
        descriptions[name] = handler.__doc__
    return descriptions
387
388
389
def main(test=False):
    """ Run the interactive Savu configurator.

    :param test: If True, sys.argv is cut at the first argument containing
                 "scripts" (the test-script directory passed in by pytest,
                 plus anything after it, e.g. --cov), because the argument
                 parser would reject it as unexpected.

                 If False, sys.argv is left untouched.
    """

    # required for running the tests locally or on travis
    if test:
        for position, argument in enumerate(sys.argv):
            if "scripts" in argument:
                # drop it together with every argument that follows
                sys.argv = sys.argv[:position]
                break

    args = parsers._config_arg_parser(doc=False)

    # One-shot modes: perform a single hdf utility action and exit
    if args.tree is not None:
        hu.get_hdf_tree(args.tree, add_shape=True, display=True)
        sys.exit(0)
    if args.find is not None:
        find_path, find_pattern = args.find[0][0], args.find[0][1]
        hu.find_hdf_key(find_path, find_pattern, display=True)
        sys.exit(0)
    if args.check is not None:
        hu.check_tomo_data(args.check)
        sys.exit(0)

    # A name argument takes the place of the file argument
    if args.name is not None:
        args.file = args.name

    if args.error:
        utils.error_level = 1

    print("Running the configurator")
    print("Starting Savu Config tool (please wait for prompt)")

    _reduce_logging_level()

    start_level = "advanced" if args.disp_all else 'basic'
    content = Content(level=start_level)

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        # imports all the (working) plugin modules
        content.failed = utils.populate_plugins(error_mode=args.error,
                                                examples=args.examples)

    completer = Completer(commands=commands, plugin_list=pu.plugins)
    utils._set_readline(completer.complete)

    # if file flag is passed then open it here
    if args.file:
        commands['open'](content, args.file)

    print("\n*** Press Enter for a list of available commands. ***\n")

    utils.load_history_file(utils.histfile)
    accumulative_output = ''
    while True:
        try:
            # The prompt is prefixed with the stem of the open file, if any
            if content.filename:
                prefix = f"{Path(content.filename).stem} "
            else:
                prefix = ""
            in_text = input(f"{prefix}>>> ").strip()
            # First word is the command; the rest is its argument string
            command, _, arg = in_text.partition(' ')
            _write_command_to_log(in_text)
        except KeyboardInterrupt:
            # CTRL + C abandons the current line and prompts again
            print()
            continue
        except EOFError:
            # makes possible exiting on CTRL + D (EOF, like Python interpreter)
            break

        # An empty line is treated as a request for help
        command = command or 'help'
        if command in commands:
            content = commands[command](content, arg)
            if logger.handlers:
                # Write the command output to a logger
                accumulative_output = _write_output_to_log(accumulative_output)
        else:
            print("I'm sorry, that's not a command I recognise. Press Enter "
                  "for a list of available commands.")

        if content.is_finished():
            break

    print("Thanks for using the application")
485
486
487
def _write_command_to_log(in_text):
    """ Record the command the user typed in both documentation loggers.

    :param in_text: the stripped line entered at the prompt
    """
    logger.debug(f"TEST COMMAND: {in_text}")
    # Open an rst dropdown titled with the command, followed by the start
    # of a bash code block
    rst_header = f".. dropdown:: >>> {in_text}\n\n" \
                 "    .. code-block:: bash \n"
    logger_rst.debug(rst_header)
491
492
493
def _write_output_to_log(accumulative_output):
    """ Write the output of the latest command to the rst logger.

    The text captured on stdout is trimmed to the part that follows the
    previously logged output, stripped of terminal colour codes, indented
    and sent to the restructured text logger.

    :param accumulative_output: everything captured on stdout up to the
        previous command
    :return: everything captured on stdout so far, to be passed back in on
        the next call
    """
    # NOTE(review): relies on sys.stdout offering getvalue(), i.e. having
    # been replaced by a StringIO-like object - confirm against the caller
    new_text = sys.stdout.getvalue()

    # Keep only what was printed after the previously recorded output, so
    # each command is logged with its own output alone
    new_text = new_text[len(accumulative_output):]

    # Escape sequences used for command line colour, which cannot be
    # displayed in the rst file
    colour_codes = ('\x1b[100m', '\x1b[0m', '\x1b[97m',
                    '\x1b[49m', '\x1b[46m', '\x1b[39m',
                    '\x1b[36m')
    for escape in colour_codes:
        new_text = new_text.replace(escape, '')

    # Indent the text for the rst file format and write it to the log
    logger_rst.debug(pu.indent_multi_line_str(new_text, 2))
    return sys.stdout.getvalue()
523
524
525
def _reduce_logging_level():
    """ Raise the root logger threshold to CRITICAL.

    After this call the root logger only lets CRITICAL records through.
    Uses the module-level ``logging`` import instead of re-importing it
    inside the function.
    """
    logging.getLogger().setLevel(logging.CRITICAL)
528
529
if __name__ == '__main__':
530
    main()
531