Test Failed
Pull Request — master (#708)
by Daniil
03:33
created

scripts.config_generator.savu_config._set()   A

Complexity

Conditions 2

Size

Total Lines 8
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 7
nop 2
dl 0
loc 8
rs 10
c 0
b 0
f 0
1
# Copyright 2014 Diamond Light Source Ltd.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
"""
16
.. module:: savu_config.py
17
   :platform: Unix
18
   :synopsis: A command line tool for creating Savu plugin lists
19
20
.. moduleauthor:: Nicola Wadeson <[email protected]>
21
22
"""
23
24
25
26
import re
27
import sys
28
29
30
import warnings
31
with warnings.catch_warnings():
32
    warnings.simplefilter("ignore")
33
    from .content import Content
34
    from .completer import Completer
35
    from .display_formatter import ListDisplay, DispDisplay
36
    from . import arg_parsers as parsers
37
    from savu.plugins import utils as pu
38
    from . import config_utils as utils
39
    from .config_utils import parse_args
40
    from .config_utils import error_catcher
41
42
43
def _help(content, args):
44
    """ Display the help information"""
45
    print('%s Savu configurator commands %s\n' % tuple(['*'*21]*2))
46
    for key in list(commands.keys()):
47
        doc = commands[key].__doc__
48
        if doc:
49
            print("%8s : %s" % (key, commands[key].__doc__))
50
    print("\n%s\n* For more information about individual commands type "
51
          "'<command> -h' *\n%s\n" % tuple(['*'*70]*2))
52
    return content
53
54
55
@parse_args
56
@error_catcher
57
def _open(content, args):
58
    """ Open an existing process list."""
59
    content.fopen(args.file, update=True, skip=args.skip)
60
    _ref(content, '* -n')
61
    _disp(content, '-q')
62
    return content
63
64
65
@parse_args
66
@error_catcher
67
def _disp(content, args):
68
    """ Display the plugins in the current list."""
69
    range_dict = utils.__get_start_stop(content, args.start, args.stop)
70
    formatter = DispDisplay(content.plugin_list)
71
    verbosity = parsers._get_verbosity(args)
72
    level = 'all' if args.all else content.disp_level
73
    content.display(formatter, level=level, verbose=verbosity, **range_dict)
74
    return content
75
76
77
@parse_args
78
@error_catcher
79
def _list(content, args):
80
    """ List the available plugins. """
81
    list_content = Content()
82
    utils._populate_plugin_list(list_content, pfilter=args.string)
83
    if not len(list_content.plugin_list.plugin_list):
84
        print("No result found.")
85
        return content
86
    formatter = ListDisplay(list_content.plugin_list)
87
    verbosity = parsers._get_verbosity(args)
88
    list_content.display(formatter, verbose=verbosity)
89
    return content
90
91
92
@parse_args
93
@error_catcher
94
def _save(content, args):
95
    """ Save the current process list to file."""
96
    out_file = content.filename if args.input else args.filepath
97
    content.check_file(out_file)
98
    print()
99
    DispDisplay(content.plugin_list)._notices()
100
    content.save(out_file, check=input("Are you sure you want to save the "
101
                 "current data to %s' [y/N]" % (out_file)),
102
                 template=args.template)
103
    return content
104
105
106
@parse_args
107
@error_catcher
108
def _mod(content, args):
109
    """ Modify plugin parameters. """
110
    pos_str, subelem = args.param.split('.')
111
    content.modify(pos_str, subelem, ' '.join(args.value))
112
    _disp(content, pos_str)
113
    return content
114
115
116
@parse_args
117
@error_catcher
118
def _set(content, args):
119
    """ Set the status of the plugin to be on or off. """
120
    for pos in args.plugin_pos:
121
        content.on_and_off(pos, args.status.upper())
122
    _disp(content, '-q')
123
    return content
124
125
126
@parse_args
127
@error_catcher
128
def _add(content, args):
129
    """ Add a plugin to the list. """
130
    elems = content.get_positions()
131
    final = str(int(re.findall(r'\d+', elems[-1])[0])+1) if elems else 1
132
    content.add(args.name, args.pos if args.pos else str(final))
133
    _disp(content, '-q')
134
    return content
135
136
137
@parse_args
138
@error_catcher
139
def _ref(content, args):
140
    """ Refresh a plugin (update it). """
141
    positions = content.get_positions() if args.pos == ['*'] else args.pos
142
    for pos_str in positions:
143
        content.refresh(pos_str, defaults=args.defaults)
144
        if not args.nodisp:
145
            _disp(content, pos_str)
146
    return content
147
148
149
@parse_args
150
@error_catcher
151
def _rem(content, args):
152
    """ Remove plugin(s) from the list. """
153
    content.remove(content.find_position(args.pos))
154
    _disp(content, '-q')
155
    return content
156
157
158
@parse_args
159
@error_catcher
160
def _move(content, args):
161
    """ Move a plugin to a different position in the list."""
162
    content.move(args.orig_pos, args.new_pos)
163
    _disp(content, '-q')
164
    return content
165
166
167
@parse_args
168
@error_catcher
169
def _coll(content, arg):
170
    """ List all plugin collections. """
171
    colls = Completer([])._get_collections()
172
    print('\n')
173
    for c in colls:
174
        print('%s\n %s' % ('-'*40, c))
175
    print('-'*40, '\n')
176
    return content
177
178
179
def _clear(content, arg):
180
    """ Clear the current plugin list."""
181
    content.clear(check=input("Are you sure you want to clear the current "
182
                  "plugin list? [y/N]"))
183
    return content
184
185
186
def _exit(content, arg):
187
    """ Close the program."""
188
    content.set_finished(check=input("Are you sure? [y/N]"))
189
    return content
190
191
192
def _history(content, arg):
193
    hlen = utils.readline.get_current_history_length()
194
    for i in range(hlen):
195
        print("%5i : %s" % (i, utils.readline.get_history_item(i)))
196
    return content
197
198
199
commands = {'open': _open,
200
            'help': _help,
201
            'disp': _disp,
202
            'list': _list,
203
            'save': _save,
204
            'mod': _mod,
205
            'set': _set,
206
            'add': _add,
207
            'rem': _rem,
208
            'move': _move,
209
            'ref': _ref,
210
            'coll': _coll,
211
            'clear': _clear,
212
            'exit': _exit,
213
            'history': _history}
214
215
216
def main(test=False):
217
    """
218
    :param test: If test is True the last argument from sys.argv is removed,
219
                 as it contains the directory of the test scripts, which fails the
220
                 parsing of the arguments as it has an unexpected argument.
221
222
                 If test is False then nothing is touched.
223
    """
224
225
    print("Running the configurator")
226
    # required for running the tests locally or on travis
227
    # drops the last argument from pytest which is the test file/module
228
    if test:
229
        try:
230
            # find where the /scripts argument is
231
            index_of_scripts_argument = ["scripts" in arg for arg in sys.argv].index(True)
232
            # remove it, including every arguments after it (e.g --cov)
233
            sys.argv = sys.argv[:index_of_scripts_argument]
234
        except ValueError:
235
            # scripts was not part of the arguments passed in by the test
236
            pass
237
238
    args = parsers._config_arg_parser()
239
    if args.error:
240
        utils.error_level = 1
241
242
    print("Starting Savu Config tool (please wait for prompt)")
243
244
    _reduce_logging_level()
245
246
    content = Content(level="all" if args.disp_all else 'user')
247
248
    with warnings.catch_warnings():
249
        warnings.simplefilter("ignore")
250
        # imports all the (working) plugin modules
251
        content.failed = utils.populate_plugins(error_mode=args.error,
252
                                                examples=args.examples)
253
254
    comp = Completer(commands=commands, plugin_list=pu.plugins)
255
    utils._set_readline(comp.complete)
256
257
258
    # if file flag is passed then open it here
259
    if args.file:
260
        commands['open'](content, args.file)
261
262
    print("\n*** Press Enter for a list of available commands. ***\n")
263
264
    utils.load_history_file(utils.histfile)
265
266
    while True:
267
        try:
268
            in_list = input(">>> ").strip().split(' ', 1)
269
        except KeyboardInterrupt:
270
            print()
271
            continue
272
        except EOFError:
273
            # makes possible exiting on CTRL + D (EOF, like Python interpreter)
274
            break
275
276
        command, arg = in_list if len(in_list) == 2 else in_list+['']
277
        command = command if command else 'help'
278
        if command not in commands:
279
            print("I'm sorry, that's not a command I recognise. Press Enter "
280
                  "for a list of available commands.")
281
        else:
282
            content = commands[command](content, arg)
283
284
        if content.is_finished():
285
            break
286
287
    print("Thanks for using the application")
288
289
def _reduce_logging_level():
290
    import logging
291
    logging.getLogger().setLevel(logging.CRITICAL)
292
293
if __name__ == '__main__':
294
    main()
295