Test Failed
Push — master ( 26b3f2...a95b01 )
by Daniil
01:45 queued 20s
created

scripts.config_generator.savu_config._open()   A

Complexity

Conditions 1

Size

Total Lines 8
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 7
nop 2
dl 0
loc 8
rs 10
c 0
b 0
f 0
1
# Copyright 2014 Diamond Light Source Ltd.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
"""
16
.. module:: savu_config
17
   :platform: Unix
18
   :synopsis: A command line tool for creating Savu plugin lists
19
20
.. moduleauthor:: Nicola Wadeson <[email protected]>
21
22
"""
23
24
import re
25
import sys
26
import logging
27
from pathlib import Path
28
from . import hdf_utils as hu
29
30
from colorama import Fore
31
32
logger = logging.getLogger('documentationLog')
33
logger_rst = logging.getLogger('documentationRst')
34
35
import warnings
36
with warnings.catch_warnings():
37
    warnings.simplefilter("ignore")
38
    from .content import Content
39
    from .completer import Completer
40
    from . import display_formatter
41
    from .display_formatter import ListDisplay, DispDisplay, \
42
        CiteDisplay
43
    from . import arg_parsers as parsers
44
    from savu.plugins import utils as pu
45
    from . import config_utils as utils
46
    from .config_utils import parse_args
47
    from .config_utils import error_catcher
