Test Failed
Pull Request — master (#848)
by
unknown
03:43 queued 12s
created

_reduce_logging_level()   A

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 0
dl 0
loc 3
rs 10
c 0
b 0
f 0
1
# Copyright 2014 Diamond Light Source Ltd.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
"""
16
.. module:: savu_config
17
   :platform: Unix
18
   :synopsis: A command line tool for creating Savu plugin lists
19
20
.. moduleauthor:: Nicola Wadeson <[email protected]>
21
22
"""
23
24
import re
25
import sys
26
import logging
27
28
logger = logging.getLogger('documentationLog')
29
logger_rst = logging.getLogger('documentationRst')
30
31
import warnings
32
with warnings.catch_warnings():
33
    warnings.simplefilter("ignore")
34
    from .content import Content
35
    from .completer import Completer
36
    from . import display_formatter
37
    from .display_formatter import ListDisplay, DispDisplay, \
38
        CiteDisplay
39
    from . import arg_parsers as parsers
40
    from savu.plugins import utils as pu
41
    from . import config_utils as utils
42
    from .config_utils import parse_args
43
    from .config_utils import error_catcher
44
45
46
def _help(content, args):
47
    """ Display the help information"""
48
    width = display_formatter.get_terminal_width()
49
    command_title = " Savu configurator commands "
50
    title_separator = "*"*((width-len(command_title))//2)
51
    print(f"{title_separator}{command_title}{title_separator}")
52
    for key in list(commands.keys()):
53
        doc = commands[key].__doc__
54
        if doc:
55
            print("%8s : %s" % (key, commands[key].__doc__))
56
    line_separator = "*" * width
57
    info_text = "* For more information about individual commands type " \
58
                "'<command> -h' *"
59
    title_separator = " "*((width-len(info_text))//2)
60
    print(f"\n{line_separator}")
61
    print(f"{title_separator}{info_text}{title_separator}")
62
    print(line_separator)
63
    return content
64
65
66
@parse_args
67
@error_catcher
68
def _open(content, args):
69
    """ Open an existing process list."""
70
    content.fopen(args.file, update=True, skip=args.skip)
71
    _ref(content, '* -n')
72
    _disp(content, '-q')
73
    return content
74
75
76
@parse_args
77
@error_catcher
78
def _disp(content, args):
79
    """ Display the plugins in the current list."""
80
    range_dict = content.split_plugin_string(args.start, args.stop,
81
                                             subelem_view=True)
82
    formatter = DispDisplay(content.plugin_list)
83
    verbosity = parsers._get_verbosity(args)
84
    level = 'advanced' if args.all else content.disp_level
85
    datasets = True if args.datasets else False
86
    content.display(formatter, current_level=level, verbose=verbosity,
87
                    datasets=datasets, disp_level=args.level, **range_dict)
88
    return content
89
90
91
@parse_args
92
@error_catcher
93
def _list(content, args):
94
    """ List the available plugins. """
95
    list_content = Content()
96
    # Instead of reloading all plugins, copy the list of failed plugins
97
    list_content.failed = content.failed
98
    utils._populate_plugin_list(list_content, pfilter=args.string)
99
    if not len(list_content.plugin_list.plugin_list):
100
        print("No result found.")
101
        return content
102
103
    # Sort the list of dictionaries.
104
    # Sort by the name of the plugin within each dictionary.
105
    list_content.plugin_list.plugin_list = \
106
        sorted(list_content.plugin_list.plugin_list,
107
                key=lambda i: i['name'])
108
109
    formatter = ListDisplay(list_content.plugin_list)
110
    verbosity = parsers._get_verbosity(args)
111
    list_content.display(formatter, verbose=verbosity)
112
    return content
113
114
115
@parse_args
116
@error_catcher
117
def _save(content, args):
118
    """ Save the current process list to file."""
119
    out_file = content.filename if args.input else args.filepath
120
    content.check_file(out_file)
121
    print()
122
    DispDisplay(content.plugin_list)._notices()
123
    content.save(out_file, check=input("Are you sure you want to save the "
124
                 "current data to %s [y/N]" % (out_file)),
125
                 template=args.template)
126
    return content
127
128
129
@parse_args
130
@error_catcher
131
def _mod(content, args):
132
    """ Modify plugin parameters."""
133
    pos_str, subelem, dims, command = \
134
        content.separate_plugin_subelem(args.param, False)
135
    if 'expand' in command:
136
        # Run the start stop step view for that dimension alone
137
        _expand(content, f"{pos_str} {dims} {True}")
138
    else:
139
        if not args.default:
140
            content.check_required_args(args.value, True)
141
        # Get the parameter name for the display later
142
        args.param = content.get_param_arg_str(pos_str, subelem)
143
        content_modified = content.modify(pos_str, subelem,
144
                                          ' '.join(args.value),
145
                                          default=args.default,
146
                                          dim=command)
147
        if content_modified:
148
            # Display the selected parameter only
149
            _disp(content, str(args.param))
150
    return content
151
152
153
@parse_args
154
@error_catcher
155
def _expand(content, args):
156
    """ Expand the plugin preview parameter. """
157
    content.set_preview_display(args.off, args.dim, args.dim_view,
158
                                args.plugin_pos)
159
    if content.expand_dim is not None:
160
        if content.expand_dim == "all" and args.dim is not None:
161
            check_str = f"Are you sure you want to alter the number of "\
162
                        f"dimensions to {args.dim}? [y/N]"
163
            is_modified = content.modify_dimensions(args.plugin_pos, args.dim,
164
                                                    check=input(check_str))
165
            if is_modified:
166
                _disp(content, f"{args.plugin_pos}.preview")
167
        else:
168
            _disp(content, f"{args.plugin_pos}.preview")
169
            #content.set_preview_display(args.off, "all")
170
    return content
171
172
173
@parse_args
174
@error_catcher
175
def _set(content, args):
176
    """ Set the status of the plugin to be on or off. """
177
    for pos in args.plugin_pos:
178
        content.on_and_off(pos, args.status.upper())
179
    _disp(content, '-q')
180
    return content
181
182
183
@parse_args
184
@error_catcher
185
def _add(content, args):
186
    """ Add a plugin to the list. """
187
    elems = content.get_positions()
188
    final = str(int(re.findall(r'\d+', elems[-1])[0])+1) if elems else 1
189
    content.add(args.name, args.pos if args.pos else str(final))
190
    _disp(content, '-q')
191
    return content
192
193
194
@parse_args
195
@error_catcher
196
def _dupl(content, args):
197
    """ Duplicate a plugin in the list"""
198
    elems = content.get_positions()
199
    final = str(int(re.findall(r'\d+', elems[-1])[0])+1) if elems else 1
200
    content.duplicate(args.orig_pos, args.new_pos if args.new_pos else str(final))
201
    _disp(content, '-q')
202
    return content
203
204
205
@parse_args
206
@error_catcher
207
def _ref(content, args):
208
    """ Refresh a plugin (update it). """
209
    positions = content.get_positions() if args.pos == ['*'] else args.pos
210
    for pos_str in positions:
211
        content.refresh(pos_str, defaults=args.defaults)
212
        if not args.nodisp:
213
            _disp(content, pos_str)
214
    return content
215
216
217
@parse_args
218
@error_catcher
219
def _cite(content, args):
220
    """ Display plugin citations."""
221
    range_dict = content.split_plugin_string(args.start, args.stop)
222
    formatter = CiteDisplay(content.plugin_list)
223
    content.display(formatter, **range_dict)
224
    return content
225
226
227
@parse_args
228
@error_catcher
229
def _rem(content, args):
230
    """ Remove plugin(s) from the list. """
231
    pos_sort = []
232
    for pos in args.pos:
233
        pos_sort.append(int(pos))
234
    pos_sort.sort()
235
    counter=0
236
    for pos in pos_sort:
237
        if ((counter>0 and pos>0)):
238
            pos-=counter
239
        content.remove(content.find_position(str(pos)))
240
        counter+=1
241
    _disp(content, '-q')
242
    return content
243
244
245
@parse_args
246
@error_catcher
247
def _move(content, args):
248
    """ Move a plugin to a different position in the list."""
249
    content.move(args.orig_pos, args.new_pos)
250
    _disp(content, '-q')
251
    return content
252
253
254
@parse_args
255
@error_catcher
256
def _coll(content, arg):
257
    """ List all plugin collections. """
258
    colls = Completer([])._get_collections()
259
    print('\n')
260
    for c in colls:
261
        print('%s\n %s' % ('-'*40, c))
262
    print('-'*40, '\n')
263
    return content
264
265
@parse_args
266
@error_catcher
267
def _clear(content, arg):
268
    """ Clear the current plugin list."""
269
    content.clear(check=input("Are you sure you want to clear the current "
270
                  "plugin list? [y/N]"))
271
    return content
272
273
@parse_args
274
@error_catcher
275
def _exit(content, arg):
276
    """ Close the program."""
277
    content.set_finished(check=input("Are you sure? [y/N]"))
278
    return content
279
280
@parse_args
281
@error_catcher
282
def _level(content, args):
283
    """ Set a visibility level for the parameters."""
284
    content.level(args.level)
285
    return content
286
287
@parse_args
288
@error_catcher
289
def _history(content, arg):
290
    """ View the history of previous commands """
291
    hlen = utils.readline.get_current_history_length()
292
    for i in range(hlen):
293
        print("%5i : %s" % (i, utils.readline.get_history_item(i)))
294
    return content
295
296
297
commands = {'open': _open,
298
            'help': _help,
299
            'disp': _disp,
300
            'list': _list,
301
            'save': _save,
302
            'mod': _mod,
303
            'set': _set,
304
            'add': _add,
305
            'dupl': _dupl,
306
            'rem': _rem,
307
            'move': _move,
308
            'ref': _ref,
309
            'level': _level,
310
            'expand': _expand,
311
            'cite': _cite,
312
            'coll': _coll,
313
            'clear': _clear,
314
            'exit': _exit,
315
            'history': _history}
316
317
def get_description():
318
    """ For each command, enter the function and save the docstring to a
319
    dictionary
320
    """
321
    command_desc_dict = {command: function_name.__doc__
322
                         for command, function_name in commands.items()}
323
    return command_desc_dict
324
325
def main(test=False):
326
    """
327
    :param test: If test is True the last argument from sys.argv is removed,
328
                 as it contains the directory of the test scripts, which fails the
329
                 parsing of the arguments as it has an unexpected argument.
330
331
                 If test is False then nothing is touched.
332
    """
333
334
    print("Running the configurator")
335
    # required for running the tests locally or on travis
336
    # drops the last argument from pytest which is the test file/module
337
    if test:
338
        try:
339
            # find where the /scripts argument is
340
            index_of_scripts_argument = ["scripts" in arg for arg in sys.argv].index(True)
341
            # remove it, including every arguments after it (e.g --cov)
342
            sys.argv = sys.argv[:index_of_scripts_argument]
343
        except ValueError:
344
            # scripts was not part of the arguments passed in by the test
345
            pass
346
347
    args = parsers._config_arg_parser(doc=False)
348
    if args.error:
349
        utils.error_level = 1
350
351
    print("Starting Savu Config tool (please wait for prompt)")
352
353
    _reduce_logging_level()
354
355
    content = Content(level="advanced" if args.disp_all else 'basic')
356
357
    with warnings.catch_warnings():
358
        warnings.simplefilter("ignore")
359
        # imports all the (working) plugin modules
360
        content.failed = utils.populate_plugins(error_mode=args.error,
361
                                                examples=args.examples)
362
363
    comp = Completer(commands=commands, plugin_list=pu.plugins)
364
    utils._set_readline(comp.complete)
365
366
367
    # if file flag is passed then open it here
368
    if args.file:
369
        commands['open'](content, args.file)
370
371
    print("\n*** Press Enter for a list of available commands. ***\n")
372
373
    utils.load_history_file(utils.histfile)
374
    accumulative_output = ''
375
    while True:
376
        try:
377
            in_text = input(">>> ").strip()
378
            in_list = in_text.split(' ', 1)
379
            _write_command_to_log(in_text)
380
381
        except KeyboardInterrupt:
382
            print()
383
            continue
384
        except EOFError:
385
            # makes possible exiting on CTRL + D (EOF, like Python interpreter)
386
            break
387
388
        command, arg = in_list if len(in_list) == 2 else in_list+['']
389
        command = command if command else 'help'
390
        if command not in commands:
391
            print("I'm sorry, that's not a command I recognise. Press Enter "
392
                  "for a list of available commands.")
393
        else:
394
            content = commands[command](content, arg)
395
            if logger.handlers:
396
                # Write the command output to a logger
397
                accumulative_output = _write_output_to_log(accumulative_output)
398
399
        if content.is_finished():
400
            break
401
402
    print("Thanks for using the application")
403
404
405
def _write_command_to_log(in_text):
406
    logger.debug("TEST COMMAND: " + in_text)
407
    logger_rst.debug(f".. dropdown:: >>> {in_text}\n\n" \
408
                     f"    .. code-block:: bash \n")
409
410
411
def _write_output_to_log(accumulative_output):
412
    """ Separate the current command output, format it correctly
413
    and write this to the restructured text log file.
414
415
    :param accumulative_output:
416
    :return: accumulative_output
417
    """
418
    current_output = sys.stdout.getvalue()
419
420
    # Find all previous command output
421
    length_Str = len(accumulative_output)
422
    if length_Str:
423
        # If there is previous output, then select only new lines from the
424
        # current command. This will demonstrate clearly the output of each
425
        # individual command
426
        current_output = current_output[length_Str:]
427
428
    # Characters used for command line colour
429
    unicode_chars = [u'\x1b[100m', u'\x1b[0m', u'\x1b[97m',
430
                     u'\x1b[49m', u'\x1b[46m', u'\x1b[39m',
431
                     u'\x1b[36m']
432
    # Remove unicode characters which cannot be displayed
433
    for code in unicode_chars:
434
        current_output = current_output.replace(code, '')
435
436
    # Indent the text for the rst file format
437
    indent_current_output = pu.indent_multi_line_str(current_output, 2)
438
    # Write to the rst log file
439
    logger_rst.debug(indent_current_output)
440
    return sys.stdout.getvalue()
441
442
443
def _reduce_logging_level():
444
    import logging
445
    logging.getLogger().setLevel(logging.CRITICAL)
446
447
if __name__ == '__main__':
448
    main()
449