Test Failed
Pull Request — master (#929)
by
unknown
04:21
created

Content.get_param_arg_str()   A

Complexity

Conditions 1

Size

Total Lines 16
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 8
nop 3
dl 0
loc 16
rs 10
c 0
b 0
f 0
1
# Copyright 2014 Diamond Light Source Ltd.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
"""
16
.. module:: content
17
   :platform: Unix
18
   :synopsis: Content class for the configurator
19
20
.. moduleauthor:: Nicola Wadeson <[email protected]>
21
22
"""
23
import re
24
import os
25
import copy
26
import inspect
27
28
from savu.plugins import utils as pu
29
from savu.data.plugin_list import PluginList
30
import scripts.config_generator.parameter_utils as param_u
31
32
from . import mutations
33
34
35
class Content(object):
36
    def __init__(self, filename=None, level="basic"):
37
        self.disp_level = level
38
        self.plugin_list = PluginList()
39
        self.plugin_mutations = mutations.plugin_mutations
40
        self.param_mutations = mutations.param_mutations
41
        self.filename = filename
42
        self._finished = False
43
        self.failed = {}
44
        self.expand_dim = None
45
46
    def set_finished(self, check="y"):
47
        self._finished = True if check.lower() == "y" else False
48
49
    def is_finished(self):
50
        return self._finished
51
52
    def fopen(self, infile, update=False, skip=False):
53
        if os.path.exists(infile):
54
            self.plugin_list._populate_plugin_list(infile, active_pass=True)
55
        else:
56
            raise Exception("INPUT ERROR: The file does not exist.")
57
        self.filename = infile
58
        if update:
59
            self.plugin_mutations = self.check_mutations(
60
                self.plugin_mutations
61
            )
62
            self.param_mutations = self.check_mutations(self.param_mutations)
63
            self._apply_plugin_updates(skip)
64
65
    def check_mutations(self, mut_dict: dict):
66
        plist_version = self._version_to_float(self.plugin_list.version)
67
        # deleting elements while iterating invalidates the iterator
68
        # which raises a RuntimeError in Python 3.
69
        # Instead a copy of the dict is mutated and returned
70
        mut_dict_copy = mut_dict.copy()
71
        for key, subdict in mut_dict.items():
72
            if "up_to_version" in subdict.keys():
73
                up_to_version = self._version_to_float(
74
                    subdict["up_to_version"]
75
                )
76
                if plist_version >= up_to_version:
77
                    del mut_dict_copy[key]
78
        return mut_dict_copy
79
80
    def _version_to_float(self, version):
81
        if version is None:
82
            return 0
83
        if isinstance(version, bytes):
84
            version = version.decode("ascii")
85
        split_vals = version.split(".")
86
        return float(".".join([split_vals[0], "".join(split_vals[1:])]))
87
88
    def display(self, formatter, **kwargs):
89
        # Set current level
90
        if "current_level" not in list(kwargs.keys()):
91
            kwargs["current_level"] = self.disp_level
92
        if (
93
            "disp_level" in list(kwargs.keys())
94
            and kwargs["disp_level"] is True
95
        ):
96
            # Display level
97
            print(f"Level is set at '{kwargs['current_level']}'")
98
        else:
99
            # Display parameter
100
            kwargs["expand_dim"] = self.expand_dim
101
            print("\n" + formatter._get_string(**kwargs) + "\n")
102
103
    def check_file(self, filename):
104
        if not filename:
105
            raise Exception(
106
                "INPUT ERROR: Please specify the output filepath."
107
            )
108
        path = os.path.dirname(filename)
109
        path = path if path else "."
110
        if not os.path.exists(path):
111
            file_error = "INPUT_ERROR: Incorrect filepath."
112
            raise Exception(file_error)
113
114
    def save(self, filename, check="y", template=False):
115
        self.check_plugin_list_exists()
116
        # Check if a loader and saver are present.
117
        self.plugin_list._check_loaders()
118
        if check.lower() == "y":
119
            print(f"Saving file {filename}")
120
            self.filename = filename
121
            if template:
122
                self.plugin_list.add_template(create=True)
123
            self.plugin_list._save_plugin_list(filename)
124
        else:
125
            print("The process list has NOT been saved.")
126
127
    def clear(self, check="y"):
128
        if check.lower() == "y":
129
            self.expand_dim = None
130
            self.filename = None
131
            self.plugin_list.plugin_list = []
132
            self.plugin_list.clear_iterate_plugin_group_dicts()
133
134
    def check_plugin_list_exists(self):
135
        """ Check if plugin list is populated. """
136
        pos_list = self.get_positions()
137
        if not pos_list:
138
            print("There are no items to access in your list.")
139
            raise Exception("Please add an item to the process list.")
140
141
    def add(self, name, str_pos):
142
        self.check_for_plugin_failure(name)
143
        plugin = pu.plugins[name]()
144
        plugin.get_plugin_tools()._populate_default_parameters()
145
        pos, str_pos = self.convert_pos(str_pos)
146
        self.insert(plugin, pos, str_pos)
147
148
    def iterate(self, args):
149
        if args.remove is not None:
150
            self.plugin_list.remove_iterate_plugin_groups(args.remove)
151
        elif args.set is not None:
152
            # create a dict representing a group of plugins to iterate over
153
            start = args.set[0]
154
            end = args.set[1]
155
            iterations = args.set[2]
156
            self.plugin_list.add_iterate_plugin_group(start, end, iterations)
157
158
    def refresh(self, str_pos, defaults=False, change=False):
159
        pos = self.find_position(str_pos)
160
        plugin_entry = self.plugin_list.plugin_list[pos]
161
        name = change if change else plugin_entry["name"]
162
        active = plugin_entry["active"]
163
        plugin = pu.plugins[name]()
164
        plugin.get_plugin_tools()._populate_default_parameters()
165
        keep = self.get(pos)["data"] if not defaults else None
166
        self.insert(plugin, pos, str_pos, replace=True)
167
        self.plugin_list.plugin_list[pos]["active"] = active
168
        if keep:
169
            self._update_parameters(plugin, name, keep, str_pos)
170
171
    def duplicate(self, dupl_pos, new):
172
        """ Duplicate the plugin at position dupl_pos
173
        and insert it at the new position
174
175
        :param dupl_pos: Position of the plugin to duplicate
176
        :param new: New plugin position
177
        """
178
        pos = self.find_position(dupl_pos)
179
        new_pos, new_pos_str = self.convert_pos(new)
180
        plugin_entry = copy.deepcopy(self.plugin_list.plugin_list[pos])
181
        plugin_entry["pos"] = new_pos_str
182
        self.plugin_list.plugin_list.insert(new_pos, plugin_entry)
183
184
    def check_for_plugin_failure(self, name):
185
        """Check if the plugin failed to load
186
187
        :param name: plugin name
188
        """
189
        if (name not in list(pu.plugins.keys())
190
                or self.plugin_in_failed_dict(name)):
191
                if self.plugin_in_failed_dict(name):
192
                    msg = f"IMPORT ERROR: {name} is unavailable due to" \
193
                          f" the following error:\n\t{self.failed[name]}"
194
                    raise Exception(msg)
195
                raise Exception("INPUT ERROR: Unknown plugin %s" % name)
196
197
    def plugin_in_failed_dict(self, name):
198
        """Check if plugin in failed dictionary
199
200
        :param name: plugin name
201
        :return: True if plugin name in the list of failed plugins
202
        """
203
        failed_plugin_list = list(self.failed.keys()) if self.failed else []
204
        return True if name in failed_plugin_list else False
205
206
    def check_preview_param(self, plugin_pos):
207
        """ Check that the plugin position number is valid and it contains
208
        a preview parameter
209
210
        :param plugin_pos:
211
        :return:
212
        """
213
        pos = self.find_position(plugin_pos)
214
        plugin_entry = self.plugin_list.plugin_list[pos]
215
        parameters = plugin_entry["data"]
216
        if "preview" not in parameters:
217
            raise Exception("You can only use this command with "
218
                            "plugins containing the preview parameter")
219
220
    def set_preview_display(self, expand_off, expand_dim, dim_view,
221
                            plugin_pos):
222
        """Set the dimensions_to_display value to "off" to prevent the
223
        preview parameter being shown in it's expanded form.
224
225
        If dimensions_to_display = "all", then all dimension slices are shown.
226
        If a number is provided to dim_view, that dimension is shown.
227
228
        :param expand_off: True if expand display should be turned off
229
        :param expand_dim: The dimension number to display, or "all"
230
        :param dim_view: True if only a certain dimension should be shown
231
        :param plugin_pos: Plugin position
232
        """
233
        if expand_off is True:
234
            self.expand_dim = None
235
            print(f"Expand diplay has been turned off")
236
        else:
237
            self.check_preview_param(plugin_pos)
238
            dims_to_display = expand_dim if dim_view else "all"
239
            self.expand_dim = dims_to_display
240
241
    def _update_parameters(self, plugin, name, keep, str_pos):
242
        union_params = set(keep).intersection(set(plugin.parameters))
243
        # Names of the parameter names present in both lists
244
        for param in union_params:
245
            valid_mod = self.modify(str_pos, param, keep[param], ref=True)
246
            if not valid_mod:
247
                self._print_default_message(plugin, param)
248
        # add any parameter mutations here
249
        classes = [c.__name__ for c in inspect.getmro(plugin.__class__)]
250
        m_dict = self.param_mutations
251
        keys = [k for k in list(m_dict.keys()) if k in classes]
252
253
        changes = False
254
        for k in keys:
255
            for entry in m_dict[k]["params"]:
256
                if entry["old"] in list(keep.keys()):
257
                    changes = True
258
                    val = keep[entry["old"]]
259
                    if "eval" in list(entry.keys()):
260
                        val = eval(entry["eval"])
261
                    self.modify(str_pos, entry["new"], val, ref=True)
262
        if changes:
263
            mutations.param_change_str(keep, plugin.parameters, name, keys)
264
265
    def _print_default_message(self, plugin, param):
266
        """Print message to alert user that value has not been changed/updated
267
268
        :param plugin: plugin object
269
        :param param: parameter name
270
        """
271
        pdefs = plugin.get_plugin_tools().get_param_definitions()
272
        default = pdefs[param]['default']
273
        print(f"This has been replaced by the default {param}: "
274
              f"{default}.")
275
276
    def _apply_plugin_updates(self, skip=False):
277
        # Update old process lists that start from 0
278
        the_list = self.plugin_list.plugin_list
279
        if "pos" in list(the_list[0].keys()) and the_list[0]["pos"] == "0":
280
            self.increment_positions()
281
282
        missing = []
283
        pos = len(the_list) - 1
284
        notices = mutations.plugin_notices
285
286
        for plugin in the_list[::-1]:
287
            # update old process lists to include 'active' flag
288
            if "active" not in list(plugin.keys()):
289
                plugin["active"] = True
290
291
            while True:
292
                name = the_list[pos]["name"]
293
                if name in notices.keys():
294
                    print(notices[name]["desc"])
295
296
                # if a plugin is missing from all available plugins
297
                # then look for mutations in the plugin name
298
                search = True if name not in pu.plugins.keys() else False
299
                found = self._mutate_plugins(name, pos, search=search)
300
                if search and not found:
301
                    str_pos = self.plugin_list.plugin_list[pos]["pos"]
302
                    missing.append([name, str_pos])
303
                    self.remove(pos)
304
                    pos -= 1
305
                if name == the_list[pos]["name"]:
306
                    break
307
            pos -= 1
308
309
        for name, pos in missing[::-1]:
310
            if skip:
311
                print(f"Skipping plugin {pos}: {name}")
312
            else:
313
                message = (
314
                    f"\nPLUGIN ERROR: The plugin {name} is "
315
                    f"unavailable in this version of Savu. \nThe plugin is "
316
                    f"missing from the position {pos} in the process list. "
317
                    f"\n Type open -s <process_list> to skip the broken "
318
                    f"plugin."
319
                )
320
                raise Exception(f"Incompatible process list. {message}")
321
322
    def _mutate_plugins(self, name, pos, search=False):
323
        """ Perform plugin mutations. """
324
        # check for case changes in plugin name
325
        if search:
326
            for key in pu.plugins.keys():
327
                if name.lower() == key.lower():
328
                    str_pos = self.plugin_list.plugin_list[pos]["pos"]
329
                    self.refresh(str_pos, change=key)
330
                    return True
331
332
        # check mutations dict
333
        m_dict = self.plugin_mutations
334
        if name in m_dict.keys():
335
            mutate = m_dict[name]
336
            if "replace" in mutate.keys():
337
                if mutate["replace"] in list(pu.plugins.keys()):
338
                    str_pos = self.plugin_list.plugin_list[pos]["pos"]
339
                    self.refresh(str_pos, change=mutate["replace"])
340
                    print(mutate["desc"])
341
                    return True
342
                raise Exception(
343
                    f"Replacement plugin {mutate['replace']} "
344
                    f"unavailable for {name}"
345
                )
346
            elif "remove" in mutate.keys():
347
                self.remove(pos)
348
                print(mutate["desc"])
349
            else:
350
                raise Exception("Unknown mutation type.")
351
        return False
352
353
    def move(self, old, new):
354
        old_pos = self.find_position(old)
355
        n_pos = self.find_position(new)
356
        entry = self.plugin_list.plugin_list[old_pos]
357
        self.remove(old)
358
        new_pos, new_pos_str = self.convert_pos(new)
359
        name = entry["name"]
360
        self.insert(pu.plugins[name](), new_pos, new_pos_str)
361
        self.plugin_list.plugin_list[new_pos] = entry
362
        self.plugin_list.plugin_list[new_pos]["pos"] = new_pos_str
363
        self.check_iterative_loops([old_pos + 1, new_pos + 1], 0)
364
365
    def replace(self, old, new_plugin):
366
        self.check_for_plugin_failure(new_plugin)
367
        self.remove(old)
368
        pos, str_pos = self.convert_pos(old)
369
        plugin = pu.plugins[new_plugin]()
370
        plugin.get_plugin_tools()._populate_default_parameters()
371
        self.insert(plugin, pos, str_pos)
372
        #self.plugin_list.plugin_list[pos]["pos"] = str_pos
373
374
    def modify(self, pos_str, param_name, value, default=False, ref=False,
375
               dim=False):
376
        """Modify the plugin at pos_str and the parameter at param_name
377
        The new value will be set if it is valid.
378
379
        :param pos_str: The plugin position
380
        :param param_name: The parameter position/name
381
        :param value: The new parameter value
382
        :param default: True if value should be reverted to the default
383
        :param ref: boolean Refresh the plugin
384
        :param dim: The dimension to be modified
385
386
        returns: A boolean True if the value is a valid input for the
387
          selected parameter
388
        """
389
        pos = self.find_position(pos_str)
390
        plugin_entry = self.plugin_list.plugin_list[pos]
391
        tools = plugin_entry["tools"]
392
        parameters = plugin_entry["data"]
393
        params = plugin_entry["param"]
394
        param_name, value = self.setup_modify(params, param_name, value, ref)
395
        default_str = ["-d", "--default"]
396
        if default or value in default_str:
397
            value = tools.get_default_value(param_name)
398
            self._change_value(param_name, value, tools, parameters)
399
            valid_modification = True
400
        else:
401
            value = self._catch_parameter_tuning_syntax(value, param_name)
402
            valid_modification = self.modify_main(
403
                param_name, value, tools, parameters, dim, pos_str
404
            )
405
        return valid_modification
406
407
    def _catch_parameter_tuning_syntax(self, value, param_name):
408
        """Check if the new parameter value seems like it is written
409
        in parameter tuning syntax with colons. If it is, then append
410
        a semi colon onto the end.
411
412
        :param value: new parameter value
413
        :param param_name:
414
        :return: modified value with semi colon appended
415
        """
416
        exempt_parameters = ["preview", "indices"]
417
        if self._is_multi_parameter_syntax(value) \
418
                and param_name not in exempt_parameters:
419
            # Assume parameter tuning syntax is being used
420
            value = f"{value};"
421
            print("Parameter tuning syntax applied")
422
        return value
423
424
    def _is_multi_parameter_syntax(self, value):
425
        """If the value contains two colons, is not a dictionary,
426
        and doesnt already contain a semi colon, then assume that
427
        it is using parameter tuning syntax
428
429
        :param value: new parameter value
430
        :return boolean True if parameter tuning syntax is being used
431
        """
432
        isdict = re.findall(r"[\{\}]+", str(value))
433
        return (isinstance(value, str)
434
                and value.count(":") >= 2
435
                and not isdict
436
                and ";" not in value)
437
438
    def setup_modify(self, params, param_name, value, ref):
439
        """Get the parameter keys in the correct order and find
440
        the parameter name string
441
442
        :param params: The plugin parameters
443
        :param param_name: The parameter position/name
444
        :param value: The new parameter value
445
        :param ref: boolean Refresh the plugin
446
447
        return: param_name str to avoid discrepancy, value
448
        """
449
        if ref:
450
            # For a refresh, refresh all keys, including those with
451
            # dependencies (which have the display off)
452
            keys = params.keys()
453
        else:
454
            # Select the correct group and order of parameters according to that
455
            # on display to the user. This ensures correct parameter is modified.
456
            keys = pu.set_order_by_visibility(params)
457
            value = self.value(value)
458
        param_name = pu.param_to_str(param_name, keys)
459
        return param_name, value
460
461
    def modify_main(self, param_name, value, tools, parameters, dim, pos_str):
462
        """Check the parameter is within the current parameter list.
463
        Check the new parameter value is valid, modify the parameter
464
        value, update defaults, check if dependent parameters should
465
        be made visible or hidden.
466
467
        :param param_name: The parameter position/name
468
        :param value: The new parameter value
469
        :param tools: The plugin tools
470
        :param parameters: The plugin parameters
471
        :param dim: The dimensions
472
        :param pos_str: The plugin position number
473
474
        returns: A boolean True if the value is a valid input for the
475
          selected parameter
476
        """
477
        parameter_valid = False
478
        current_parameter_details = tools.param.get(param_name)
479
        current_plugin_name = tools.plugin_class.name
480
481
        # If dimensions are provided then alter preview param
482
        if self.preview_dimension_to_modify(dim, param_name):
483
            # Filter the dimension, dim1 or dim1.start
484
            dim, _slice = self._separate_dimension_and_slice(dim)
485
            value = self.modify_preview(
486
                parameters, param_name, value, dim, _slice
487
            )
488
489
        # If found, then the parameter is within the current parameter list
490
        # displayed to the user
491
        if current_parameter_details:
492
            value_check = pu._dumps(value)
493
            parameter_valid, error_str = param_u.is_valid(
494
                param_name, value_check, current_parameter_details
495
            )
496
            if parameter_valid:
497
                self._change_value(param_name, value, tools, parameters)
498
            else:
499
                print(f"ERROR: The value {value} for "
500
                      f"{pos_str}.{param_name} is not correct.")
501
                print(error_str)
502
        else:
503
            print("Not in parameter keys.")
504
        return parameter_valid
505
506
    def _change_value(self, param_name, value, tools, parameters):
507
        """ Change the parameter "param_name" value inside the parameters list
508
        Update feedback messages for various dependant parameters
509
510
        :param param_name: The parameter position/name
511
        :param value: The new parameter value
512
        :param tools: The plugin tools
513
        :param parameters: The plugin parameters
514
        """
515
        # Save the value
516
        parameters[param_name] = value
517
        tools.warn_dependents(param_name, value)
518
        # Update the list of parameters to hide those dependent on others
519
        tools.check_dependencies(parameters)
520
521
    def check_required_args(self, value, required):
522
        """Check required argument 'value' is present
523
524
        :param value: Argument value
525
        :param required: bool, True if the argument is required
526
        """
527
        if required and (not value):
528
            raise Exception("Please enter a value")
529
530
        if (not required) and value:
531
            raise Exception(f"Unrecognised argument: {value}")
532
533
    def preview_dimension_to_modify(self, dim, param_name):
534
        """Check that the dimension string is only present when the parameter
535
        to modify is the preview parameter
536
537
        :param dim: Dimension string
538
        :param param_name: The parameter name (of the parameter to be modified)
539
        :return: True if dimension string is provided and the parameter to modify
540
        the preview parameter
541
        """
542
        if dim:
543
            if param_name == "preview":
544
                return True
545
            else:
546
                raise Exception(
547
                    "Please only use the dimension syntax when "
548
                    "modifying the preview parameter."
549
                )
550
        return False
551
552
    def modify_dimensions(self, pos_str, dim, check="y"):
553
        """Modify the plugin preview value. Remove or add dimensions
554
        to the preview parameter until the provided dimension number
555
        is reached.
556
557
        :param pos_str: The plugin position
558
        :param dim: The new number of dimensions
559
        :return True if preview is modified
560
        """
561
        pos = self.find_position(pos_str)
562
        plugin_entry = self.plugin_list.plugin_list[pos]
563
        parameters = plugin_entry["data"]
564
        self.check_param_exists(parameters, "preview")
565
        current_prev_list = pu._dumps(parameters["preview"])
566
        if not isinstance(current_prev_list, list):
567
            # Temporarily cover dict instance for preview
568
            print("This command is only possible for preview "
569
                  "values of the type list")
570
            return False
571
        pu.check_valid_dimension(dim, [])
572
        if check.lower() == "y":
573
            while len(current_prev_list) > dim:
574
                current_prev_list.pop()
575
            while len(current_prev_list) < dim:
576
                current_prev_list.append(":")
577
            parameters["preview"] = current_prev_list
578
            return True
579
        return False
580
581
    def check_param_exists(self, parameters, pname):
582
        """Check the parameter is present in the current parameter list
583
584
        :param parameters: Dictionary of parameters
585
        :param pname: Parameter name
586
        :return:
587
        """
588
        if not parameters.get(pname):
589
            raise Exception(
590
                f"The {pname} parameter is not available" f" for this plugin."
591
            )
592
593
594
    def plugin_to_num(self, plugin_val, pl_index):
595
        """Check the plugin is within the process list and
596
        return the number in the list.
597
598
        :param plugin_val: The dictionary of parameters
599
        :param pl_index: The plugin index (for use when there are multiple
600
           plugins of same name)
601
        :return: A plugin index number of a certain plugin in the process list
602
        """
603
        if plugin_val.isdigit():
604
            return plugin_val
605
        pl_names = [pl["name"] for pl in self.plugin_list.plugin_list]
606
        if plugin_val in pl_names:
607
            # Find the plugin number
608
            pl_indexes = [i for i, p in enumerate(pl_names) if p == plugin_val]
609
            # Subtract one to access correct list index. Add one to access
610
            # correct plugin position
611
            return str(pl_indexes[pl_index-1] +1)
612
        else:
613
            raise Exception("This plugin is not present in this process list.")
614
615
616
    def value(self, value):
617
        if not value.count(";"):
618
            try:
619
                value = eval(value)
620
            except (NameError, SyntaxError):
621
                try:
622
                    value = eval(f"'{value}'")
623
                    # if there is one quotation mark there will be an error
624
                except EOFError:
625
                    raise EOFError(
626
                        "There is an end of line error. Please check your"
627
                        ' input for the character "\'".'
628
                    )
629
                except SyntaxError:
630
                    raise SyntaxError(
631
                        "There is a syntax error. Please check your input."
632
                    )
633
                except:
634
                    raise Exception("Please check your input.")
635
        return value
636
637
    def convert_to_ascii(self, value):
638
        ascii_list = []
639
        for v in value:
640
            ascii_list.append(v.encode("ascii", "ignore"))
641
        return ascii_list
642
643
    def on_and_off(self, str_pos, index):
644
        print(("switching plugin %s %s" % (str_pos, index)))
645
        status = True if index == "ON" else False
646
        pos = self.find_position(str_pos)
647
        self.plugin_list.plugin_list[pos]["active"] = status
648
649
    def split_pos_to_list(self, str_pos):
650
        """Split the position string into a list of numbers and letters
651
        :param str_pos: the plugin display position string.
652
        :returns: a list with the position number and letter
653
        """
654
        num = re.findall(r"\d+", str_pos)[0]
655
        letter = re.findall("[a-z]", str_pos)
656
        entry = [num, letter[0]] if letter else [num]
657
        return entry
658
659
    def convert_pos(self, str_pos):
660
        """ Converts the display position (input) to the equivalent numerical
661
        position and updates the display position if required.
662
663
        :param str_pos: the plugin display position (input) string.
664
        :returns: the equivalent numerical position of str_pos and and updated\
665
            str_pos.
666
        :rtype: (pos, str_pos)
667
        """
668
        pos_list = self.get_split_positions()
669
        entry = self.split_pos_to_list(str_pos)
670
671
        # full value already exists in the list
672
        if entry in pos_list:
673
            index = pos_list.index(entry)
674
            return self.inc_positions(index, pos_list, entry, 1)
675
676
        # only the number exists in the list
677
        num_list = [pos_list[i][0] for i in range(len(pos_list))]
678
        if entry[0] in num_list:
679
            start = num_list.index(entry[0])
680
            if len(entry) == 2:
681
                if len(pos_list[start]) == 2:
682
                    idx = int([i for i in range(len(num_list)) if
683
                               (num_list[i] == entry[0])][-1]) + 1
684
                    entry = [entry[0], str(chr(ord(pos_list[idx - 1][1]) + 1))]
685
                    return idx, ''.join(entry)
686
                if entry[1] == 'a':
687
                    self.plugin_list.plugin_list[start]['pos'] = entry[0] + 'b'
688
                    return start, ''.join(entry)
689
                else:
690
                    self.plugin_list.plugin_list[start]['pos'] = entry[0] + 'a'
691
                    return start + 1, entry[0] + 'b'
692
            return self.inc_positions(start, pos_list, entry, 1)
693
694
        # number not in list
695
        entry[0] = str(int(num_list[-1]) + 1 if num_list else 1)
696
        if len(entry) == 2:
697
            entry[1] = "a"
698
        return len(self.plugin_list.plugin_list), "".join(entry)
699
700
    def increment_positions(self):
701
        """Update old process lists that start plugin numbering from 0 to
702
        start from 1."""
703
        for plugin in self.plugin_list.plugin_list:
704
            str_pos = plugin["pos"]
705
            num = str(int(re.findall(r"\d+", str_pos)[0]) + 1)
706
            letter = re.findall("[a-z]", str_pos)
707
            plugin["pos"] = "".join([num, letter[0]] if letter else [num])
708
709
    def get_positions(self):
710
        """ Get a list of all current plugin entry positions. """
711
        elems = self.plugin_list.plugin_list
712
        pos_list = []
713
        for e in elems:
714
            pos_list.append(e["pos"])
715
        return pos_list
716
717
    def get_param_arg_str(self, pos_str, subelem):
718
        """Get the name of the parameter so that the display lists the
719
        correct item when the parameter order has been updated
720
721
        :param pos_str: The plugin position
722
        :param subelem: The parameter
723
        :return: The plugin.parameter_name argument
724
        """
725
        pos = self.find_position(pos_str)
726
        current_parameter_list = self.plugin_list.plugin_list[pos]["param"]
727
        current_parameter_list_ordered = pu.set_order_by_visibility(
728
            current_parameter_list
729
        )
730
        param_name = pu.param_to_str(subelem, current_parameter_list_ordered)
731
        param_argument = pos_str + "." + param_name
732
        return param_argument
733
734
    def get_split_positions(self):
735
        """ Separate numbers and letters in positions. """
736
        positions = self.get_positions()
737
        split_pos = []
738
        for i in range(len(positions)):
739
            num = re.findall(r"\d+", positions[i])[0]
740
            letter = re.findall("[a-z]", positions[i])
741
            split_pos.append([num, letter[0]] if letter else [num])
742
        return split_pos
743
744
    def find_position(self, pos):
745
        """ Find the numerical index of a position (a string). """
746
        pos_list = self.get_positions()
747
        if not pos_list:
748
            print("There are no items to access in your list.")
749
            raise Exception("Please add an item to the process list.")
750
        else:
751
            if pos not in pos_list:
752
                raise ValueError("Incorrect plugin position.")
753
            return pos_list.index(pos)
754
755
    def inc_positions(self, start, pos_list, entry, inc):
756
        """ Increment plugin positions following index start, by provided inc
757
        :param start: Plugin starting index
758
        :param pos_list: plugin position list
759
        :param entry: entry list
760
        :param inc: increment value
761
        """
762
        if len(entry) == 1:
763
            self.inc_numbers(start, pos_list, inc)
764
        else:
765
            idx = [
766
                i
767
                for i in range(start, len(pos_list))
768
                if pos_list[i][0] == entry[0]
769
            ]
770
            self.inc_letters(idx, pos_list, inc)
771
        return start, "".join(entry)
772
773
    def inc_numbers(self, start, pos_list, inc):
774
        for i in range(start, len(pos_list)):
775
            pos_list[i][0] = str(int(pos_list[i][0]) + inc)
776
            self.plugin_list.plugin_list[i]["pos"] = "".join(pos_list[i])
777
778
    def inc_letters(self, idx, pos_list, inc):
779
        for i in idx:
780
            pos_list[i][1] = str(chr(ord(pos_list[i][1]) + inc))
781
            self.plugin_list.plugin_list[i]["pos"] = "".join(pos_list[i])
782
783
    def split_plugin_string(self, start, stop, subelem_view=False):
784
        """Find the start and stop number for the plugin range selected.
785
786
        :param start: Plugin starting index (including a subelem value
787
          if permitted)
788
        :param stop: Plugin stopping index
789
        :param subelem_view: False if subelem value not permitted
790
        :return: range_dict containing start stop (and possible subelem)
791
        """
792
        range_dict = {}
793
        if start:
794
            if subelem_view and "." in start:
795
                start, stop, subelem = self._split_subelem(start)
796
                range_dict["subelem"] = subelem
797
            else:
798
                start, stop = self._get_start_stop(start, stop)
799
            range_dict["start"] = start
800
            range_dict["stop"] = stop
801
        return range_dict
802
803
    def _get_start_stop(self, start, stop):
804
        """Find the start and stop number for the plugin range selected """
805
        start = self.find_position(start)
806
        stop = self.find_position(stop) + 1 if stop else start + 1
807
        return start, stop
808
809
    def _split_subelem(self, start, config_disp=True):
810
        """Separate the start string containing the plugin number,
811
        parameter(subelement), dimension and command
812
813
        :param start: The plugin to start at
814
        :param config_disp: True if command and dimension arguments
815
          are not permitted
816
        :return: start plugin, range_dict containing a subelem
817
            if a parameter is specified
818
        """
819
        start, subelem, dim, command = self.separate_plugin_subelem(
820
            start, config_disp
821
        )
822
        start, stop = self._get_start_stop(start, "")
823
        return start, stop, subelem
824
825
    def _check_command_valid(self, plugin_param, config_disp):
826
        """Check the plugin_param string length
827
828
        :param plugin_param: The string containing plugin number, parameter,
829
         and command
830
        :param config_disp: bool, True if command and dimension arguments are
831
          not permitted
832
        """
833
        if config_disp:
834
            if not 1 < len(plugin_param) < 3:
835
                raise ValueError(
836
                    "Use either 'plugin_pos.param_name' or"
837
                    " 'plugin_pos.param_no'"
838
                )
839
        else:
840
            # The modify command is being used
841
            if len(plugin_param) <= 1:
842
                raise ValueError(
843
                    "Please enter the plugin parameter to modify"
844
                    ". Either 'plugin_pos.param_name' or"
845
                    " 'plugin_pos.param_no'"
846
                )
847
            if not 1 < len(plugin_param) < 5:
848
                raise ValueError(
849
                    "Enter 'plugin_pos.param_no.dimension'. "
850
                    "Following the dimension, use start/stop/step"
851
                    " eg. '1.1.dim1.start' "
852
                )
853
854
    def separate_plugin_subelem(self, plugin_param, config_disp):
855
        """Separate the plugin number,parameter (subelement) number
856
        and additional command if present.
857
858
        :param plugin_param: A string supplied by the user input which
859
         contains the plugin element to edit/display. eg "1.1.dim.command"
860
        :param config_disp: bool, True if command and dimension arguments are
861
          not permitted
862
863
        :returns plugin: The number of the plugin element
864
                 subelem: The number of the parameter
865
                 dim: The dimension
866
                 command: The supplied command, eg expand or a dimension
867
                          string
868
        """
869
        plugin_param = plugin_param.split(".")
870
        plugin = plugin_param[0]
871
        # change str plugin name to a number
872
        start = self.find_position(plugin)
873
        self._check_command_valid(plugin_param, config_disp)
874
        subelem = plugin_param[1]
875
        if len(plugin_param) > 2:
876
            dim = self.dim_str_to_int(plugin_param[2])
877
            command = str(dim)
878
            if len(plugin_param) == 4:
879
                self._check_command_str(plugin_param[3])
880
                command += "." + plugin_param[3]
881
        else:
882
            dim, command = "", ""
883
        return plugin, subelem, dim, command
884
885
    def _check_command_str(self, command_str):
886
        """Check the additional 1.1.dim.command for slice or 'expand'
887
        keywords
888
        """
889
        command_list = ["expand", "start", "stop", "step", "chunk"]
890
        if command_str not in command_list:
891
            raise ValueError(
892
                "Following the dimension, use start/stop/step eg.  "
893
                "'1.1.dim1.start' "
894
            )
895
        return command_str
896
897
    def insert(self, plugin, pos, str_pos, replace=False):
898
        plugin_dict = self.create_plugin_dict(plugin)
899
        plugin_dict["pos"] = str_pos
900
        if replace:
901
            self.plugin_list.plugin_list[pos] = plugin_dict
902
        else:
903
            self.plugin_list.plugin_list.insert(pos, plugin_dict)
904
905
    def create_plugin_dict(self, plugin):
906
        tools = plugin.get_plugin_tools()
907
        plugin_dict = {}
908
        plugin_dict["name"] = plugin.name
909
        plugin_dict["id"] = plugin.__module__
910
        plugin_dict["data"] = plugin.parameters
911
        plugin_dict["active"] = True
912
        plugin_dict["tools"] = tools
913
        plugin_dict["param"] = tools.get_param_definitions()
914
        plugin_dict["doc"] = tools.docstring_info
915
        return plugin_dict
916
917
    def get(self, pos):
918
        return self.plugin_list.plugin_list[pos]
919
920
    def _separate_dimension_and_slice(self, command_str):
921
        """Check the start stop step command
922
923
        param command_str: a string '1.1' containing the dimension
924
            and slice number seperated by a full stop
925
926
        :returns dim, slice
927
        """
928
        if isinstance(command_str, str) and "." in command_str:
929
            # If the slice value is included
930
            slice_dict = {"start": 0, "stop": 1, "step": 2, "chunk": 3}
931
            _slice = slice_dict[command_str.split(".")[1]]
932
            dim = int(command_str.split(".")[0])
933
        else:
934
            dim = int(command_str)
935
            _slice = ""
936
        return dim, _slice
937
938
    def dim_str_to_int(self, dim_str):
939
        """Check the additional 1.1.dim keyword
940
941
        :param dim: A string 'dim1' specifying the dimension
942
943
        :return: dim - An integer dimension value
944
        """
945
        number = "".join(l for l in dim_str if l.isdigit())
946
        letters = "".join(l for l in dim_str if l.isalpha())
947
948
        if letters == "dim" and number.strip():
949
            dim = int(number)
950
        else:
951
            raise ValueError(
952
                "Following the second decimal place, please "
953
                "specify a dimension 1.1.dim1 or 1.preview.dim1"
954
            )
955
        return dim
956
957
    def modify_preview(self, parameters, param_name, value, dim, _slice):
958
        """ Check the entered value is valid and edit preview"""
959
        slice_list = [0, 1, 2, 3]
960
        type_check_value = pu._dumps(value)
961
        current_preview_value = pu._dumps(parameters[param_name])
962
        pu.check_valid_dimension(dim, current_preview_value)
963
        if _slice in slice_list:
964
            # Modify this dimension and slice only
965
            if param_u._preview_dimension_singular(type_check_value):
966
                value = self._modify_preview_dimension_slice(
967
                    value, current_preview_value, dim, _slice
968
                )
969
            else:
970
                raise ValueError(
971
                    "Invalid preview dimension slice value. Please "
972
                    "enter a float, an integer or a string including "
973
                    "only start, mid and end keywords."
974
                )
975
        else:
976
            # If the entered value is a valid dimension value
977
            if param_u._preview_dimension(type_check_value):
978
                # Modify the whole dimension
979
                value = self._modify_preview_dimension(
980
                    value, current_preview_value, dim
981
                )
982
            else:
983
                raise ValueError(
984
                    "Invalid preview dimension value. Please "
985
                    "enter a float, an integer or slice notation."
986
                )
987
        return value
988
989
    def _modify_preview_dimension_slice(self, value, current_val, dim, _slice):
990
        """Modify the preview dimension slice value at the dimension (dim)
991
        provided
992
993
        :param value: The new value
994
        :param current_value: The current preview parameter value
995
        :param dim: The dimension to modify
996
        :param _slice: The slice value to modify
997
        :return: The modified value
998
        """
999
        if not current_val:
1000
            current_val = self._set_empty_list(
1001
                dim, self._set_empty_dimension_slice(value, _slice)
1002
            )
1003
        else:
1004
            current_val[dim - 1] = self._modified_slice_notation(
1005
                current_val[dim - 1], value, _slice
1006
            )
1007
        return current_val
1008
1009
    def _modified_slice_notation(self, old_value, value, _slice):
1010
        """Change the current value at the provided slice
1011
1012
        :param old_value: Previous slice notation
1013
        :param value: New value to set
1014
        :param _slice: Slice to modify
1015
        :return: Changed value (str/int)
1016
        """
1017
        old_value = self._set_incomplete_slices(str(old_value), _slice)
1018
        if pu.is_slice_notation(old_value):
1019
            start_stop_split = old_value.split(":")
1020
            return self._get_modified_slice(start_stop_split, value, _slice)
1021
        elif _slice == 0:
1022
            # If there is no slice notation, only allow first slice to
1023
            # be modified
1024
            return value
1025
        else:
1026
            raise Exception(
1027
                "There is no existing slice notation to modify."
1028
            )
1029
1030
    def _get_modified_slice(self, start_stop_split, value, _slice):
1031
        if all(v == "" for v in start_stop_split):
1032
            return self._set_empty_dimension_slice(value, _slice)
1033
        else:
1034
            start_stop_split[_slice] = str(value)
1035
            start_stop_split = ":".join(start_stop_split)
1036
            return start_stop_split
1037
1038
    def _modify_preview_dimension(self, value, current_preview, dim):
1039
        """Modify the preview list value at the dimension provided (dim)
1040
1041
        :param value: The new value
1042
        :param current_value: The current preview parameter list value
1043
        :param dim: The dimension to modify
1044
        :return: The modified value
1045
        """
1046
        if not current_preview:
1047
            return self._set_empty_list(dim, value)
1048
        current_preview[dim - 1] = value
1049
        # Save the altered preview value
1050
        return current_preview
1051
1052
    def _set_empty_dimension_slice(self, value, _slice):
1053
        """Set the empty dimension and insert colons to indicate
1054
        the correct slice notation.
1055
1056
        :param value: New value to set
1057
        :param _slice: start/stop/step/chunk value
1058
        :return: String for the new value
1059
        """
1060
        return _slice * ":" + str(value)
1061
1062
    def _set_incomplete_slices(self, old_value, _slice):
1063
        """Append default slice values.to the current string value, in order
1064
         to allow later slice values to be set
1065
1066
        :param old_value: Current string value with slice notation
1067
        :param _slice: slice notation index to be edited
1068
        :return: String with default slice notation entries set
1069
        """
1070
        while old_value.count(":") < _slice:
1071
            old_value += ":"
1072
        return old_value
1073
1074
    def _set_empty_list(self, dim, value):
1075
        """If the dimension is 1 then allow the whole empty list
1076
        to be set.
1077
1078
        :param dim: Dimension to be altered
1079
        :param value: New value
1080
        :return: List containing new value
1081
        """
1082
        if dim == 1:
1083
            return [value]
1084
        else:
1085
            raise ValueError("You have not set earlier dimensions")
1086
1087
    def level(self, level):
1088
        """ Set the visibility level of parameters """
1089
        if level:
1090
            self.disp_level = level
1091
            print(f"Level set to '{level}'")
1092
        else:
1093
            print(f"Level is set at '{self.disp_level}'")
1094
1095
    def remove(self, str_pos):
1096
        pos = self.find_position(str_pos)
1097
        entry = self.split_pos_to_list(str_pos)
1098
        self.plugin_list.plugin_list.pop(pos)
1099
        pos_list = self.get_split_positions()
1100
        self.inc_positions(pos, pos_list, entry, -1)
1101
1102
    def check_iterative_loops(self, positions, direction):
1103
        """
1104
        When a plugin is added, removed, or moved, check if any iterative loops
1105
        should be removed or shifted
1106
        """
1107
        def moved_plugin(old_pos, new_pos):
1108
            is_in_loop = self.plugin_list.check_pos_in_iterative_loop(old_pos)
1109
            if is_in_loop and old_pos != new_pos:
1110
                self.plugin_list.remove_associated_iterate_group_dict(
1111
                    old_pos, -1)
1112
1113
            if_will_be_in_loop = \
1114
                self.plugin_list.check_pos_in_iterative_loop(new_pos)
1115
            if if_will_be_in_loop and old_pos != new_pos:
1116
                self.plugin_list.remove_associated_iterate_group_dict(
1117
                    new_pos, -1)
1118
1119
            # shift any relevant loops
1120
            if new_pos < old_pos:
1121
                self.plugin_list.shift_range_iterative_loops(
1122
                    [new_pos, old_pos], 1)
1123
            elif new_pos > old_pos:
1124
                self.plugin_list.shift_range_iterative_loops(
1125
                    [old_pos, new_pos], -1)
1126
1127
        def added_removed_plugin(pos, direction):
1128
            is_in_loop = self.plugin_list.check_pos_in_iterative_loop(pos)
1129
            if is_in_loop:
1130
                # delete the associated loop
1131
                self.plugin_list.remove_associated_iterate_group_dict(pos,
1132
                    direction)
1133
1134
            # check if there are any iterative loops in the process list
1135
            do_loops_exist = len(self.plugin_list.iterate_plugin_groups) > 0
1136
            if do_loops_exist:
1137
                if direction == -1:
1138
                    # shift the start+end of all loops after the plugin down by
1139
                    # 1
1140
                    self.plugin_list.shift_subsequent_iterative_loops(pos, -1)
1141
                elif direction == 1:
1142
                    # shift the start+end of all loops after the plugin up by 1
1143
                    self.plugin_list.shift_subsequent_iterative_loops(pos, 1)
1144
1145
        if direction == 0:
1146
            # a plugin has been moved
1147
            moved_plugin(positions[0], positions[1])
1148
        else:
1149
            # a plugin has been added or removed
1150
            added_removed_plugin(positions[0], direction)
1151
1152
    @property
1153
    def size(self):
1154
        return len(self.plugin_list.plugin_list)
1155
1156
    def display_iterative_loops(self):
1157
        self.plugin_list.print_iterative_loops()