Test Failed
Pull Request — master (#888)
by Yousef
04:12
created

savu.core.iterative_plugin_runner   B

Complexity

Total Complexity 51

Size/Duplication

Total Lines 480
Duplicated Lines 17.92 %

Importance

Changes 0
Metric Value
eloc 196
dl 86
loc 480
rs 7.92
c 0
b 0
f 0
wmc 51

19 Methods

Rating   Name   Duplication   Size   Complexity  
A IteratePluginGroup.setup_datasets() 0 14 1
A IteratePluginGroup.__init__() 0 50 1
A IteratePluginGroup.set_iteration_datasets() 0 2 1
B IteratePluginGroup._finalise_iterated_datasets() 52 52 6
A IteratePluginGroup.set_alternating_plugin_datasets() 0 19 2
A IteratePluginGroup._execute_iterations() 0 62 5
B IteratePluginGroup.__set_datasets() 0 56 7
B IteratePluginGroup.set_plugin_datasets() 34 55 6
B IteratePluginGroup._execute_iteration_0() 0 73 7
A IteratePluginGroup.set_alternating_datasets() 0 6 3
A IteratePluginGroup.set_start_plugin() 0 5 1
A IteratePluginGroup.add_plugin_to_iterate_group() 0 6 1
A IteratePluginGroup.increment_ip_iteration() 0 2 1
A IteratePluginGroup._reset_input_dataset_slicing() 0 9 1
A IteratePluginGroup.get_plugin_datasets() 0 6 1
A IteratePluginGroup.__set_original_datasets() 0 7 1
A IteratePluginGroup.get_original_datasets() 0 5 1
A IteratePluginGroup.create_clone() 0 5 1
A IteratePluginGroup.set_end_plugin() 0 5 1


Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:
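
One frequent case is a branch body that is copied with only a lookup key changed. The following is a minimal sketch of pulling such a branch into a single helper; the function name `pick_entry`, its parameters, and the sample table are all hypothetical and are not part of the inspected project.

```python
def pick_entry(table, kind, item, start_items, end_items):
    """Return (role, entry) for `item`, or None if it is in neither list.

    All names here are hypothetical. `kind` selects the sub-table, which is
    the only thing that differed between the duplicated branches.
    """
    if item in start_items:
        role = "start"
    elif item in end_items:
        role = "end"
    else:
        return None
    return role, table[kind][role]


# Hypothetical sample data standing in for the real lookup structure.
table = {
    "original": {"start": "orig-start", "end": "orig-end"},
    "clone": {"start": "clone-start", "end": "clone-end"},
}

# One helper now serves both former copies of the branch.
first = pick_entry(table, "original", "a", ["a"], ["b"])
second = pick_entry(table, "clone", "b", ["a"], ["b"])
```

The caller keeps the decision about what to do with the returned entry, so the helper stays free of side effects and is easy to test on its own.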

Complexity

Tip: Before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like savu.core.iterative_plugin_runner often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
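
As a rough illustration of Extract Class, the sketch below moves a pair of counter fields that share a prefix into their own small class. The names `IterationCounter` and `Runner` are hypothetical, and the sketch assumes a zero-based current iteration and a one-based total, which is only one possible convention.

```python
from dataclasses import dataclass


@dataclass
class IterationCounter:
    """Hypothetical extracted class holding the iteration bookkeeping."""
    fixed_iterations: int  # total number of iterations (one-based count)
    iteration: int = 0     # current iteration (zero-based)

    def increment(self):
        self.iteration += 1

    def has_remaining(self):
        return self.iteration < self.fixed_iterations

    def is_last(self):
        return self.iteration == self.fixed_iterations - 1


class Runner:
    """Hypothetical owner class that delegates counting to the new class."""

    def __init__(self, iterations):
        self.counter = IterationCounter(fixed_iterations=iterations)

    def run(self):
        # Record (iteration, is_last) for each pass instead of doing work
        seen = []
        while self.counter.has_remaining():
            seen.append((self.counter.iteration, self.counter.is_last()))
            self.counter.increment()
        return seen
```

The owning class no longer needs to know how the two counters relate to each other, which removes the off-by-one reasoning from its methods.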

from savu.core.iterate_plugin_group_utils import shift_plugin_index


class IteratePluginGroup():
    '''
    Class for iterating a set/group of plugins in a process list
    '''

    def __init__(self, plugin_runner, start_index, end_index, iterations):
        self.in_data = None
        self.out_data = None
        # PluginRunner object for running the individual plugins in the group
        # of plugins to iterate over
        self.plugin_runner = plugin_runner

        # nPlugin index of plugin that is at the start of group to iterate over
        self.start_index = start_index
        # nPlugin index of plugin that is at the end of group to iterate over
        self.end_index = end_index

        # list of plugins needed to run the plugins using
        # PluginRunner.__run_plugin()
        self.plugins = []

        # Note: copied from CpuIterativePlugin

        # the current iteration: starts counting at 0 (zero-based)
        self._ip_iteration = 0
        # the number of iterations to perform: starts counting at 1 (one-based)
        self._ip_fixed_iterations = iterations
        # The _ip_data_dict value eventually holds 3 keys:
        # - 'iterating'
        # - 0
        # - 1
        # The name of the 0 key refers to the 0th iteration, and the name of the
        # 1 key refers to the 1st iteration
        # The value of the 0 key is a list containing two lists (both with only
        # one element in them):
        # - a list containing the input NeXuS file
        # - a list containing the Data object used as an input AND output dataset
        # (depending on the iteration number) with the "original" backing file
        # (ie, NOT the "cloned" backing file)
        # The value of the 1 key is a list containing two lists (one containing
        # one element, one containing two elements):
        # - a list containing the input NeXuS file, and also the Data object
        #   with the "original" backing file
        # - a list containing the Data object with the "clone" backing file
        self._ip_data_dict = {}
        # The dict value of the 'iterating' key contains only one key-value
        # pair throughout all iterations:
        # - the key is the "original" Data object
        # - the value is the "cloned" Data object
        self._ip_data_dict['iterating'] = {}

        # dict for holding the different PluginData objects involved
        self._ip_plugin_data_dict = {
            'original': {},
            'clone': {}
        }

    def setup_datasets(self):
        '''
        Set up the cloned datasets in the start and end plugins in the group to
        iterate over
        '''
        self.__set_original_datasets()
        # get the in and out datasets, like in IterativeCcpiDenosing.setup()
        in_dataset, out_dataset = self.get_original_datasets()

        # set the input and output datasets for the first iteration
        self.set_iteration_datasets(0, [in_dataset[0]], [out_dataset[0]])
        # set the input and output datasets for subsequent iterations
        self.set_iteration_datasets(1, [in_dataset[0], out_dataset[0]],
                                    [out_dataset[1]])
        # out_dataset[0] and out_dataset[1] will continue to alternate for
        # all remaining iterations i.e. output becomes input and input becomes
        # output.

    def _execute_iteration_0(self, exp, transport):
        '''
        Run plugins for iteration 0
        '''
        start = shift_plugin_index(exp, self.start_index)
        end = shift_plugin_index(exp, self.end_index)

        nPlugin = exp.meta_data.get('nPlugin')
        exp_coll = exp._get_collection()
        if start == end and nPlugin == end:
            # start == end -> group of plugins to iterate over is a single
            # plugin

            plugin_name = \
                self.plugin_runner._PluginRunner__run_plugin(
                    exp_coll['plugin_dict'][nPlugin],
                    clean_up_plugin=False)

            # since the end plugin has now been run, the group of plugins to
            # iterate over has been executed once, and this counts as having
            # done one iteration (ie, at this point, iteration 0 is
            # complete)
            self.increment_ip_iteration()
            # kick off all subsequent iterations
            self._execute_iterations(exp, transport)
            # finished all iterations, set which output dataset to keep, and
            # which to remove
            self._finalise_iterated_datasets(exp)
        else:
            # start != end -> group of plugins to iterate over is more than one
            # plugin
            if nPlugin == start:
                # start plugin is being run, on iteration 0
                print(f"Iteration {self._ip_iteration}")
                plugin = self.plugin_runner._PluginRunner__run_plugin(
                    exp_coll['plugin_dict'][nPlugin],
                    clean_up_plugin=False)
                plugin_name = plugin.name
                self.set_start_plugin(plugin)
            elif nPlugin == end:
                # end plugin is being run, on iteration 0

                plugin_name = \
                    self.plugin_runner._PluginRunner__run_plugin(
                        exp_coll['plugin_dict'][nPlugin],
                        clean_up_plugin=False)

                # since the end plugin has now been run, the group of plugins to
                # iterate over has been executed once, and this counts as having
                # done one iteration (ie, at this point, iteration 0 is
                # complete)
                self.increment_ip_iteration()
                # kick off all subsequent iterations
                self._execute_iterations(exp, transport)
                # finished all iterations, set which output dataset to keep, and
                # which to remove
                self._finalise_iterated_datasets(exp)
            elif start <= nPlugin <= end:
                # a "middle" plugin is being run on iteration 0
                plugin = self.plugin_runner._PluginRunner__run_plugin(
                    exp_coll['plugin_dict'][nPlugin],
                    clean_up_plugin=False)
                plugin_name = plugin.name
            else:
                info_dict = {
                    'start_index': self.start_index,
                    'end_index': self.end_index
                }
                err_str = f"Encountered an unknown case when running inside " \
                    f"an iterative loop. IteratePluginGroup info: {info_dict}"
                raise Exception(err_str)

        return plugin_name

    def _execute_iterations(self, exp, transport):
        '''
        Execute all iterations from iteration 1 onwards (iteration 0 is
        currently handled by methods in PluginRunner).
        '''
        # The docstring of this method in IterativePlugin is the following:
        #
        # Run the pre_process, process, and post_process methods.
        #
        # However, there is no obvious point where those methods are called,
        # so perhaps this docstring isn't quite accurate? (Also note that this
        # sentence has been copied from the docstring of
        # BaseDriver._run_plugin_instances(), so maybe it is just a generic
        # description of what this method SHOULD do, but doesn't yet do,
        # in IterativePlugin)

        while self._ip_iteration < self._ip_fixed_iterations:
            print(f"Iteration {self._ip_iteration}...")
            self.__set_datasets()
            # replace this with the PluginRunner.__run_plugin() method to run
            # the individual plugins in the group of plugins to iterate
            #self._run_plugin_instances(transport, self.get_communicator())

            # clean up the plugins in the group to iterate over IF the last
            # iteration is being executed
            clean_up_plugin = \
                self._ip_iteration == self._ip_fixed_iterations - 1

            start = shift_plugin_index(exp, self.start_index)
            # naughty naughty, to run a double underscore method, but for now,
            # just testing...
            for plugin in self.plugins:
                # reset the slicing of the input data for the plugin, to be what
                # it was on the previous iteration before the plugin was run, as
                # opposed to what it potentially changed to after processing
                # occurred in the last iteration
                #
                # only reset the input PluginData for the plugin if it's not the
                # start plugin of an iterative loop, since this is already done
                # by default for the start plugin on every iteration
                nPlugin = exp.meta_data.get('nPlugin')
                if nPlugin != start:
                    self._reset_input_dataset_slicing(plugin)
                print(f"Running {plugin.name} in iterative group of plugins")
                # TODO: need to pass the plugin dict, or something more than an
                # empty dict...
                self.plugin_runner._PluginRunner__run_plugin(
                    {},
                    clean_up_plugin=clean_up_plugin,
                    plugin=plugin)

            # if self._ip_fixed_iterations has been set to something other
            # than its original value of False, and if the current iteration
            # (the one that has just been completed) is the LAST iteration,
            # then processing has been completed
            #
            # Note that _ip_iteration starts counting at 0,
            # but _ip_fixed_iterations starts counting at 1, so if you have
            # reached _ip_iteration=n, then this means that n+1 iterations
            # have been performed
            self.increment_ip_iteration()

    def _reset_input_dataset_slicing(self, plugin):
        """
        Reset the slicing of the input dataset of a plugin in an iterative loop,
        to what it was on the previous iteration
        """
        previous_in_pData = plugin.parameters['plugin_in_datasets'][0]
        plugin.parameters['in_datasets'][0]._set_plugin_data(previous_in_pData)
        plugin._finalise_plugin_datasets()
        plugin._finalise_datasets()

    def increment_ip_iteration(self):
        self._ip_iteration += 1

    def __set_original_datasets(self):
        '''
        Utility function to make the (original) in dataset, and out dataset,
        easier to reference
        '''
        self.in_data = self.start_plugin.parameters['in_datasets']
        self.out_data = self.end_plugin.parameters['out_datasets']

    def get_original_datasets(self):
        '''
        Helper function to get the in and out datasets more easily.
        '''
        return self.in_data, self.out_data

    def get_plugin_datasets(self):
        '''
        Helper function to get the in and out plugin datasets more easily.
        '''
        return self.start_plugin.parameters['plugin_in_datasets'], \
            self.end_plugin.parameters['plugin_out_datasets']

    def create_clone(self, clone, data):
        clone.create_dataset(data)
        clone.data_info.set('clone', data.get_name())
        # alternate a dataset with its clone
        self.set_alternating_datasets(data, clone)

    def set_alternating_datasets(self, d1, d2):
        names = [d1.get_name(), d2.get_name()]
        if not any('itr_clone' in i for i in names):
            raise Exception('Alternating datasets must contain a clone.  These'
                            ' are found at the end of the out_datasets list')
        self._ip_data_dict['iterating'][d1] = d2

    def set_iteration_datasets(self, itr, in_data, out_data):
        self._ip_data_dict[itr] = [in_data, out_data]

    def set_start_plugin(self, plugin):
        '''
        Set the plugin that is at the start of the group to iterate over
        '''
        self.start_plugin = plugin

    def set_end_plugin(self, plugin):
        '''
        Set the plugin that is at the end of the group to iterate over
        '''
        self.end_plugin = plugin

    def add_plugin_to_iterate_group(self, plugin):
        '''
        Append plugin dict to list of plugins that are part of the group to
        iterate over
        '''
        self.plugins.append(plugin)

    def __set_datasets(self):
        '''
        Set the input and output datasets such that
        - the output dataset from the previous iteration is the input dataset of
          the current iteration that is about to be performed
        - the input dataset from the previous iteration is used to write the
          output of the current iteration that is about to be performed
        '''
        # TODO: perhaps the pattern should be changed here, to make use of
        #  the same logic that is being used to switch the original & cloned
        #  data?

        # Only the 0th and 1st iterations are set in _ip_data_dict, there is
        # NOT a key for every iteration in _ip_data_dict, hence this if/elif
        # block
        if self._ip_iteration in self._ip_data_dict:
            # If on the 0th or 1st iteration, set the in_datasets and
            # out_datasets according to the structure defined in _ip_data_dict
            #
            # The body of this if statement is essentially a way to "set up" the
            # input and output datasets so that for iterations after the 0th and
            # 1st, the two datasets that are swapped between being used for
            # input or output (depending on the particular iteration) can be
            # swapped WITHOUT having to define a key-value pair in
            # _ip_data_dict for EVERY SINGLE ITERATION
            itr_datasets = self._ip_data_dict[self._ip_iteration]
            self.start_plugin.parameters['in_datasets'] = [itr_datasets[0][-1]]
            self.end_plugin.parameters['out_datasets'] = itr_datasets[1]
        elif self._ip_iteration > 0:
            # If on an iteration greater than 1 (since the if statement catches
            # both iteration 0 and 1), then there is some (fiddly...) logic
            # here to essentially SWAP the out dataset from the previous
            # iteration with the in dataset of the previous iteration
            #
            # Practically speaking, this means that:
            # - the out dataset from the previous iteration is used as the input
            #   for the current iteration that is about to be performed
            # - the in dataset from the previous iteration is free to be used to
            #   write the output of the current iteration that is about to be
            #   performed
            p = [
                self.start_plugin.parameters['in_datasets'],
                self.end_plugin.parameters['out_datasets']
            ]

            for s1, s2 in self._ip_data_dict['iterating'].items():
                a = [0, p[0].index(s1)] if s1 in p[0] else [1, p[1].index(s1)]
                b = [0, p[0].index(s2)] if s2 in p[0] else [1, p[1].index(s2)]
                p[a[0]][a[1]], p[b[0]][b[1]] = p[b[0]][b[1]], p[a[0]][a[1]]

        if self.start_index != self.end_index:
            self.set_plugin_datasets()
        else:
            info_str = f"Not setting plugin datasets for " \
                       f"{self.start_plugin.name}, since iterating only a "\
                       f"single plugin"
            print(info_str)


    def set_plugin_datasets(self):
        """
        Set the PluginData objects for the original and cloned Data objects,
        based on the current iteration.
        """
        p = [
            self.start_plugin.parameters['in_datasets'],
            self.end_plugin.parameters['out_datasets']
        ]

        for s1, s2 in self._ip_data_dict['iterating'].items():
            # change the PluginData objects for the in and out datasets, to take
            # care of the potential switching of patterns
            if s1 in p[0]:
                new_original_data_pData = \
                    self._ip_plugin_data_dict['original']['start_plugin']
                self.start_plugin.parameters['plugin_in_datasets'] = \
                    [new_original_data_pData]
                p[0][0]._set_plugin_data(new_original_data_pData)
            elif s1 in p[1]:
                new_original_data_pData = \
                    self._ip_plugin_data_dict['original']['end_plugin']
                self.end_plugin.parameters['plugin_out_datasets'] = \
                    [new_original_data_pData]
                p[1][0]._set_plugin_data(new_original_data_pData)
            else:
                info_str = f"s1 {s1.backing_file} wasn't in either the start " \
                           f"plugin's plugin_in_datasets, nor the end " \
                           f"plugin's plugin_out_datasets"
                print(info_str)

            if s2 in p[0]:
                new_cloned_data_pData = \
                    self._ip_plugin_data_dict['clone']['start_plugin']
                self.start_plugin.parameters['plugin_in_datasets'] = \
                    [new_cloned_data_pData]
                p[0][0]._set_plugin_data(new_cloned_data_pData)
            elif s2 in p[1]:
                new_cloned_data_pData = \
                    self._ip_plugin_data_dict['clone']['end_plugin']
                self.end_plugin.parameters['plugin_out_datasets'] = \
                    [new_cloned_data_pData]
                p[1][0]._set_plugin_data(new_cloned_data_pData)
            else:
                info_str = f"s2 {s2.backing_file} wasn't in either the start " \
                           f"plugin's plugin_in_datasets, nor the end " \
                           f"plugin's plugin_out_datasets"
                print(info_str)

        # reset the values inside Data.data_info that have an effect on how the
        # value of core_slice in SliceLists comes out
        self.start_plugin._finalise_datasets()
        self.start_plugin._finalise_plugin_datasets()
        self.end_plugin._finalise_datasets()
        self.end_plugin._finalise_plugin_datasets()

    def _finalise_iterated_datasets(self, exp):
        '''
        Inspect the two Data objects that are used to contain the input and
        output data for iterations over the course of the iterative processing
        (input/output depending on which particular iteration was being done).

        Mark one of them as the "final dataset" to be added to the output
        NeXuS file, and mark the other as "obsolete/to be removed".

        The decision between which one is kept and which one is removed
        depends on which Data object contains the OUTPUT of the very last
        iteration.

        For an odd number of iterations, this is the "original" Data object.
        For an even number of iterations, this is the "clone" Data object.
        '''
        for s1, s2 in self._ip_data_dict['iterating'].items():
            name = s1.get_name()
            name = name if 'itr_clone' not in name else s2.get_name()
            out_datasets = self.end_plugin.parameters['out_datasets']
            final_dataset = s1 if s1 in out_datasets else s2
            obsolete = s1 if s1 is not final_dataset else s2
            obsolete.remove = True

            # switch names if necessary
            if final_dataset.get_name() != name:
                # If this is true, then the output dataset of the last
                # iteration is the clone Data object (hence, the mismatched
                # names).
                #
                # So then:
                # - obsolete = original
                # - final_dataset = clone
                #
                # which means that the CLONED dataset needs to be set in the
                # Experiment object (exp) as the "out data", but under
                # the name of the ORIGINAL dataset.
                # And also, the ORIGINAL dataset is set in the Experiment
                # object, but under the name of the CLONED/OBSOLETE dataset
                temp = obsolete
                exp.index['out_data'][name] = final_dataset
                exp.index['out_data'][s2.get_name()] = temp
                # One last thing to do in this case is to set the "name"
                # inside the Data object that final_dataset is set to.
                #
                # This is because, in this case, the CLONED dataset is in
                # final_dataset, and the "name" within the Data object will
                # be some value like "itr_0".
                #
                # However, the name within the Data object needs to be the
                # name of the ORIGINAL Data object in order for the creation
                # of the output NeXuS file to work.
                final_dataset._set_name(name)

    # NOTE: this definition replaces the two-argument
    # set_alternating_datasets(d1, d2) defined earlier in this class, since a
    # later method of the same name overrides the earlier one
    def set_alternating_datasets(self):
        d1 = self.end_plugin.parameters['out_datasets'][0]
        d2 = self.end_plugin.parameters['out_datasets'][1]
        names = [d1.get_name(), d2.get_name()]
        if not any('itr_clone' in i for i in names):
            raise Exception('Alternating datasets must contain a clone.  These'
                            ' are found at the end of the out_datasets list')
        self._ip_data_dict['iterating'][d1] = d2

    def set_alternating_plugin_datasets(self):
        """
        Set up the PluginData objects for the original and cloned Data objects
        """
        if self.start_index != self.end_index:
            self._ip_plugin_data_dict['original']['start_plugin'] = \
                self.start_plugin.parameters['plugin_in_datasets'][1]
            self._ip_plugin_data_dict['clone']['start_plugin'] = \
                self.start_plugin.parameters['plugin_in_datasets'][2]

            self._ip_plugin_data_dict['original']['end_plugin'] = \
                self.end_plugin.parameters['plugin_out_datasets'][0]
            self._ip_plugin_data_dict['clone']['end_plugin'] = \
                self.end_plugin.parameters['plugin_out_datasets'][1]
        else:
            info_str = f"Not setting up alternating plugin datasets for " \
                       f"{self.start_plugin.name}, since iterating only a "\
                       f"single plugin"
            print(info_str)
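
The alternation between the "original" and "clone" datasets described in the docstring of `_finalise_iterated_datasets()` can be sketched in isolation. This is a simplified model, not the Savu implementation: `run_ping_pong`, `step`, and the plain dict of buffers are hypothetical stand-ins for the Data objects and plugin execution in the listing above.

```python
def run_ping_pong(initial, step, iterations):
    """Apply `step` `iterations` times, alternating two output buffers.

    Returns the name of the buffer holding the final output, and its value.
    All names are hypothetical stand-ins for the Data objects in the listing.
    """
    if iterations < 1:
        raise ValueError("at least one iteration is required")

    buffers = {"original": None, "clone": None}
    # Iteration 0 reads the initial input and writes to the original buffer
    buffers["original"] = step(initial)
    read, write = "original", "clone"

    for _ in range(1, iterations):
        # The previous output becomes the input; the other buffer is reused
        buffers[write] = step(buffers[read])
        read, write = write, read

    # After the loop, `read` names the buffer that was written most recently
    return read, buffers[read]
```

In this model an odd number of iterations leaves the final output in the "original" buffer and an even number leaves it in the "clone" buffer, which matches the rule stated in the docstring.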