Test Failed
Pull Request — master (#708)
by Daniil
03:33
created

scripts.config_generator.content   F

Complexity

Total Complexity 103

Size/Duplication

Total Lines 400
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 295
dl 0
loc 400
rs 2
c 0
b 0
f 0
wmc 103

33 Methods

Rating   Name   Duplication   Size   Complexity  
B Content._mutate_plugins() 0 27 8
A Content.increment_positions() 0 8 3
A Content.add() 0 12 3
A Content.inc_numbers() 0 4 2
A Content.find_position() 0 6 2
A Content.remove() 0 8 2
A Content.save() 0 8 3
A Content.check_mutations() 0 12 4
A Content.convert_to_ascii() 0 5 2
A Content._version_to_float() 0 7 3
C Content.convert_pos() 0 42 9
D Content._apply_plugin_updates() 0 40 13
A Content.size() 0 3 1
A Content.modify() 0 8 3
A Content.inc_positions() 0 8 2
A Content.on_and_off() 0 5 2
A Content.create_plugin_dict() 0 15 1
A Content.check_file() 0 8 4
A Content.fopen() 0 10 3
A Content.move() 0 9 1
A Content.set_finished() 0 2 2
A Content.get() 0 2 1
A Content.get_positions() 0 7 2
A Content.inc_letters() 0 4 2
B Content._update_parameters() 0 20 7
A Content.clear() 0 3 2
A Content.is_finished() 0 2 1
A Content.__init__() 0 8 1
A Content.insert() 0 7 2
A Content.refresh() 0 13 4
A Content.value() 0 7 3
A Content.display() 0 4 2
A Content.get_split_positions() 0 9 3

How to fix   Complexity   

Complexity

Complex classes like scripts.config_generator.content often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Copyright 2014 Diamond Light Source Ltd.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
"""
16
.. module:: content
17
   :platform: Unix
18
   :synopsis: Content class for the configurator
19
20
.. moduleauthor:: Nicola Wadeson <[email protected]>
21
22
"""
23
24
import re
25
import os
26
import inspect
27
28
from savu.plugins import utils as pu
29
from savu.data.plugin_list import PluginList
30
from . import mutations
31
32
33
class Content(object):
    """ Editable, in-memory view of a process list for the configurator.

    The list itself lives in ``self.plugin_list.plugin_list`` as a list of
    plugin dictionaries.  Every entry carries a display position under the
    ``'pos'`` key: a number optionally followed by one lower-case letter
    (e.g. ``'3'`` or ``'3a'``).  Most methods translate between these
    display positions and plain list indices.
    """

    def __init__(self, filename=None, level='user'):
        """ Create an empty process list.

        :param filename: path associated with this process list (may be None).
        :param level: default display level used by :meth:`display`.
        """
        self.disp_level = level
        self.plugin_list = PluginList()
        self.plugin_mutations = mutations.plugin_mutations
        self.param_mutations = mutations.param_mutations
        self.filename = filename
        self._finished = False
        # plugin name -> import error text; consulted by add() to explain
        # why a plugin is unavailable.
        self.failed = {}

    def set_finished(self, check='y'):
        """ Mark the session as finished if ``check`` is 'y' (any case). """
        self._finished = check.lower() == 'y'

    def is_finished(self):
        """ Return the flag set by :meth:`set_finished`. """
        return self._finished

    def fopen(self, infile, update=False, skip=False):
        """ Load a process list from file.

        :param infile: path of the process list file.
        :param update: if True, apply plugin/parameter mutations relevant to
            the version of the loaded list.
        :param skip: forwarded to :meth:`_apply_plugin_updates`; if True,
            unavailable plugins are skipped instead of raising.
        :raises Exception: if ``infile`` does not exist.
        """
        if not os.path.exists(infile):
            raise Exception('INPUT ERROR: The file does not exist.')
        self.plugin_list._populate_plugin_list(infile, active_pass=True)
        self.filename = infile
        if update:
            self.plugin_mutations = self.check_mutations(self.plugin_mutations)
            self.param_mutations = self.check_mutations(self.param_mutations)
            self._apply_plugin_updates(skip)

    def check_mutations(self, mut_dict: dict):
        """ Drop mutations that no longer apply to the loaded list version.

        An entry is dropped when it has an ``'up_to_version'`` key and the
        process list version is greater than or equal to it.

        :param mut_dict: mapping of name -> mutation description dict.
        :returns: a filtered copy; ``mut_dict`` itself is not modified.
        """
        plist_version = self._version_to_float(self.plugin_list.version)
        # Deleting from a dict while iterating it raises a RuntimeError in
        # Python 3, so a copy is mutated and returned instead.
        mut_dict_copy = mut_dict.copy()
        for key, subdict in mut_dict.items():
            if 'up_to_version' in subdict:
                up_to_version = self._version_to_float(subdict['up_to_version'])
                if plist_version >= up_to_version:
                    del mut_dict_copy[key]
        return mut_dict_copy

    def _version_to_float(self, version):
        """ Convert a dotted version string (or ascii bytes) to a float.

        The first component becomes the integer part and all remaining
        components are concatenated into the fractional part, e.g.
        ``'1.2.3'`` -> ``1.23``.  ``None`` maps to ``0``.

        NOTE(review): with this encoding '1.10' compares lower than '1.9';
        confirm that version numbers never need a two-digit component.
        """
        if version is None:
            return 0
        if isinstance(version, bytes):
            version = version.decode("ascii")
        split_vals = version.split('.')
        return float('.'.join([split_vals[0], ''.join(split_vals[1:])]))

    def display(self, formatter, **kwargs):
        """ Print the process list using ``formatter``.

        ``kwargs`` are passed to ``formatter._get_string``; ``level``
        defaults to the display level given at construction.
        """
        kwargs.setdefault('level', self.disp_level)
        print('\n' + formatter._get_string(**kwargs) + '\n')

    def check_file(self, filename):
        """ Validate an output file path.

        :raises Exception: if ``filename`` is empty or its directory does
            not exist (a bare file name is checked against '.').
        """
        if not filename:
            raise Exception('INPUT ERROR: Please specify the output filepath.')
        path = os.path.dirname(filename)
        path = path if path else '.'
        if not os.path.exists(path):
            file_error = "INPUT_ERROR: Incorrect filepath."
            raise Exception(file_error)

    def save(self, filename, check='y', template=False):
        """ Save the process list to ``filename`` if ``check`` is 'y'.

        :param template: if True, a template is added before saving.
        """
        if check.lower() == 'y':
            print(f"Saving file {filename}")
            if template:
                self.plugin_list.add_template(create=True)
            self.plugin_list._save_plugin_list(filename)
        else:
            print("The process list has NOT been saved.")

    def clear(self, check='y'):
        """ Empty the process list if ``check`` is 'y' (any case). """
        if check.lower() == 'y':
            self.plugin_list.plugin_list = []

    def add(self, name, str_pos):
        """ Add plugin ``name`` with default parameters at ``str_pos``.

        :raises Exception: if the plugin is unknown, or known but failed to
            import (in which case the stored error is reported).
        """
        if name not in pu.plugins:
            if name in self.failed:
                msg = "IMPORT ERROR: %s is unavailable due to the following" \
                      " error:\n\t%s" % (name, self.failed[name])
                raise Exception(msg)
            raise Exception("INPUT ERROR: Unknown plugin %s" % name)
        plugin = pu.plugins[name]()
        plugin._populate_default_parameters()
        pos, str_pos = self.convert_pos(str_pos)
        self.insert(plugin, pos, str_pos)

    def refresh(self, str_pos, defaults=False, change=False):
        """ Rebuild the entry at ``str_pos`` from a fresh plugin instance.

        :param defaults: if True, discard the current parameter values.
        :param change: optional plugin name to replace the current one with.
        """
        pos = self.find_position(str_pos)
        plugin_entry = self.plugin_list.plugin_list[pos]
        name = change if change else plugin_entry['name']
        active = plugin_entry['active']
        plugin = pu.plugins[name]()
        plugin._populate_default_parameters()

        # Capture the old values before the entry is overwritten below.
        keep = self.get(pos)['data'] if not defaults else None
        self.insert(plugin, pos, str_pos, replace=True)
        self.plugin_list.plugin_list[pos]['active'] = active
        if keep:
            self._update_parameters(plugin, name, keep, str_pos)

    def _update_parameters(self, plugin, name, keep, str_pos):
        """ Copy previous parameter values onto a refreshed entry.

        Values whose names exist in both ``keep`` and the new plugin are
        copied directly; renamed parameters listed in
        ``self.param_mutations`` for any class in the plugin's MRO are
        carried over under their new name.
        """
        union_params = set(keep).intersection(set(plugin.parameters))
        for param in union_params:
            self.modify(str_pos, param, keep[param], ref=True)
        # add any parameter mutations here
        classes = [c.__name__ for c in inspect.getmro(plugin.__class__)]
        m_dict = self.param_mutations
        keys = [k for k in m_dict if k in classes]

        changes = False
        for k in keys:
            for entry in m_dict[k]['params']:
                if entry['old'] in keep:
                    changes = True
                    val = keep[entry['old']]
                    if 'eval' in entry:
                        # NOTE(security): evaluates an expression taken from
                        # the mutations table in this method's scope, so the
                        # local names here (e.g. ``val``) must not be renamed.
                        # Only safe while that table is trusted code.
                        val = eval(entry['eval'])
                    self.modify(str_pos, entry['new'], val, ref=True)
        if changes:
            mutations.param_change_str(keep, plugin.parameters, name, keys)

    def _apply_plugin_updates(self, skip=False):
        """ Bring a freshly loaded process list up to date.

        Renumbers lists that start at '0', adds a missing ``'active'`` flag,
        prints plugin notices, applies plugin mutations and collects plugins
        that are unavailable.

        :param skip: if True, unavailable plugins are reported and skipped;
            otherwise the first one raises.
        :raises Exception: if a plugin is unavailable and ``skip`` is False.
        """
        the_list = self.plugin_list.plugin_list
        if not the_list:
            # Nothing to update; also avoids indexing an empty list below.
            return

        # Update old process lists that start from 0
        if 'pos' in the_list[0] and the_list[0]['pos'] == '0':
            self.increment_positions()

        missing = []
        pos = len(the_list) - 1
        notices = mutations.plugin_notices

        # Walk the list backwards; the slice is a copy, so entries removed
        # from the_list inside the loop do not disturb the iteration itself.
        for plugin in the_list[::-1]:
            # update old process lists to include 'active' flag
            if 'active' not in plugin:
                plugin['active'] = True

            # Re-examine index ``pos`` until the entry there keeps its name,
            # i.e. no further mutation applies to it.
            # NOTE(review): after a removal, ``pos`` can point past the end
            # of the list or reach -1; confirm this cannot happen in practice.
            while True:
                name = the_list[pos]['name']
                if name in notices:
                    print(notices[name]['desc'])

                # if a plugin is missing from all available plugins
                # then look for mutations in the plugin name
                search = name not in pu.plugins
                found = self._mutate_plugins(name, pos, search=search)
                if search and not found:
                    str_pos = self.plugin_list.plugin_list[pos]['pos']
                    missing.append([name, str_pos])
                    self.remove(pos)
                    pos -= 1
                if name == the_list[pos]['name']:
                    break
            pos -= 1

        for name, pos in missing[::-1]:
            if skip:
                print(f"Skipping plugin {pos}: {name}")
            else:
                message = f"PLUGIN ERROR: The plugin {name} is unavailable in this version of Savu."
                print(message)
                raise Exception(f'Incompatible process list. {message}')

    def _mutate_plugins(self, name, pos, search=False):
        """ Perform plugin mutations.

        :param name: plugin name stored at list index ``pos``.
        :param search: if True, first look for an available plugin whose
            name differs only by case.
        :returns: True if the entry was replaced by another plugin, False
            otherwise (including when the entry was removed).
        :raises Exception: if a replacement plugin is unavailable or the
            mutation type is unknown.
        """
        # check for case changes in plugin name
        if search:
            for key in pu.plugins.keys():
                if name.lower() == key.lower():
                    str_pos = self.plugin_list.plugin_list[pos]['pos']
                    self.refresh(str_pos, change=key)
                    return True

        # check mutations dict
        m_dict = self.plugin_mutations
        if name in m_dict:
            mutate = m_dict[name]
            if 'replace' in mutate:
                if mutate['replace'] in pu.plugins:
                    str_pos = self.plugin_list.plugin_list[pos]['pos']
                    self.refresh(str_pos, change=mutate['replace'])
                    print(mutate['desc'])
                    return True
                raise Exception(f"Replacement plugin {mutate['replace']} unavailable for {name}")
            elif 'remove' in mutate:
                self.remove(pos)
                print(mutate['desc'])
            else:
                raise Exception('Unknown mutation type.')
        return False

    def move(self, old, new):
        """ Move the entry at display position ``old`` to ``new``. """
        old_pos = self.find_position(old)
        entry = self.plugin_list.plugin_list[old_pos]
        self.remove(old_pos)
        new_pos, new = self.convert_pos(new)
        name = entry['name']
        # Insert a placeholder to make room, then put the original entry
        # (with its parameter values) back in its place.
        self.insert(pu.plugins[name](), new_pos, new)
        self.plugin_list.plugin_list[new_pos] = entry
        self.plugin_list.plugin_list[new_pos]['pos'] = new

    def modify(self, pos_str, subelem, value, ref=False):
        """ Set parameter ``subelem`` of the plugin at ``pos_str``.

        :param subelem: parameter name, or its 1-based number (as a string)
            in the entry's ``'map'`` list.
        :param value: new value; parsed with :meth:`value` unless ``ref``.
        :param ref: if True, ``value`` is stored as given.
        """
        if not ref:
            value = self.value(value)
        pos = self.find_position(pos_str)
        data_elements = self.plugin_list.plugin_list[pos]['data']
        if subelem.isdigit():
            subelem = self.plugin_list.plugin_list[pos]['map'][int(subelem) - 1]
        data_elements[subelem] = value

    def value(self, value):
        """ Parse a parameter value typed by the user.

        Strings containing ';' are returned unchanged.  Anything else is
        evaluated as a Python expression, falling back to the text itself
        when it is not a valid expression or names an unknown identifier.

        NOTE(security): this uses ``eval`` on user input and must not be
        exposed to untrusted sources.
        """
        if not value.count(';'):
            try:
                value = eval(value)
            except (NameError, SyntaxError):
                value = eval(f"'{value}'")
        return value

    def convert_to_ascii(self, value):
        """ Encode each string in ``value`` as ascii bytes, dropping
        characters that cannot be encoded. """
        return [v.encode('ascii', 'ignore') for v in value]

    def on_and_off(self, str_pos, index):
        """ Activate the plugin at ``str_pos`` if ``index`` is 'ON',
        deactivate it otherwise. """
        print("switching plugin %s %s" % (str_pos, index))
        status = index == 'ON'
        pos = self.find_position(str_pos)
        self.plugin_list.plugin_list[pos]['active'] = status

    def convert_pos(self, str_pos):
        """ Converts the display position (input) to the equivalent numerical
        position and updates the display position if required.

        Existing entries are renumbered or re-lettered as needed to make
        room for an entry at the requested position.

        :param str_pos: the plugin display position (input) string.
        :returns: the equivalent numerical position of str_pos and an updated
            str_pos.
        :rtype: (pos, str_pos)
        """
        pos_list = self.get_split_positions()
        entry = self._split_pos(str_pos)

        # full value already exists in the list
        if entry in pos_list:
            index = pos_list.index(entry)
            return self.inc_positions(index, pos_list, entry, 1)

        # only the number exists in the list
        num_list = [split[0] for split in pos_list]
        if entry[0] in num_list:
            start = num_list.index(entry[0])
            if len(entry) == 2:
                if len(pos_list[start]) == 2:
                    # The number already has lettered entries: append after
                    # the last of them using the next letter.
                    idx = max(i for i, num in enumerate(num_list)
                              if num == entry[0]) + 1
                    entry = [entry[0], str(chr(ord(pos_list[idx - 1][1]) + 1))]
                    return idx, ''.join(entry)
                if entry[1] == 'a':
                    # New entry becomes 'a'; the existing one becomes 'b'.
                    self.plugin_list.plugin_list[start]['pos'] = entry[0] + 'b'
                    return start, ''.join(entry)
                # Any other letter: existing entry becomes 'a', new one 'b'.
                self.plugin_list.plugin_list[start]['pos'] = entry[0] + 'a'
                return start + 1, entry[0] + 'b'
            return self.inc_positions(start, pos_list, entry, 1)

        # number not in list
        entry[0] = str((int(num_list[-1]) + 1) if num_list else 1)
        if len(entry) == 2:
            entry[1] = 'a'
        return len(self.plugin_list.plugin_list), ''.join(entry)

    def increment_positions(self):
        """ Update old process lists that start plugin numbering from 0 to
        start from 1. """
        for plugin in self.plugin_list.plugin_list:
            num, *letter = self._split_pos(plugin['pos'])
            plugin['pos'] = ''.join([str(int(num) + 1)] + letter)

    def get_positions(self):
        """ Get a list of all current plugin entry positions. """
        return [elem['pos'] for elem in self.plugin_list.plugin_list]

    @staticmethod
    def _split_pos(str_pos):
        """ Split a display position into ``[number]`` or
        ``[number, letter]`` (both parts as strings). """
        num = re.findall(r'\d+', str_pos)[0]
        letter = re.findall('[a-z]', str_pos)
        return [num, letter[0]] if letter else [num]

    def get_split_positions(self):
        """ Separate numbers and letters in positions. """
        return [self._split_pos(pos) for pos in self.get_positions()]

    def find_position(self, pos):
        """ Find the numerical index of a position (a string).

        :raises Exception: if no entry has display position ``pos``.
        """
        pos_list = self.get_positions()
        if pos not in pos_list:
            raise Exception("INPUT ERROR: Incorrect plugin position.")
        return pos_list.index(pos)

    def inc_positions(self, start, pos_list, entry, inc):
        """ Shift display positions from list index ``start`` onwards.

        :param pos_list: split positions, as from get_split_positions().
        :param entry: split position of the entry being added or removed; a
            one-element entry shifts numbers, a two-element entry shifts the
            letters of entries sharing its number.
        :param inc: amount to shift by (1 or -1 in this class).
        :returns: ``(start, ''.join(entry))``
        """
        if len(entry) == 1:
            self.inc_numbers(start, pos_list, inc)
        else:
            idx = [i for i in range(start, len(pos_list)) if
                   pos_list[i][0] == entry[0]]
            self.inc_letters(idx, pos_list, inc)
        return start, ''.join(entry)

    def inc_numbers(self, start, pos_list, inc):
        """ Add ``inc`` to the number of every position from ``start``. """
        for i in range(start, len(pos_list)):
            pos_list[i][0] = str(int(pos_list[i][0]) + inc)
            self.plugin_list.plugin_list[i]['pos'] = ''.join(pos_list[i])

    def inc_letters(self, idx, pos_list, inc):
        """ Shift the letter of each position whose index is in ``idx``. """
        for i in idx:
            pos_list[i][1] = str(chr(ord(pos_list[i][1]) + inc))
            self.plugin_list.plugin_list[i]['pos'] = ''.join(pos_list[i])

    def insert(self, plugin, pos, str_pos, replace=False):
        """ Put ``plugin`` into the list at index ``pos``.

        :param str_pos: display position stored on the new entry.
        :param replace: overwrite the existing entry instead of inserting.
        """
        plugin_dict = self.create_plugin_dict(plugin)
        plugin_dict['pos'] = str_pos
        if replace:
            self.plugin_list.plugin_list[pos] = plugin_dict
        else:
            self.plugin_list.plugin_list.insert(pos, plugin_dict)

    def create_plugin_dict(self, plugin):
        """ Build the process list dictionary for a plugin instance.

        The ``'map'`` entry orders parameter names as: user parameters,
        then all remaining ("dev") parameters, then hidden parameters.
        """
        plugin_dict = {
            'name': plugin.name,
            'id': plugin.__module__,
            'data': plugin.parameters,
            'active': True,
            'desc': plugin.parameters_desc,
            'hide': plugin.parameters_hide,
            'user': plugin.parameters_user,
        }
        listed = plugin_dict['user'] + plugin_dict['hide']
        dev_keys = [k for k in plugin_dict['data'] if k not in listed]
        plugin_dict['map'] = \
            plugin_dict['user'] + dev_keys + plugin_dict['hide']
        return plugin_dict

    def get(self, pos):
        """ Return the plugin dictionary at list index ``pos``. """
        return self.plugin_list.plugin_list[pos]

    def remove(self, pos):
        """ Remove the entry at list index ``pos`` and close the gap in the
        display positions of the entries that follow.

        :raises Exception: if ``pos`` is beyond the end of the list.
        """
        if pos >= self.size:
            # The entry does not exist, so its name cannot be looked up;
            # report the requested index instead.
            raise Exception("Cannot remove plugin %s as it does not exist."
                            % pos)
        pos_str = self.plugin_list.plugin_list[pos]['pos']
        self.plugin_list.plugin_list.pop(pos)
        pos_list = self.get_split_positions()
        # inc_positions() tells numbers from number+letter by the length of
        # the split entry, so a raw string such as '10' must be split first.
        self.inc_positions(pos, pos_list, self._split_pos(pos_str), -1)

    @property
    def size(self):
        """ Number of entries in the process list. """
        return len(self.plugin_list.plugin_list)
400