Test Failed
Push — master ( 62239c...9ba79c )
by
unknown
01:46 queued 18s
created

scripts.config_generator.savu_config._expand()   A

Complexity

Conditions 5

Size

Total Lines 18
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 14
nop 2
dl 0
loc 18
rs 9.2333
c 0
b 0
f 0
1
# Copyright 2014 Diamond Light Source Ltd.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
"""
16
.. module:: savu_config
17
   :platform: Unix
18
   :synopsis: A command line tool for creating Savu plugin lists
19
20
.. moduleauthor:: Nicola Wadeson <[email protected]>
21
22
"""
23
24
import re
25
import sys
26
import logging
27
from . import hdf_utils as hu
28
29
logger = logging.getLogger('documentationLog')
30
logger_rst = logging.getLogger('documentationRst')
31
32
import warnings
33
with warnings.catch_warnings():
34
    warnings.simplefilter("ignore")
35
    from .content import Content
36
    from .completer import Completer
37
    from . import display_formatter
38
    from .display_formatter import ListDisplay, DispDisplay, \
39
        CiteDisplay
40
    from . import arg_parsers as parsers
41
    from savu.plugins import utils as pu
42
    from . import config_utils as utils
43
    from .config_utils import parse_args
44
    from .config_utils import error_catcher
