Total Complexity | 56 |
Total Lines | 247 |
Duplicated Lines | 13.36 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like scripts.config_generator.config_utils often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | # Copyright 2014 Diamond Light Source Ltd. |
||
2 | # |
||
3 | # Licensed under the Apache License, Version 2.0 (the "License"); |
||
4 | # you may not use this file except in compliance with the License. |
||
5 | # You may obtain a copy of the License at |
||
6 | # |
||
7 | # http://www.apache.org/licenses/LICENSE-2.0 |
||
8 | # |
||
9 | # Unless required by applicable law or agreed to in writing, software |
||
10 | # distributed under the License is distributed on an "AS IS" BASIS, |
||
11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||
12 | # See the License for the specific language governing permissions and |
||
13 | # limitations under the License. |
||
14 | |||
15 | """ |
||
16 | .. module:: config_utils |
||
17 | :platform: Unix |
||
18 | :synopsis: Helper functions for the configurator commands. |
||
19 | |||
20 | .. moduleauthor:: Nicola Wadeson <[email protected]> |
||
21 | |||
22 | """ |
||
23 | import re |
||
24 | import sys |
||
25 | import os |
||
26 | import mmap |
||
27 | import atexit |
||
28 | import logging |
||
29 | import traceback |
||
30 | import pkgutil |
||
31 | |||
32 | import importlib.util |
||
33 | from functools import wraps |
||
34 | import savu.plugins.utils as pu |
||
35 | import savu.data.data_structures.utils as du |
||
36 | |||
# On Windows (os.name == "nt") the package-local ``win_readline`` module is
# bound to the name ``readline``; everywhere else the standard-library
# ``readline`` module is used.
if os.name == "nt":
    from . import win_readline as readline
else:
    import readline

# Path of the command-history file: ``.savuhist`` in the user's home directory.
histfile = os.path.join(os.path.expanduser("~"), ".savuhist")
# Value passed to readline.set_history_length() when the history is loaded.
histlen = 1000
# Configure the root logger at CRITICAL level.
logging.basicConfig(level="CRITICAL")
# Read by the error_catcher decorators: 0 prints the exception message,
# 1 prints the full traceback.
error_level = 0
||
46 | |||
47 | |||
class DummyFile(object):
    """Write-only sink that silently discards everything written to it.

    This module installs an instance as ``sys.stdout`` to silence output,
    so the class offers the small part of the text-stream interface that
    ``print`` relies on: ``write`` and ``flush``.
    """

    def write(self, x):
        """Discard *x* and return ``None``."""
        pass

    def flush(self):
        """Do nothing.

        Present so that ``print(..., flush=True)`` and explicit
        ``sys.stdout.flush()`` calls do not raise ``AttributeError`` while
        stdout is redirected to this object.
        """
        pass
||
51 | |||
52 | |||
def _redirect_stdout():
    """Silence standard output by replacing ``sys.stdout`` with a DummyFile.

    :returns: the object that was bound to ``sys.stdout`` before the swap,
        so the caller can put it back afterwards.
    """
    # The right-hand side is evaluated first, so the old stream is captured
    # before sys.stdout is rebound.
    previous_stream, sys.stdout = sys.stdout, DummyFile()
    return previous_stream
||
57 | |||
58 | |||
def load_history_file(hfile):
    """Load the readline history from *hfile* and arrange for it to be saved.

    The history length is capped at the module-level ``histlen`` and
    ``write_history_to_file`` is registered to run at interpreter exit.

    :param hfile: path of the history file to read.
    """
    # Apply the cap unconditionally: previously it was skipped whenever the
    # history file could not be read (e.g. on the very first run).
    readline.set_history_length(histlen)
    try:
        readline.read_history_file(hfile)
    except IOError:
        # A missing or unreadable history file is not an error.
        pass
    atexit.register(write_history_to_file)
||
66 | |||
67 | |||
def write_history_to_file():
    """Save the readline history to ``histfile``, ignoring I/O failures."""
    try:
        readline.write_history_file(histfile)
    except OSError:  # IOError is the same class as OSError in Python 3
        pass
||
73 | |||
74 | |||
def _set_readline(completer):
    """Configure readline for tab completion with *completer*.

    :param completer: the completion function handed to
        ``readline.set_completer``.
    """
    readline.set_completer(completer)
    readline.parse_and_bind("tab: complete")
    # '/' must count as part of a word, so only whitespace and ';' delimit.
    readline.set_completer_delims(" \t\n;")
