| Total Complexity | 56 | 
| Total Lines | 247 | 
| Duplicated Lines | 13.36 % | 
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like scripts.config_generator.config_utils often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | # Copyright 2014 Diamond Light Source Ltd.  | 
            ||
| 2 | #  | 
            ||
| 3 | # Licensed under the Apache License, Version 2.0 (the "License");  | 
            ||
| 4 | # you may not use this file except in compliance with the License.  | 
            ||
| 5 | # You may obtain a copy of the License at  | 
            ||
| 6 | #  | 
            ||
| 7 | # http://www.apache.org/licenses/LICENSE-2.0  | 
            ||
| 8 | #  | 
            ||
| 9 | # Unless required by applicable law or agreed to in writing, software  | 
            ||
| 10 | # distributed under the License is distributed on an "AS IS" BASIS,  | 
            ||
| 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  | 
            ||
| 12 | # See the License for the specific language governing permissions and  | 
            ||
| 13 | # limitations under the License.  | 
            ||
| 14 | |||
| 15 | """  | 
            ||
| 16 | .. module:: config_utils  | 
            ||
| 17 | :platform: Unix  | 
            ||
| 18 | :synopsis: Helper functions for the configurator commands.  | 
            ||
| 19 | |||
| 20 | .. moduleauthor:: Nicola Wadeson <[email protected]>  | 
            ||
| 21 | |||
| 22 | """  | 
            ||
| 23 | import re  | 
            ||
| 24 | import sys  | 
            ||
| 25 | import os  | 
            ||
| 26 | import mmap  | 
            ||
| 27 | import atexit  | 
            ||
| 28 | import logging  | 
            ||
| 29 | import traceback  | 
            ||
| 30 | import pkgutil  | 
            ||
| 31 | |||
| 32 | import importlib.util  | 
            ||
| 33 | from functools import wraps  | 
            ||
| 34 | import savu.plugins.utils as pu  | 
            ||
| 35 | import savu.data.data_structures.utils as du  | 
            ||
| 36 | |||
| 37 | if os.name == "nt":  | 
            ||
| 38 | from . import win_readline as readline  | 
            ||
| 39 | else:  | 
            ||
| 40 | import readline  | 
            ||
| 41 | |||
| 42 | histfile = os.path.join(os.path.expanduser("~"), ".savuhist") | 
            ||
| 43 | histlen = 1000  | 
            ||
| 44 | logging.basicConfig(level="CRITICAL")  | 
            ||
| 45 | error_level = 0  | 
            ||
| 46 | |||
| 47 | |||
| 48 | class DummyFile(object):  | 
            ||
| 49 | def write(self, x):  | 
            ||
| 50 | pass  | 
            ||
| 51 | |||
| 52 | |||
| 53 | def _redirect_stdout():  | 
            ||
| 54 | save_stdout = sys.stdout  | 
            ||
| 55 | sys.stdout = DummyFile()  | 
            ||
| 56 | return save_stdout  | 
            ||
| 57 | |||
| 58 | |||
| 59 | def load_history_file(hfile):  | 
            ||
| 60 | try:  | 
            ||
| 61 | readline.read_history_file(hfile)  | 
            ||
| 62 | readline.set_history_length(histlen)  | 
            ||
| 63 | except IOError:  | 
            ||
| 64 | pass  | 
            ||
| 65 | atexit.register(write_history_to_file)  | 
            ||
| 66 | |||
| 67 | |||
| 68 | def write_history_to_file():  | 
            ||
| 69 | try:  | 
            ||
| 70 | readline.write_history_file(histfile)  | 
            ||
| 71 | except IOError:  | 
            ||
| 72 | pass  | 
            ||
| 73 | |||
| 74 | |||
| 75 | def _set_readline(completer):  | 
            ||
| 76 | # we want to treat '/' as part of a word, so override the delimiters  | 
            ||
| 77 |     readline.set_completer_delims(" \t\n;") | 
            ||
| 78 |     readline.parse_and_bind("tab: complete") | 
            ||
| 79 | readline.set_completer(completer)  | 
            ||
| 80 | |||
| 81 | |||
| 82 | def parse_args(function):  | 
            ||
| 83 | @wraps(function)  | 
            ||
| 84 | def _parse_args_wrap_function(content, args):  | 
            ||
| 85 | parser = "%s_arg_parser" % function.__name__  | 
            ||
| 86 | from . import arg_parsers as parsers  | 
            ||
| 87 | |||
| 88 | args = getattr(parsers, parser)(args.split(), doc=False)  | 
            ||
| 89 | if not args:  | 
            ||
