Test Failed
Push: master (7afc9e...4ab57c), by unknown
01:32 queued 18s

scripts.config_generator.config_utils   C

Complexity

Total Complexity: 56

Size/Duplication

Total Lines: 247
Duplicated Lines: 13.36 %

Importance

Changes: 0

Metric  Value
eloc    169
dl      33
loc     247
rs      5.5199
c       0
b       0
f       0
wmc     56

15 Functions

Rating  Name                      Duplication  Size  Complexity
A       _redirect_stdout()        0            4     1
A       load_history_file()       0            7     2
A       _set_readline()           0            5     1
A       write_history_to_file()   0            5     2
A       parse_args()              0            12    2
C       __get_filtered_plugins()  0            21    9
A       _populate_plugin_list()   0            11    2
B       error_catcher_savu()      17           17    7
A       _search_plugin_file()     0            15    3
A       _load_module()            0            18    5
A       _is_registered_plugin()   0            6     5
A       _dawn_setup()             0            8     2
A       populate_plugins()        0            10    4
B       error_catcher()           16           19    7
A       _get_dawn_parameters()    0            8     3

1 Method

Rating  Name                      Duplication  Size  Complexity
A       DummyFile.write()         0            2     1


Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:
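In this module, the duplication is flagged on error_catcher() and error_catcher_savu(), whose except branches are identical apart from the return value. A minimal sketch of one possible fix is to move the shared reporting into a single helper. The helper name _report_error is hypothetical and not part of the original module; the rest mirrors the code in the listing.

```python
import sys
import traceback
from functools import wraps

error_level = 0  # same module-level switch as in config_utils


def _report_error(e):
    """Hypothetical helper holding the reporting logic shared by both decorators."""
    err_msg_list = str(e).split()
    savu_error = len(err_msg_list) > 1 and err_msg_list[1] == 'ERROR:'

    if error_level == 0 and savu_error:
        print(e)
    elif error_level == 0:
        print(f"{type(e).__name__}: {e}")
    elif error_level == 1:
        # Still inside the caller's except block, so the traceback is available
        traceback.print_exc(file=sys.stdout)


def error_catcher(function):
    @wraps(function)
    def error_catcher_wrap_function(content, args):
        try:
            return function(content, args)
        except Exception as e:
            _report_error(e)
            return content

    return error_catcher_wrap_function


def error_catcher_savu(function):
    @wraps(function)
    def error_catcher_wrap_function():
        try:
            return function()
        except Exception as e:
            _report_error(e)

    return error_catcher_wrap_function
```

Both decorators keep their original signatures, so existing call sites would not need to change under this sketch.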

Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex modules like scripts.config_generator.config_utils often do many different things. To break such a module down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields, methods or functions that share the same prefix or suffix.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
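As an illustration, load_history_file(), write_history_to_file(), histfile and histlen all share the "history" theme and could be grouped. The sketch below shows what an Extract Class step might look like. The class name CommandHistory and the injected backend parameter are assumptions made for this example; the original module calls readline directly.

```python
import atexit
import os


class CommandHistory:
    """Hypothetical class grouping the history-related state and functions."""

    def __init__(self, backend, path=None, length=1000):
        # `backend` is any object exposing the readline history API
        # (read_history_file, set_history_length, write_history_file).
        self.backend = backend
        self.path = path or os.path.join(os.path.expanduser("~"), ".savuhist")
        self.length = length

    def load(self):
        try:
            self.backend.read_history_file(self.path)
            self.backend.set_history_length(self.length)
        except IOError:
            pass
        # Mirror the original behaviour: save the history on interpreter exit
        atexit.register(self.save)

    def save(self):
        try:
            self.backend.write_history_file(self.path)
        except IOError:
            pass
```

Under these assumptions, the module would create one instance with `CommandHistory(readline)` and call `load()` once at start-up.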

# Copyright 2014 Diamond Light Source Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
.. module:: config_utils
   :platform: Unix
   :synopsis: Helper functions for the configurator commands.

.. moduleauthor:: Nicola Wadeson <[email protected]>

"""
import re
import sys
import os
import mmap
import atexit
import logging
import traceback
import pkgutil

import importlib.util
from functools import wraps
import savu.plugins.utils as pu
import savu.data.data_structures.utils as du

if os.name == "nt":
    from . import win_readline as readline
else:
    import readline

histfile = os.path.join(os.path.expanduser("~"), ".savuhist")
histlen = 1000
logging.basicConfig(level="CRITICAL")
error_level = 0


class DummyFile(object):
    def write(self, x):
        pass


def _redirect_stdout():
    save_stdout = sys.stdout
    sys.stdout = DummyFile()
    return save_stdout


def load_history_file(hfile):
    try:
        readline.read_history_file(hfile)
        readline.set_history_length(histlen)
    except IOError:
        pass
    atexit.register(write_history_to_file)


def write_history_to_file():
    try:
        readline.write_history_file(histfile)
    except IOError:
        pass


def _set_readline(completer):
    # we want to treat '/' as part of a word, so override the delimiters
    readline.set_completer_delims(" \t\n;")
    readline.parse_and_bind("tab: complete")
    readline.set_completer(completer)


def parse_args(function):
    @wraps(function)
    def _parse_args_wrap_function(content, args):
        parser = "%s_arg_parser" % function.__name__
        from . import arg_parsers as parsers

        args = getattr(parsers, parser)(args.split(), doc=False)
        if not args:
            return content
        return function(content, args)

    return _parse_args_wrap_function


def error_catcher(function):
    # Review note (duplication): this code seems to be duplicated in your project.
    @wraps(function)
    def error_catcher_wrap_function(content, args):
        try:
            return function(content, args)
        except Exception as e:
            err_msg_list = str(e).split()
            savu_error = True if len(err_msg_list) > 1 and err_msg_list[1] == 'ERROR:' else False

            if error_level == 0 and savu_error:
                print(e)
            elif error_level == 0:
                print(f"{type(e).__name__}: {e}")
            elif error_level == 1:
                traceback.print_exc(file=sys.stdout)

            return content

    return error_catcher_wrap_function


# Review note (duplication): this code seems to be duplicated in your project.
def error_catcher_savu(function):
    @wraps(function)
    def error_catcher_wrap_function():
        try:
            return function()
        except Exception as e:
            err_msg_list = str(e).split()
            savu_error = True if len(err_msg_list) > 1 and err_msg_list[1] == 'ERROR:' else False

            if error_level == 0 and savu_error:
                print(e)
            elif error_level == 0:
                print(f"{type(e).__name__}: {e}")
            elif error_level == 1:
                traceback.print_exc(file=sys.stdout)

    return error_catcher_wrap_function


def populate_plugins(error_mode=False, examples=False):
    # load all the plugins
    plugins_paths = pu.get_plugins_paths(examples=examples)
    failed_imports = {}

    for path, name in plugins_paths.items():
        for finder, module_name, is_pkg in pkgutil.walk_packages([path], name):
            if not is_pkg:
                failed_imports = _load_module(finder, module_name, failed_imports, error_mode)
    return failed_imports


def _load_module(finder, module_name, failed_imports, error_mode):
    try:
        # need to ignore loading of plugin.utils as it is emptying the list
        spec = finder.find_spec(module_name)
        mod = importlib.util.module_from_spec(spec)
        sys.modules[spec.name] = mod
        spec.loader.exec_module(mod)
        # Load the plugin class and ensure the tools file is present
        plugin = pu.load_class(module_name)()
        if not plugin.get_plugin_tools():
            raise OSError(f"Tools file not found.")
    except Exception as e:
        # Review note: the variable mod does not seem to be defined for all execution paths.
        if _is_registered_plugin(mod):
            clazz = pu._get_cls_name(module_name)
            failed_imports[clazz] = e
            if error_mode:
                print(("\nUnable to load plugin %s\n%s" % (module_name, e)))
    return failed_imports


def _is_registered_plugin(mod):
    with open(mod.__file__) as f:
        for line in f:
            if "@register_plugin" in line and line.replace(" ", "")[0] != "#":
                return True
    return False


def _dawn_setup():
    for plugin in list(pu.dawn_plugins.keys()):
        p = pu.plugins[plugin]()
        pu.dawn_plugins[plugin]['input rank'] = \
            du.get_pattern_rank(p.get_plugin_pattern())
        pu.dawn_plugins[plugin]['description'] = p.__doc__.split(':param')[0]
        params = _get_dawn_parameters(p)
        pu.dawn_plugin_params[plugin] = params


def _get_dawn_parameters(plugin):
    plugin.get_plugin_tools()._populate_default_parameters()
    desc = plugin.p_dict["description"]
    params = {}
    for key, value in plugin.parameters.items():
        if key not in ["in_datasets", "out_datasets"]:
            params[key] = {"value": value, "hint": desc[key]}
    return params


def _populate_plugin_list(content, pfilter=""):
    """ Populate the plugin list from a list of plugin instances. """
    content.plugin_list.plugin_list = []
    sorted_plugins = __get_filtered_plugins(pfilter)
    # Remove plugins which failed to load correctly
    loaded_plugins = [p for p in sorted_plugins
                      if not content.plugin_in_failed_dict(p)]
    count = 0
    for key in loaded_plugins:
        content.add(key, str(count))
        count += 1


def _search_plugin_file(module_name, pfilter):
    """Check for string inside file"""
    string_found = False

    savu_base_path = \
        os.path.dirname(os.path.realpath(__file__)).split('scripts')[0]
    file_dir = module_name.replace('.', '/')
    file_path = savu_base_path + file_dir + '.py'
    if os.path.isfile(file_path):
        plugin_file = open(file_path, "r")
        data = plugin_file.read().split()
        if pfilter in data:
            string_found = True
        plugin_file.close()
    return string_found


def __get_filtered_plugins(pfilter):
    """ Get a sorted, filter list of plugins. """
    key_list = []
    star_search = \
        pfilter.split("*")[0] if pfilter and "*" in pfilter else False

    for key, value in pu.plugins.items():
        if star_search:
            search = '(?i)^' + star_search
            if re.match(search, value.__name__) or \
                    re.match(search, value.__module__):
                key_list.append(key)
        elif pfilter in value.__module__ or pfilter in value.__name__:
            key_list.append(key)
        else:
            # Check if the word is present in the file
            if _search_plugin_file(value.__module__, pfilter):
                key_list.append(key)

    key_list.sort()
    return key_list
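The issue reported on _is_registered_plugin(mod) in _load_module() arises because mod is only assigned inside the try block. If finder.find_spec() or module_from_spec() raises, the except branch reads an unbound name and fails with UnboundLocalError. A minimal sketch of one way to guard against this is shown below. The function name load_module_safely is hypothetical, and the Savu-specific steps (pu.load_class, the tools-file check, _is_registered_plugin) are left out so the example runs on its own.

```python
import importlib.util
import sys


def load_module_safely(finder, module_name, failed_imports):
    """Hypothetical, simplified variant of _load_module()."""
    mod = None  # defined on every path, even if find_spec() fails
    try:
        spec = finder.find_spec(module_name)
        mod = importlib.util.module_from_spec(spec)
        sys.modules[spec.name] = mod
        spec.loader.exec_module(mod)
    except Exception as e:
        # Only record the failure if a module object was actually created;
        # the original code would inspect mod.__file__ at this point.
        if mod is not None:
            failed_imports[module_name] = e
    return failed_imports
```

Whether a failure before the module is created should be silently skipped, as here, or reported separately is a design choice the original code does not settle.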