||
80 | |||
81 | |||
def parse_args(function):
    """Decorator that parses the raw argument string before calling *function*.

    The parser is looked up in the sibling ``arg_parsers`` module under the
    name ``<function name>_arg_parser`` and is called with the
    whitespace-split argument string and ``doc=False``.  If the parser
    returns a falsy value, *function* is not called and ``content`` is
    returned unchanged.
    """
    @wraps(function)
    def _parse_args_wrap_function(content, args):
        parser_name = f"{function.__name__}_arg_parser"
        from . import arg_parsers as parsers

        build_arguments = getattr(parsers, parser_name)
        parsed = build_arguments(args.split(), doc=False)
        if parsed:
            return function(content, parsed)
        return content

    return _parse_args_wrap_function
||
94 | |||
95 | |||
def _report_error(e):
    """Print the exception *e* according to the module-level ``error_level``.

    Must be called while *e* is being handled (inside an ``except`` block),
    because level 1 prints the traceback of the active exception.

    * level 0: print the message as-is when its second whitespace-separated
      word is ``ERROR:``; otherwise prefix it with the exception type name.
    * level 1: print the full traceback to stdout.
    * any other level: print nothing.
    """
    words = str(e).split()
    savu_error = len(words) > 1 and words[1] == 'ERROR:'

    if error_level == 0:
        print(e if savu_error else f"{type(e).__name__}: {e}")
    elif error_level == 1:
        traceback.print_exc(file=sys.stdout)


def error_catcher(function):
    """Decorator for ``function(content, args)`` that reports any exception.

    If *function* raises, the error is printed (see ``_report_error``) and
    the unmodified ``content`` is returned instead of propagating.
    """
    @wraps(function)
    def error_catcher_wrap_function(content, args):
        try:
            return function(content, args)
        except Exception as e:
            _report_error(e)
            return content

    return error_catcher_wrap_function


def error_catcher_savu(function):
    """Decorator for a zero-argument *function* that reports any exception.

    If *function* raises, the error is printed (see ``_report_error``) and
    ``None`` is returned instead of propagating.
    """
    @wraps(function)
    def error_catcher_wrap_function():
        try:
            return function()
        except Exception as e:
            _report_error(e)

    return error_catcher_wrap_function
||
134 | |||
135 | |||
def populate_plugins(error_mode=False, examples=False):
    """Load every non-package module found under the plugin paths.

    :param error_mode: forwarded to ``_load_module``.
    :param examples: forwarded to ``pu.get_plugins_paths``.
    :returns: the failed-imports dictionary as returned by ``_load_module``.
    """
    search_paths = pu.get_plugins_paths(examples=examples)
    failures = {}

    for location, prefix in search_paths.items():
        for finder, module_name, is_pkg in pkgutil.walk_packages([location], prefix):
            if is_pkg:
                continue
            failures = _load_module(finder, module_name, failures, error_mode)
    return failures
||
146 | |||
147 | |||
def _load_module(finder, module_name, failed_imports, error_mode):
    """Import one plugin module and record the failure if it cannot load.

    :param finder: object providing ``find_spec(module_name)``.
    :param module_name: dotted name of the module to import.
    :param failed_imports: dict that receives ``class name -> exception``
        for modules that contain a ``@register_plugin`` line but fail to load.
    :param error_mode: if true, print a message for each recorded failure.
    :returns: *failed_imports* (the same dict, updated in place).
    """
    # Bound before the try block so the handler can tell whether a module
    # object was ever created; otherwise a failure in find_spec or
    # module_from_spec made the handler itself raise UnboundLocalError.
    mod = None
    try:
        # need to ignore loading of plugin.utils as it is emptying the list
        spec = finder.find_spec(module_name)
        mod = importlib.util.module_from_spec(spec)
        sys.modules[spec.name] = mod
        spec.loader.exec_module(mod)
        # Load the plugin class and ensure the tools file is present
        plugin = pu.load_class(module_name)()
        if not plugin.get_plugin_tools():
            raise OSError("Tools file not found.")
    except Exception as e:
        # Without a module object there is no source file to inspect, so
        # the failure cannot be attributed to a registered plugin.
        if mod is not None and _is_registered_plugin(mod):
            clazz = pu._get_cls_name(module_name)
            failed_imports[clazz] = e
            if error_mode:
                print(("\nUnable to load plugin %s\n%s" % (module_name, e)))
    return failed_imports