45
46
47
def _help(content, args):
    """ Display the help information"""
    # The docstring above is shown verbatim in the listing printed below.
    term_width = display_formatter.get_terminal_width()

    # Banner: the title centred between two runs of '*'
    heading = " Savu configurator commands "
    stars = "*" * ((term_width - len(heading)) // 2)
    print(f"{stars}{heading}{stars}")

    # One row per command that has a docstring
    for name, func in commands.items():
        summary = func.__doc__
        if summary:
            print("%8s : %s" % (name, summary))

    # Footer: a hint centred with spaces between two full-width rules
    footer = "* For more information about individual commands type " \
             "'<command> -h' *"
    padding = " " * ((term_width - len(footer)) // 2)
    full_rule = "*" * term_width
    print(f"\n{full_rule}")
    print(f"{padding}{footer}{padding}")
    print(full_rule)
    return content
65
66
67
@parse_args
@error_catcher
def _open(content, args):
    """ Open an existing process list."""
    # Read the requested file into the content object
    content.fopen(args.file, skip=args.skip, update=True)
    # Show what was loaded, then refresh every plugin ('*').
    # NOTE(review): '-q' / '-n' are presumably the quiet / no-display flags
    # of the disp and ref parsers - confirm in arg_parsers.
    _disp(content, '-q')
    _ref(content, '* -n')
    return content
75
76
77
@parse_args
@error_catcher
def _disp(content, args):
    """ Display the plugins in the current list."""
    # Keyword arguments describing which part of the list to show
    plugin_range = content.split_plugin_string(
        args.start, args.stop, subelem_view=True)
    disp_formatter = DispDisplay(content.plugin_list)
    verbose = parsers._get_verbosity(args)

    # The 'all' flag overrides the level stored on the content object
    if args.all:
        current_level = 'advanced'
    else:
        current_level = content.disp_level
    show_datasets = bool(args.datasets)

    content.display(disp_formatter, current_level=current_level,
                    verbose=verbose, datasets=show_datasets,
                    disp_level=args.level, **plugin_range)
    return content
90
91
92
@parse_args
@error_catcher
def _list(content, args):
    """ List the available plugins. """
    # The listing is built in a separate Content object; the one passed in
    # is returned unchanged.
    listing = Content()
    # Copy the failed plugins across rather than reloading all plugins
    listing.failed = content.failed
    utils._populate_plugin_list(listing, pfilter=args.string)

    if len(listing.plugin_list.plugin_list) == 0:
        print("No result found.")
        return content

    # Order the plugin dictionaries alphabetically by their 'name' entry
    listing.plugin_list.plugin_list = sorted(
        listing.plugin_list.plugin_list,
        key=lambda entry: entry['name'])

    list_formatter = ListDisplay(listing.plugin_list)
    verbose = parsers._get_verbosity(args)
    listing.display(list_formatter, verbose=verbose)
    return content
114
115
116
@parse_args
@error_catcher
def _save(content, args):
    """ Save the current process list to file."""
    # With the 'input' flag set, write back to the file held by content
    if args.input:
        target = content.filename
    else:
        target = args.filepath
    content.check_file(target)
    print()
    DispDisplay(content.plugin_list)._notices()
    # The raw answer is handed to content.save, which decides what to do
    answer = input("Are you sure you want to save the "
                   "current data to %s [y/N]" % (target))
    content.save(target, check=answer, template=args.template)
    return content
128
129
130
@parse_args
@error_catcher
def _mod(content, args):
    """ Modify plugin parameters."""
    plugin_pos, param, dims, command = \
        content.separate_plugin_subelem(args.param, False)

    if 'expand' in command:
        # Run the start stop step view for that dimension alone
        _expand(content, f"{plugin_pos} {dims} {True}")
        return content

    # A value is only checked when the parameter is not reset to default
    if not args.default:
        content.check_required_args(args.value, True)
    # Keep the parameter name so it can be displayed afterwards
    args.param = content.get_param_arg_str(plugin_pos, param)
    was_modified = content.modify(plugin_pos, param,
                                  ' '.join(args.value),
                                  default=args.default,
                                  dim=command)
    if was_modified:
        # Show only the parameter that was changed
        _disp(content, str(args.param))
    return content
152
153
154
@parse_args
@error_catcher
def _expand(content, args):
    """ Expand the plugin preview parameter. """
    content.set_preview_display(args.off, args.dim, args.dim_view,
                                args.plugin_pos)
    if content.expand_dim is None:
        return content

    show_preview = True
    if content.expand_dim == "all" and args.dim is not None:
        # Changing the number of dimensions needs confirmation; the preview
        # is only displayed when the modification reports success.
        prompt = f"Are you sure you want to alter the number of "\
                 f"dimensions to {args.dim}? [y/N]"
        show_preview = content.modify_dimensions(args.plugin_pos, args.dim,
                                                 check=input(prompt))
    if show_preview:
        _disp(content, f"{args.plugin_pos}.preview")
    return content
172
173
174
@parse_args
@error_catcher
def _set(content, args):
    """ Set the status of the plugin to be on or off. """
    # Apply the upper-cased status to every requested position
    for plugin_pos in args.plugin_pos:
        content.on_and_off(plugin_pos, args.status.upper())
    _disp(content, '-q')
    return content
182
183
184
@parse_args
@error_catcher
def _add(content, args):
    """ Add a plugin to the list. """
    positions = content.get_positions()
    # Default position: one past the first number found in the last
    # existing position string, or '1' when the list is empty.
    if positions:
        last_number = int(re.findall(r'\d+', positions[-1])[0])
        next_pos = str(last_number + 1)
    else:
        next_pos = str(1)
    content.add(args.name, args.pos or next_pos)
    _disp(content, '-q')
    return content
193
194
195
@parse_args
@error_catcher
def _dupl(content, args):
    """ Duplicate a plugin in the list"""
    positions = content.get_positions()
    # Default target: one past the first number found in the last existing
    # position string, or '1' when the list is empty.
    if positions:
        last_number = int(re.findall(r'\d+', positions[-1])[0])
        next_pos = str(last_number + 1)
    else:
        next_pos = str(1)
    content.duplicate(args.orig_pos, args.new_pos or next_pos)
    _disp(content, '-q')
    return content
204
205
206
@parse_args
@error_catcher
def _ref(content, args):
    """ Refresh a plugin (update it). """
    # A lone '*' selects every position currently in the list
    if args.pos == ['*']:
        targets = content.get_positions()
    else:
        targets = args.pos

    for plugin_pos in targets:
        content.refresh(plugin_pos, defaults=args.defaults)
        if args.nodisp:
            continue
        _disp(content, plugin_pos)
    return content
216
217
218
@parse_args
@error_catcher
def _cite(content, args):
    """ Display plugin citations."""
    # Keyword arguments describing which plugins to show
    plugin_range = content.split_plugin_string(args.start, args.stop)
    content.display(CiteDisplay(content.plugin_list), **plugin_range)
    return content
226
227
228
@parse_args
@error_catcher
def _rem(content, args):
    """ Remove plugin(s) from the list. """
    # Work through the requested positions in ascending numeric order
    ordered = sorted(int(pos) for pos in args.pos)
    for removed_so_far, requested in enumerate(ordered):
        current = requested
        # Offset each later position by the number of removals already done
        if removed_so_far > 0 and requested > 0:
            current = requested - removed_so_far
        content.remove(content.find_position(str(current)))
    _disp(content, '-q')
    return content
244
245
246
@parse_args
@error_catcher
def _move(content, args):
    """ Move a plugin to a different position in the list."""
    # Relocate the plugin, then show the resulting list
    content.move(args.orig_pos, args.new_pos)
    _disp(content, '-q')
    return content
253
254
255
@parse_args
@error_catcher
def _coll(content, arg):
    """ List all plugin collections. """
    collections = Completer([])._get_collections()
    rule = '-'*40
    print('\n')
    # Each collection name is printed beneath a horizontal rule
    for collection in collections:
        print('%s\n %s' % (rule, collection))
    print(rule, '\n')
    return content
265
266
@parse_args
@error_catcher
def _clear(content, arg):
    """ Clear the current plugin list."""
    # The raw answer is passed on; content.clear decides whether to proceed
    answer = input("Are you sure you want to clear the current "
                   "plugin list? [y/N]")
    content.clear(check=answer)
    return content
273
274
@parse_args
@error_catcher
def _exit(content, arg):
    """ Close the program."""
    # The raw answer is passed on; the main loop checks
    # content.is_finished() after each command.
    answer = input("Are you sure? [y/N]")
    content.set_finished(check=answer)
    return content
280
281
@parse_args
@error_catcher
def _level(content, args):
    """ Set a visibility level for the parameters."""
    # Hand the requested level to the content object
    requested_level = args.level
    content.level(requested_level)
    return content
287
288
@parse_args
@error_catcher
def _history(content, arg):
    """ View the history of previous commands """
    # readline history items are one-based: asking for item 0 yields None,
    # and the newest entry sits at index == history length. Iterate
    # 1..length inclusive so every real entry is listed exactly once.
    # NOTE(review): assumes utils.readline is the standard readline module.
    history_length = utils.readline.get_current_history_length()
    for index in range(1, history_length + 1):
        print("%5i : %s" % (index, utils.readline.get_history_item(index)))
    return content
296
297
@parse_args
@error_catcher
def _replace(content, args):
    """ Replace a plugin with another """
    # Swap the plugin, then show the resulting list
    content.replace(args.old, args.new_plugin)
    _disp(content, '-q')
    return content
304
305
# Dispatch table: the command word typed at the prompt -> handler function.
# Insertion order is the order in which the help listing prints commands.
commands = {
    'open': _open,
    'help': _help,
    'disp': _disp,
    'list': _list,
    'save': _save,
    'mod': _mod,
    'set': _set,
    'add': _add,
    'dupl': _dupl,
    'rem': _rem,
    'move': _move,
    'ref': _ref,
    'level': _level,
    'expand': _expand,
    'cite': _cite,
    'coll': _coll,
    'clear': _clear,
    'exit': _exit,
    'history': _history,
    'replace': _replace,
}
325
326
def get_description():
    """ Collect the docstring of every command function.

    :return: dictionary mapping each command name to the docstring of the
             function registered for it in ``commands``
    """
    descriptions = {}
    for name, func in commands.items():
        descriptions[name] = func.__doc__
    return descriptions
333
334
def main(test=False):
    """ Run the interactive Savu configurator.

    :param test: If True, sys.argv is cut off at the first argument that
                 contains "scripts" (that argument and everything after it
                 is dropped) before the arguments are parsed, because the
                 extra arguments supplied by the test runner would
                 otherwise fail the parsing.

                 If False, sys.argv is left untouched.
    """

    # required for running the tests locally or on travis: drop the test
    # file/module argument and anything following it (e.g. --cov)
    if test:
        for position, cli_arg in enumerate(sys.argv):
            if "scripts" in cli_arg:
                sys.argv = sys.argv[:position]
                break

    args = parsers._config_arg_parser(doc=False)

    # One-shot hdf helpers: run the request and exit without a prompt
    if args.tree is not None:
        hu.get_hdf_tree(args.tree, add_shape=True, display=True)
        sys.exit(0)
    if args.find is not None:
        first_request = args.find[0]
        hu.find_hdf_key(first_request[0], first_request[1], display=True)
        sys.exit(0)
    if args.check is not None:
        hu.check_tomo_data(args.check)
        sys.exit(0)

    if args.error:
        utils.error_level = 1

    print("Running the configurator")
    print("Starting Savu Config tool (please wait for prompt)")

    _reduce_logging_level()

    start_level = "advanced" if args.disp_all else 'basic'
    content = Content(level=start_level)

    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        # imports all the (working) plugin modules
        content.failed = utils.populate_plugins(error_mode=args.error,
                                                examples=args.examples)

    completer = Completer(commands=commands, plugin_list=pu.plugins)
    utils._set_readline(completer.complete)

    # if file flag is passed then open it here
    if args.file:
        commands['open'](content, args.file)

    print("\n*** Press Enter for a list of available commands. ***\n")

    utils.load_history_file(utils.histfile)
    logged_output = ''
    while True:
        try:
            in_text = input(">>> ").strip()
            _write_command_to_log(in_text)
        except KeyboardInterrupt:
            # CTRL + C abandons the current line and prompts again
            print()
            continue
        except EOFError:
            # makes possible exiting on CTRL + D (EOF, like Python interpreter)
            break

        # First word is the command, the remainder is its argument string;
        # an empty line falls back to the help command.
        command, _, arg = in_text.partition(' ')
        command = command or 'help'
        if command in commands:
            content = commands[command](content, arg)
            if logger.handlers:
                # Write the command output to a logger
                logged_output = _write_output_to_log(logged_output)
        else:
            print("I'm sorry, that's not a command I recognise. Press Enter "
                  "for a list of available commands.")

        if content.is_finished():
            break

    print("Thanks for using the application")
427
428
429
def _write_command_to_log(in_text):
    """ Record the typed command in both documentation loggers.

    :param in_text: the command line as entered at the prompt
    """
    logger.debug("TEST COMMAND: " + in_text)
    # The rst logger receives a dropdown header followed by a code-block
    # directive.
    rst_header = f".. dropdown:: >>> {in_text}\n\n" \
                 f"    .. code-block:: bash \n"
    logger_rst.debug(rst_header)
433
434
435
def _write_output_to_log(accumulative_output):
    """ Isolate the output of the latest command, clean it up and write it
    to the restructured text log.

    :param accumulative_output: everything captured from stdout before the
        latest command ran
    :return: the full stdout capture, to be passed in on the next call
    """
    # NOTE(review): relies on sys.stdout providing getvalue(), i.e. stdout
    # has been replaced by a StringIO-like object - confirm with the caller.
    # Skipping the already-logged prefix leaves only the new output; a zero
    # offset keeps the whole capture.
    already_logged = len(accumulative_output)
    new_output = sys.stdout.getvalue()[already_logged:]

    # Terminal colour escape sequences, which are stripped from the log text
    colour_codes = (u'\x1b[100m', u'\x1b[0m', u'\x1b[97m',
                    u'\x1b[49m', u'\x1b[46m', u'\x1b[39m',
                    u'\x1b[36m')
    for escape_seq in colour_codes:
        new_output = new_output.replace(escape_seq, '')

    # Indent the text for the rst file format and write it to the rst log
    logger_rst.debug(pu.indent_multi_line_str(new_output, 2))
    return sys.stdout.getvalue()
465
466
467
def _reduce_logging_level():
    """ Set the root logger's level to CRITICAL. """
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.CRITICAL)
470
471
# Start the configurator when this module is executed as a script
if __name__ == "__main__":
    main()
473