| 90 | return content  | 
            ||
| 91 | return function(content, args)  | 
            ||
| 92 | |||
| 93 | return _parse_args_wrap_function  | 
            ||
| 94 | |||
| 95 | |||
| 96 | def error_catcher(function):  | 
            ||
| 97 | View Code Duplication | @wraps(function)  | 
            |
| 
                                                                                                    
                        
                         | 
                |||
| 98 | def error_catcher_wrap_function(content, args):  | 
            ||
| 99 | try:  | 
            ||
| 100 | return function(content, args)  | 
            ||
| 101 | except Exception as e:  | 
            ||
| 102 | err_msg_list = str(e).split()  | 
            ||
| 103 | savu_error = True if len(err_msg_list) > 1 and err_msg_list[1] == 'ERROR:' else False  | 
            ||
| 104 | |||
| 105 | if error_level == 0 and savu_error:  | 
            ||
| 106 | print(e)  | 
            ||
| 107 | elif error_level == 0:  | 
            ||
| 108 |                 print(f"{type(e).__name__}: {e}") | 
            ||
| 109 | elif error_level == 1:  | 
            ||
| 110 | traceback.print_exc(file=sys.stdout)  | 
            ||
| 111 | |||
| 112 | return content  | 
            ||
| 113 | |||
| 114 | return error_catcher_wrap_function  | 
            ||
| 115 | |||
| 116 | |||
| 117 | View Code Duplication | def error_catcher_savu(function):  | 
            |
| 118 | @wraps(function)  | 
            ||
| 119 | def error_catcher_wrap_function():  | 
            ||
| 120 | try:  | 
            ||
| 121 | return function()  | 
            ||
| 122 | except Exception as e:  | 
            ||
| 123 | err_msg_list = str(e).split()  | 
            ||
| 124 | savu_error = True if len(err_msg_list) > 1 and err_msg_list[1] == 'ERROR:' else False  | 
            ||
| 125 | |||
| 126 | if error_level == 0 and savu_error:  | 
            ||
| 127 | print(e)  | 
            ||
| 128 | elif error_level == 0:  | 
            ||
| 129 |                 print(f"{type(e).__name__}: {e}") | 
            ||
| 130 | elif error_level == 1:  | 
            ||
| 131 | traceback.print_exc(file=sys.stdout)  | 
            ||
| 132 | |||
| 133 | return error_catcher_wrap_function  | 
            ||
| 134 | |||
| 135 | |||
| 136 | def populate_plugins(error_mode=False, examples=False):  | 
            ||
| 137 | # load all the plugins  | 
            ||
| 138 | plugins_paths = pu.get_plugins_paths(examples=examples)  | 
            ||
| 139 |     failed_imports = {} | 
            ||
| 140 | |||
| 141 | for path, name in plugins_paths.items():  | 
            ||
| 142 | for finder, module_name, is_pkg in pkgutil.walk_packages([path], name):  | 
            ||
| 143 | if not is_pkg:  | 
            ||
| 144 | failed_imports = _load_module(finder, module_name, failed_imports, error_mode)  | 
            ||
| 145 | return failed_imports  | 
            ||
| 146 | |||
| 147 | |||
| 148 | def _load_module(finder, module_name, failed_imports, error_mode):  | 
            ||
| 149 | try:  | 
            ||
| 150 | # need to ignore loading of plugin.utils as it is emptying the list  | 
            ||
| 151 | spec = finder.find_spec(module_name)  | 
            ||
| 152 | mod = importlib.util.module_from_spec(spec)  | 
            ||
| 153 | sys.modules[spec.name] = mod  | 
            ||
| 154 | spec.loader.exec_module(mod)  | 
            ||
| 155 | # Load the plugin class and ensure the tools file is present  | 
            ||
| 156 | plugin = pu.load_class(module_name)()  | 
            ||
| 157 | if not plugin.get_plugin_tools():  | 
            ||
| 158 | raise OSError(f"Tools file not found.")  | 
            ||
| 159 | except Exception as e:  | 
            ||
| 160 | if _is_registered_plugin(mod):  | 
            ||
| 161 | clazz = pu._get_cls_name(module_name)  | 
            ||
| 162 | failed_imports[clazz] = e  | 
            ||
| 163 | if error_mode:  | 
            ||
| 164 |                 print(("\nUnable to load plugin %s\n%s" % (module_name, e))) | 
            ||
| 165 | return failed_imports  | 
            ||
| 166 | |||
| 167 | |||
| 168 | def _is_registered_plugin(mod):  | 
            ||
| 169 | with open(mod.__file__) as f:  | 
            ||