||
166 | |||
167 | |||
def _is_registered_plugin(mod):
    """Report whether the source file of *mod* registers a plugin.

    :param mod: object whose ``__file__`` attribute is the path to read.
    :returns: True if any line contains ``@register_plugin`` and its first
        character, ignoring spaces, is not ``#``; False otherwise.
    """
    with open(mod.__file__) as source:
        return any(
            "@register_plugin" in text
            and not text.lstrip(" ").startswith("#")
            for text in source
        )
||
174 | |||
175 | |||
def _dawn_setup():
    """Fill in rank, description and parameters for every entry in
    ``pu.dawn_plugins``.

    For each plugin name an instance is created from ``pu.plugins``; the
    entry's 'input rank' and 'description' are set and the parameters are
    stored in ``pu.dawn_plugin_params``.
    """
    for name in list(pu.dawn_plugins):
        instance = pu.plugins[name]()
        rank = du.get_pattern_rank(instance.get_plugin_pattern())
        pu.dawn_plugins[name]['input rank'] = rank
        # Keep only the docstring text that precedes the first ':param'.
        summary = instance.__doc__.split(':param')[0]
        pu.dawn_plugins[name]['description'] = summary
        pu.dawn_plugin_params[name] = _get_dawn_parameters(instance)
||
184 | |||
185 | |||
def _get_dawn_parameters(plugin):
    """Build the parameter dictionary for *plugin*.

    The plugin tools' default parameters are populated first.

    :returns: dict mapping each parameter name, except ``in_datasets`` and
        ``out_datasets``, to ``{"value": <value>, "hint": <description>}``,
        with descriptions taken from ``plugin.p_dict["description"]``.
    """
    plugin.get_plugin_tools()._populate_default_parameters()
    hints = plugin.p_dict["description"]
    skipped = ("in_datasets", "out_datasets")
    return {
        name: {"value": current, "hint": hints[name]}
        for name, current in plugin.parameters.items()
        if name not in skipped
    }
||
194 | |||
195 | |||
def _populate_plugin_list(content, pfilter=""):
    """Rebuild the plugin list of *content* from the filtered plugin names.

    The existing list is emptied, then every name returned by
    ``__get_filtered_plugins(pfilter)`` that is not in the content's failed
    dictionary is added with its running index (as a string).
    """
    content.plugin_list.plugin_list = []
    candidates = __get_filtered_plugins(pfilter)
    # Drop plugins which failed to load correctly
    usable = [name for name in candidates
              if not content.plugin_in_failed_dict(name)]
    for position, name in enumerate(usable):
        content.add(name, str(position))
||
207 | |||
208 | |||
def _search_plugin_file(module_name, pfilter):
    """Check whether *pfilter* occurs as a whole word in a module's source.

    The file path is built from the part of this file's directory that
    precedes 'scripts', followed by *module_name* with dots replaced by
    '/' and a '.py' suffix.

    :param module_name: dotted module name of the plugin.
    :param pfilter: string to look for among the whitespace-separated
        tokens of the file.
    :returns: True if the file exists and contains the token, else False.
    """
    savu_base_path = \
        os.path.dirname(os.path.realpath(__file__)).split('scripts')[0]
    file_path = savu_base_path + module_name.replace('.', '/') + '.py'
    if not os.path.isfile(file_path):
        return False
    # The context manager closes the file even if reading it fails.
    with open(file_path, "r") as plugin_file:
        return pfilter in plugin_file.read().split()
||
224 | |||
225 | |||
def __get_filtered_plugins(pfilter):
    """Get a sorted, filtered list of plugin keys from ``pu.plugins``.

    :param pfilter: filter string.  If it contains '*' and the text before
        the first '*' is non-empty, that text is treated as a literal,
        case-insensitive prefix of the plugin class name or module name.
        Otherwise a plugin matches when *pfilter* is a substring of its
        module or class name, or is found by ``_search_plugin_file``.
    :returns: sorted list of matching keys.
    """
    matched = []
    prefix = pfilter.split("*")[0] if pfilter and "*" in pfilter else ""
    # Escape the user-supplied prefix so regex metacharacters are matched
    # literally instead of raising re.error, and compile it once rather
    # than on every iteration.  ``match`` anchors at the start of the string.
    pattern = re.compile(re.escape(prefix), re.IGNORECASE) if prefix else None

    for key, value in pu.plugins.items():
        if pattern is not None:
            if pattern.match(value.__name__) or \
                    pattern.match(value.__module__):
                matched.append(key)
        elif pfilter in value.__module__ or pfilter in value.__name__:
            matched.append(key)
        elif _search_plugin_file(value.__module__, pfilter):
            # The word is present in the plugin's source file
            matched.append(key)

    matched.sort()
    return matched
||
247 |