48
49
50
def _help(content, args):
    """ Display the help information"""
    term_width = display_formatter.get_terminal_width()

    # Banner: the title flanked by two equal runs of asterisks sized from
    # the terminal width.
    command_title = " Savu configurator commands "
    banner_pad = "*" * ((term_width - len(command_title)) // 2)
    print(f"{banner_pad}{command_title}{banner_pad}")

    # One line per command, ordered case-insensitively by command name.
    # Commands whose function has no docstring are left out.
    for key in sorted(commands, key=str.lower):
        doc = commands[key].__doc__
        if not doc:
            continue
        print(Fore.GREEN + f"{key:>8}" + Fore.RESET
              + f" : {doc}")

    # Footer: a full-width rule, the hint text padded on both sides, and
    # another full-width rule.
    line_separator = "*" * term_width
    info_text = "* For more information about individual commands type " \
                "'<command> -h' *"
    side_pad = " " * ((term_width - len(info_text)) // 2)
    print(f"\n{line_separator}")
    print(f"{side_pad}{info_text}{side_pad}")
    print(line_separator)
    return content
69
70
71
@parse_args
@error_catcher
def _open(content, args):
    """ Open an existing process list."""
    process_list_file = args.file
    content.fopen(process_list_file, update=True, skip=args.skip)
    # Follow up with the 'disp' command (argument '-q') and then the 'ref'
    # command (argument '* -n') on the freshly loaded list.
    _disp(content, '-q')
    _ref(content, '* -n')
    return content
79
80
81
@parse_args
@error_catcher
def _disp(content, args):
    """ Display the plugins in the current list."""
    # Keyword arguments describing the requested range; forwarded as-is to
    # content.display() below.
    display_range = content.split_plugin_string(args.start, args.stop,
                                                subelem_view=True)
    formatter = DispDisplay(content.plugin_list)
    verbosity = parsers._get_verbosity(args)

    # The 'all' flag forces the advanced level; otherwise use the level
    # currently stored on the content object.
    if args.all:
        level = 'advanced'
    else:
        level = content.disp_level

    content.display(formatter, current_level=level, verbose=verbosity,
                    datasets=bool(args.datasets), disp_level=args.level,
                    **display_range)
    return content
94
95
96
@parse_args
@error_catcher
def _list(content, args):
    """ List the available plugins. """
    # A separate Content object holds the listing so that the caller's
    # process list (content) is returned unchanged.
    listing = Content()
    # Instead of reloading all plugins, copy the list of failed plugins
    listing.failed = content.failed
    utils._populate_plugin_list(listing, pfilter=args.string)

    found = listing.plugin_list.plugin_list
    if len(found) == 0:
        print("No result found.")
        return content

    # Sort the plugin dictionaries by the value of their 'name' key.
    listing.plugin_list.plugin_list = sorted(
        found, key=lambda entry: entry['name'])

    formatter = ListDisplay(listing.plugin_list)
    verbosity = parsers._get_verbosity(args)
    listing.display(formatter, verbose=verbosity)
    return content
118
119
120
@parse_args
@error_catcher
def _save(content, args):
    """ Save the current process list to file."""
    # With the 'input' flag set, write back to the file name stored on the
    # content object; otherwise use the path given on the command.
    if args.input:
        out_file = content.filename
    else:
        out_file = args.filepath
    content.check_file(out_file)

    print()
    DispDisplay(content.plugin_list)._notices()

    # The user's raw reply is handed to content.save(), which decides what
    # to do with it.
    answer = input("Are you sure you want to save the "
                   "current data to %s [y/N]" % (out_file))
    content.save(out_file, check=answer, template=args.template)
    return content
132
133
134
@parse_args
@error_catcher
def _mod(content, args):
    """ Modify plugin parameters."""
    # Bound up-front so the names exist on every path; they are only read
    # when the command is not 'global', in which case they are filled in by
    # separate_plugin_subelem() below.
    pos_str = subelem = dims = None
    if args.globalpar:
        # deal with the global parameter
        command = 'global'
    else:
        pos_str, subelem, dims, command = \
            content.separate_plugin_subelem(args.param, False)

    if 'expand' in command:
        # Run the start stop step view for that dimension alone
        _expand(content, f"{pos_str} {dims} {True}")

    if 'global' not in command:
        # Modification of one parameter of one plugin.
        if not args.default:
            content.check_required_args(args.value, True)
        # Get the parameter name for the display later
        args.param = content.get_param_arg_str(pos_str, subelem)
        was_modified = content.modify(pos_str, subelem,
                                      ' '.join(args.value),
                                      default=args.default,
                                      dim=command)
        if was_modified:
            # Display the selected parameter only
            _disp(content, str(args.param))
        return content

    # Global modification: try the change at every position (no early
    # exit) and report whether at least one position accepted it.
    outcomes = [content.modify_global(int(pos_no), args)
                for pos_no in content.get_positions()]
    if any(outcomes):
        print("Parameter '{}' has been changed globally to '{}'"
              .format(args.param, args.value[0]))
    else:
        print("Parameter '{}' has not been found in the process list"
              .format(args.param))
    return content
171
172
@parse_args
@error_catcher
def _expand(content, args):
    """ Expand the plugin preview parameter. """
    content.set_preview_display(args.off, args.dim, args.dim_view,
                                args.plugin_pos)
    # Nothing further to show when no expand dimension is set.
    if content.expand_dim is None:
        return content

    if not (content.expand_dim == "all" and args.dim is not None):
        # Plain case: just show the preview parameter of the plugin.
        _disp(content, f"{args.plugin_pos}.preview")
        #content.set_preview_display(args.off, "all")
        return content

    # The number of dimensions is being altered: ask first, and only
    # re-display the preview if modify_dimensions() reports a change.
    check_str = f"Are you sure you want to alter the number of "\
                f"dimensions to {args.dim}? [y/N]"
    if content.modify_dimensions(args.plugin_pos, args.dim,
                                 check=input(check_str)):
        _disp(content, f"{args.plugin_pos}.preview")
    return content
190
191
192
@parse_args
@error_catcher
def _set(content, args):
    """ Set the status of the plugin to be on or off. """
    # Apply the upper-cased status to each listed plugin position in turn.
    for plugin_position in args.plugin_pos:
        new_status = args.status.upper()
        content.on_and_off(plugin_position, new_status)
    _disp(content, '-q')
    return content
200
201
202
@parse_args
@error_catcher
def _add(content, args):
    """ Add a plugin to the list. """
    positions = content.get_positions()
    if positions:
        # Default slot: one more than the first number found in the last
        # position label.
        last_number = re.findall(r'\d+', positions[-1])[0]
        final = str(int(last_number) + 1)
    else:
        # Empty list: the default slot is 1.
        final = 1

    # An explicitly requested position takes precedence over the default.
    content.add(args.name, args.pos if args.pos else str(final))
    inserted_at = int(args.pos) if args.pos else int(final)
    content.check_iterative_loops([inserted_at], 1)
    _disp(content, '-q')
    return content
212
213
214
@parse_args
@error_catcher
def _iterate(content, args):
    """ Set a plugin (or group of plugins) to run iteratively. """
    # TODO: note the lack of use of _disp(); maybe will need this for
    # visually displaying the iterative loops in the terminal window?
    loop_change_requested = args.remove is not None or args.set is not None
    if loop_change_requested:
        content.iterate(args)
        # display the process list with the visual markers of where iterative
        # loops are
        _disp(content, '-q')
    else:
        # display only the loops, not the rest of the process list
        content.display_iterative_loops()
    return content
229
    # TODO: the commented-out code below is associated with the TODO in
230
    # arg_parsers._iterate_arg_parser()
231
#    plugin_indices = args.indices
232
#    iterations = args.iterations
233
#
234
#    start = plugin_indices[0]
235
#    if len(plugin_indices) == 1:
236
#        # only the start index has been given, so there is only one element in
237
#        # the list
238
#        end = plugin_indices[0]
239
#    else:
240
#        # both the start and end index has been given, so there are two elements
241
#        # in the list
242
#        end = plugin_indices[1]
243
#
244
#    content.add_iterate_plugin_group(start, end, iterations[0])
245
246
247
@parse_args
@error_catcher
def _dupl(content, args):
    """ Duplicate a plugin in the list"""
    positions = content.get_positions()
    if positions:
        # Default target: one more than the first number found in the last
        # position label.
        last_number = re.findall(r'\d+', positions[-1])[0]
        final = str(int(last_number) + 1)
    else:
        final = 1

    # An explicitly requested new position takes precedence over the default.
    target = args.new_pos if args.new_pos else str(final)
    content.duplicate(args.orig_pos, target)
    _disp(content, '-q')
    return content
256
257
258
@parse_args
@error_catcher
def _ref(content, args):
    """ Refresh a plugin (update it). """
    # A lone '*' selects every position currently in the list.
    if args.pos == ['*']:
        positions = content.get_positions()
    else:
        positions = args.pos

    for pos_str in positions:
        content.refresh(pos_str, defaults=args.defaults)
        if args.nodisp:
            continue
        # Show the refreshed plugin unless display was suppressed.
        _disp(content, pos_str)
    return content
268
269
270
@parse_args
@error_catcher
def _cite(content, args):
    """ Display plugin citations."""
    # Keyword arguments describing the requested range, forwarded to
    # content.display() together with the citation formatter.
    selection = content.split_plugin_string(args.start, args.stop)
    citation_formatter = CiteDisplay(content.plugin_list)
    content.display(citation_formatter, **selection)
    return content
278
279
280
@parse_args
@error_catcher
def _rem(content, args):
    """ Remove plugin(s) from the list. """
    # Work through the requested positions in ascending numeric order.
    ordered = sorted(int(pos) for pos in args.pos)
    for removed_so_far, pos in enumerate(ordered):
        # After the first removal, lower each later (positive) position by
        # the number of plugins already removed -- presumably because the
        # remaining plugins are renumbered after each removal.
        if removed_so_far > 0 and pos > 0:
            pos -= removed_so_far
        content.remove(content.find_position(str(pos)))
        content.check_iterative_loops([pos], -1)
    _disp(content, '-q')
    return content
297
298
299
@parse_args
@error_catcher
def _move(content, args):
    """ Move a plugin to a different position in the list."""
    source, target = args.orig_pos, args.new_pos
    content.move(source, target)
    # Re-display the list through the 'disp' command with argument '-q'.
    _disp(content, '-q')
    return content
306
307
308
@parse_args
@error_catcher
def _coll(content, arg):
    """ List all plugin collections. """
    rule = '-'*40
    collection_names = Completer([])._get_collections()
    print('\n')
    # Each collection is printed beneath a 40-character rule.
    for coll_name in collection_names:
        print('%s\n %s' % (rule, coll_name))
    print(rule, '\n')
    return content
318
319
@parse_args
@error_catcher
def _clear(content, arg):
    """ Clear the current plugin list."""
    # The user's raw reply is passed to content.clear(), which decides
    # whether to go ahead.
    answer = input("Are you sure you want to clear the current "
                   "plugin list? [y/N]")
    content.clear(check=answer)
    return content
326
327
@parse_args
@error_catcher
def _exit(content, arg):
    """ Close the program."""
    # The user's raw reply is passed to content.set_finished(); the main
    # loop polls content.is_finished() to decide when to stop.
    answer = input("Are you sure? [y/N]")
    content.set_finished(check=answer)
    return content
333
334
@parse_args
@error_catcher
def _level(content, args):
    """ Set a visibility level for the parameters."""
    requested_level = args.level
    content.level(requested_level)
    return content
340
341
@parse_args
@error_catcher
def _history(content, arg):
    """ View the history of previous commands """
    # NOTE(review): assumes utils.readline follows the standard-library
    # readline API -- confirm for any platform-specific replacement module.
    readline = utils.readline
    hlen = readline.get_current_history_length()
    # readline history items are one-based: index 0 is never a valid entry
    # (it yields None) and the most recent entry sits at index hlen.
    # Iterating 1..hlen therefore lists every real entry exactly once.
    for i in range(1, hlen + 1):
        print("%5i : %s" % (i, readline.get_history_item(i)))
    return content
349
350
@parse_args
@error_catcher
def _replace(content, args):
    """ Replace a plugin with another """
    old_plugin, replacement = args.old, args.new_plugin
    content.replace(old_plugin, replacement)
    # Re-display the list through the 'disp' command with argument '-q'.
    _disp(content, '-q')
    return content
357
358
# Dispatch table: the command word typed at the prompt -> the function that
# implements it. The docstring of each function doubles as its help text.
commands = {
    'open': _open,
    'help': _help,
    'disp': _disp,
    'list': _list,
    'save': _save,
    'mod': _mod,
    'set': _set,
    'add': _add,
    'dupl': _dupl,
    'rem': _rem,
    'move': _move,
    'ref': _ref,
    'level': _level,
    'expand': _expand,
    'cite': _cite,
    'coll': _coll,
    'clear': _clear,
    'exit': _exit,
    'history': _history,
    'iterate': _iterate,
    'replace': _replace,
}
379
380
def get_description():
    """ Collect the docstring of every command function.

    :return: dictionary mapping each command name in ``commands`` to the
        ``__doc__`` of the function registered for it
    """
    command_desc_dict = {}
    for command, function_name in commands.items():
        command_desc_dict[command] = function_name.__doc__
    return command_desc_dict
387
388
def main(test=False):
    """
    Entry point of the configurator: handle the one-shot command line
    options, then run the interactive prompt until the user finishes.

    :param test: If test is True, sys.argv is cut at the first argument
                 containing "scripts" (that argument and everything after
                 it is dropped), as it contains the directory of the test
                 scripts, which fails the parsing of the arguments as it
                 is an unexpected argument.

                 If test is False then nothing is touched.
    """

    # required for running the tests locally or on travis
    # drops the trailing arguments from pytest (the test file/module)
    if test:
        # find where the /scripts argument is (None if it is absent)
        index_of_scripts_argument = next(
            (idx for idx, arg in enumerate(sys.argv) if "scripts" in arg),
            None)
        if index_of_scripts_argument is not None:
            # remove it, including every argument after it (e.g --cov)
            sys.argv = sys.argv[:index_of_scripts_argument]

    args = parsers._config_arg_parser(doc=False)

    # One-shot options: each performs its task and exits immediately.
    if args.tree is not None:
        hu.get_hdf_tree(args.tree, add_shape=True, display=True)
        sys.exit(0)
    if args.find is not None:
        vals = args.find
        file_path, pattern = vals[0][0], vals[0][1]
        hu.find_hdf_key(file_path, pattern, display=True)
        sys.exit(0)
    if args.check is not None:
        hu.check_tomo_data(args.check)
        sys.exit(0)

    # A value given through 'name' overrides the file to open.
    if args.name is not None:
        args.file = args.name

    if args.error:
        utils.error_level = 1

    print("Running the configurator")
    print("Starting Savu Config tool (please wait for prompt)")

    _reduce_logging_level()

    start_level = "advanced" if args.disp_all else 'basic'
    content = Content(level=start_level)

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        # imports all the (working) plugin modules
        content.failed = utils.populate_plugins(error_mode=args.error,
                                                examples=args.examples)

    comp = Completer(commands=commands, plugin_list=pu.plugins)
    utils._set_readline(comp.complete)

    # if file flag is passed then open it here
    if args.file:
        commands['open'](content, args.file)

    print("\n*** Press Enter for a list of available commands. ***\n")

    utils.load_history_file(utils.histfile)
    accumulative_output = ''
    while True:
        try:
            # Prefix the prompt with the stem of the open file, if any.
            name = f"{Path(content.filename).stem} " if content.filename else ""
            in_text = input(f"{name}>>> ").strip()
            _write_command_to_log(in_text)
        except KeyboardInterrupt:
            # CTRL + C abandons the current line and prompts again
            print()
            continue
        except EOFError:
            # makes possible exiting on CTRL + D (EOF, like Python interpreter)
            break

        # The first word is the command; whatever follows the first space
        # is its argument string. An empty line means 'help'.
        command, _, arg = in_text.partition(' ')
        command = command or 'help'
        if command in commands:
            content = commands[command](content, arg)
            if logger.handlers:
                # Write the command output to a logger
                accumulative_output = _write_output_to_log(accumulative_output)
        else:
            print("I'm sorry, that's not a command I recognise. Press Enter "
                  "for a list of available commands.")

        if content.is_finished():
            break

    print("Thanks for using the application")
484
485
486
def _write_command_to_log(in_text):
    """ Record an entered command in both documentation loggers.

    :param in_text: the command line text as typed by the user
    """
    logger.debug("TEST COMMAND: " + in_text)
    # rst entry: a dropdown titled with the command, opening a bash code
    # block (the command output is logged separately).
    rst_entry = f".. dropdown:: >>> {in_text}\n\n" \
                f"    .. code-block:: bash \n"
    logger_rst.debug(rst_entry)
490
491
492
def _write_output_to_log(accumulative_output):
    """ Isolate the output of the latest command, strip the terminal
    colour codes from it, indent it and write it to the restructured
    text log.

    Relies on ``sys.stdout`` providing ``getvalue()`` (i.e. stdout has been
    replaced by an in-memory buffer).

    :param accumulative_output: everything captured from stdout up to and
        including the previous command
    :return: everything captured from stdout so far, to be passed back in
        on the next call
    """
    captured = sys.stdout.getvalue()

    # If there is previous output, keep only the text that follows it, so
    # that each log entry shows the output of one command alone.
    already_logged = len(accumulative_output)
    new_output = captured[already_logged:] if already_logged else captured

    # Escape sequences used for command line colour; they cannot be
    # displayed in the rst file, so remove them.
    colour_codes = (u'\x1b[100m', u'\x1b[0m', u'\x1b[97m',
                    u'\x1b[49m', u'\x1b[46m', u'\x1b[39m',
                    u'\x1b[36m')
    for colour_code in colour_codes:
        new_output = new_output.replace(colour_code, '')

    # Indent the text for the rst file format and write it to the rst log
    logger_rst.debug(pu.indent_multi_line_str(new_output, 2))
    return sys.stdout.getvalue()
522
523
524
def _reduce_logging_level():
525
    import logging
526
    logging.getLogger().setLevel(logging.CRITICAL)
527
528
if __name__ == '__main__':
529
    main()
530