| 170 | for line in f:  | 
            ||
| 171 |             if "@register_plugin" in line and line.replace(" ", "")[0] != "#": | 
            ||
| 172 | return True  | 
            ||
| 173 | return False  | 
            ||
| 174 | |||
| 175 | |||
| 176 | def _dawn_setup():  | 
            ||
| 177 | for plugin in list(pu.dawn_plugins.keys()):  | 
            ||
| 178 | p = pu.plugins[plugin]()  | 
            ||
| 179 | pu.dawn_plugins[plugin]['input rank'] = \  | 
            ||
| 180 | du.get_pattern_rank(p.get_plugin_pattern())  | 
            ||
| 181 |         pu.dawn_plugins[plugin]['description'] = p.__doc__.split(':param')[0] | 
            ||
| 182 | params = _get_dawn_parameters(p)  | 
            ||
| 183 | pu.dawn_plugin_params[plugin] = params  | 
            ||
| 184 | |||
| 185 | |||
| 186 | def _get_dawn_parameters(plugin):  | 
            ||
| 187 | plugin.get_plugin_tools()._populate_default_parameters()  | 
            ||
| 188 | desc = plugin.p_dict["description"]  | 
            ||
| 189 |     params = {} | 
            ||
| 190 | for key, value in plugin.parameters.items():  | 
            ||
| 191 | if key not in ["in_datasets", "out_datasets"]:  | 
            ||
| 192 |             params[key] = {"value": value, "hint": desc[key]} | 
            ||
| 193 | return params  | 
            ||
| 194 | |||
| 195 | |||
| 196 | def _populate_plugin_list(content, pfilter=""):  | 
            ||
| 197 | """ Populate the plugin list from a list of plugin instances. """  | 
            ||
| 198 | content.plugin_list.plugin_list = []  | 
            ||
| 199 | sorted_plugins = __get_filtered_plugins(pfilter)  | 
            ||
| 200 | # Remove plugins which failed to load correctly  | 
            ||
| 201 | loaded_plugins = [p for p in sorted_plugins  | 
            ||
| 202 | if not content.plugin_in_failed_dict(p)]  | 
            ||
| 203 | count = 0  | 
            ||
| 204 | for key in loaded_plugins:  | 
            ||
| 205 | content.add(key, str(count))  | 
            ||
| 206 | count += 1  | 
            ||
| 207 | |||
| 208 | |||
| 209 | def _search_plugin_file(module_name, pfilter):  | 
            ||
| 210 | """Check for string inside file"""  | 
            ||
| 211 | string_found = False  | 
            ||
| 212 | |||
| 213 | savu_base_path = \  | 
            ||
| 214 |         os.path.dirname(os.path.realpath(__file__)).split('scripts')[0] | 
            ||
| 215 |     file_dir = module_name.replace('.', '/') | 
            ||
| 216 | file_path = savu_base_path + file_dir + '.py'  | 
            ||
| 217 | if os.path.isfile(file_path):  | 
            ||
| 218 | plugin_file = open(file_path, "r")  | 
            ||
| 219 | data = plugin_file.read().split()  | 
            ||
| 220 | if pfilter in data:  | 
            ||
| 221 | string_found = True  | 
            ||
| 222 | plugin_file.close()  | 
            ||
| 223 | return string_found  | 
            ||
| 224 | |||
| 225 | |||
| 226 | def __get_filtered_plugins(pfilter):  | 
            ||
| 227 | """ Get a sorted, filter list of plugins. """  | 
            ||
| 228 | key_list = []  | 
            ||
| 229 | star_search = \  | 
            ||
| 230 |         pfilter.split("*")[0] if pfilter and "*" in pfilter else False | 
            ||
| 231 | |||
| 232 | for key, value in pu.plugins.items():  | 
            ||
| 233 | if star_search:  | 
            ||
| 234 | search = '(?i)^' + star_search  | 
            ||
| 235 | if re.match(search, value.__name__) or \  | 
            ||
| 236 | re.match(search, value.__module__):  | 
            ||
| 237 | key_list.append(key)  | 
            ||
| 238 | elif pfilter in value.__module__ or pfilter in value.__name__:  | 
            ||
| 239 | key_list.append(key)  | 
            ||
| 240 | else:  | 
            ||
| 241 | # Check if the word is present in the file  | 
            ||
| 242 | if _search_plugin_file(value.__module__, pfilter):  | 
            ||
| 243 | key_list.append(key)  | 
            ||
| 244 | |||
| 245 | key_list.sort()  | 
            ||
| 246 | return key_list  | 
            ||
| 247 |