Test Failed
Push — master (9ba79c...17f3e3)
by Yousef
01:54 queued 19s

savu.core.iterative_plugin_runner (rating: B)

Complexity

Total Complexity 51

Size/Duplication

Total Lines 482
Duplicated Lines 17.84 %

Importance

Changes 0
Metric                             Value
eloc (executable lines of code)    197
dl (duplicated lines)              86
loc (lines of code)                482
rs                                 7.92
c                                  0
b                                  0
f                                  0
wmc (weighted method count)        51

19 Methods

Rating   Name   Duplication   Size   Complexity  
A IteratePluginGroup.set_iteration_datasets() 0 2 1
B IteratePluginGroup._finalise_iterated_datasets() 52 52 6
A IteratePluginGroup.set_alternating_plugin_datasets() 0 19 2
A IteratePluginGroup._execute_iterations() 0 62 5
B IteratePluginGroup.__set_datasets() 0 56 7
A IteratePluginGroup.setup_datasets() 0 14 1
B IteratePluginGroup.set_plugin_datasets() 34 55 6
B IteratePluginGroup._execute_iteration_0() 0 73 7
A IteratePluginGroup.set_alternating_datasets() 0 6 3
A IteratePluginGroup.__init__() 0 50 1
A IteratePluginGroup.set_start_plugin() 0 5 1
A IteratePluginGroup.add_plugin_to_iterate_group() 0 6 1
A IteratePluginGroup.increment_ip_iteration() 0 2 1
A IteratePluginGroup._reset_input_dataset_slicing() 0 9 1
A IteratePluginGroup.get_plugin_datasets() 0 6 1
A IteratePluginGroup.__set_original_datasets() 0 7 1
A IteratePluginGroup.get_original_datasets() 0 5 1
A IteratePluginGroup.create_clone() 0 5 1
A IteratePluginGroup.set_end_plugin() 0 5 1

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.

Common duplication problems generally share one solution: extract the repeated block into a single, shared implementation, as in the sketch below.
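
For instance, the near-identical s1/s2 branches flagged in set_plugin_datasets() further down could collapse into a single helper. A minimal, hedged sketch (the helper name is hypothetical, not the project's actual fix):

def _rebind_plugin_data(plugin, params_key, pdata, datasets):
    # Point the plugin at a single PluginData object...
    plugin.parameters[params_key] = [pdata]
    # ...and rebind the first dataset to it.
    datasets[0]._set_plugin_data(pdata)

Each if/elif arm in set_plugin_datasets() would then reduce to one call, such as _rebind_plugin_data(self.start_plugin, 'plugin_in_datasets', new_original_data_pData, p[0]).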

Complexity

Tip: before tackling complexity, make sure that you eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like savu.core.iterative_plugin_runner often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
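
In this class, the _ip_-prefixed fields (_ip_iteration, _ip_fixed_iterations, _ip_data_dict, _ip_plugin_data_dict) are one such cluster. A hedged Extract Class sketch (IterationState is a hypothetical name, not part of Savu):

class IterationState:
    '''Owns the iteration counter and the dataset bookkeeping dicts.'''

    def __init__(self, fixed_iterations):
        self.iteration = 0
        self.fixed_iterations = fixed_iterations
        self.data_dict = {'iterating': {}}
        self.plugin_data_dict = {'original': {}, 'clone': {}}

    def increment(self):
        self.iteration += 1

    def on_last_iteration(self):
        return self.iteration == self.fixed_iterations - 1

IteratePluginGroup.__init__ would then keep a single field, self.state = IterationState(iterations), and methods like increment_ip_iteration() and the clean_up_plugin check in _execute_iterations() would delegate to it.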

import logging

from savu.core.iterate_plugin_group_utils import shift_plugin_index


class IteratePluginGroup():
    '''
    Class for iterating a set/group of plugins in a process list
    '''

    def __init__(self, plugin_runner, start_index, end_index, iterations):
        self.in_data = None
        self.out_data = None
        # PluginRunner object for running the individual plugins in the
        # group of plugins to iterate over
        self.plugin_runner = plugin_runner

        # nPlugin index of plugin that is at the start of group to iterate over
        self.start_index = start_index
        # nPlugin index of plugin that is at the end of group to iterate over
        self.end_index = end_index

        # list of plugins needed to run the plugins using
        # PluginRunner.__run_plugin()
        self.plugins = []

        # Note: copied from CpuIterativePlugin

        # the current iteration: starts counting at 0 (zero-based)
        self._ip_iteration = 0
        # the number of iterations to perform: starts counting at 1 (one-based)
        self._ip_fixed_iterations = iterations
        # The _ip_data_dict value eventually holds 3 keys:
        # - 'iterating'
        # - 0
        # - 1
        # The name of the 0 key refers to the 0th iteration, and the name of
        # the 1 key refers to the 1st iteration.
        # The value of the 0 key is a list containing two lists (both with
        # only one element in them):
        # - a list containing the input NeXuS file
        # - a list containing the Data object used as an input AND output
        #   dataset (depending on the iteration number) with the "original"
        #   backing file (ie, NOT the "cloned" backing file)
        # The value of the 1 key is a list containing two lists (one
        # containing one element, one containing two elements):
        # - a list containing the input NeXuS file, and also the Data object
        #   with the "original" backing file
        # - a list containing the Data object with the "clone" backing file
        self._ip_data_dict = {}
        # The dict value of the 'iterating' key contains only one key-value
        # pair throughout all iterations:
        # - the key is the "original" Data object
        # - the value is the "cloned" Data object
        self._ip_data_dict['iterating'] = {}

        # dict for holding the different PluginData objects involved
        self._ip_plugin_data_dict = {
            'original': {},
            'clone': {}
        }

    def setup_datasets(self):
        '''
        Setup the cloned datasets in the start and end plugins in the group to
        iterate over
        '''
        self.__set_original_datasets()
        # get the in and out datasets, like in IterativeCcpiDenosing.setup()
        in_dataset, out_dataset = self.get_original_datasets()

        # set the input and output datasets for the first iteration
        self.set_iteration_datasets(0, [in_dataset[0]], [out_dataset[0]])
        # set the input and output datasets for subsequent iterations
        self.set_iteration_datasets(1, [in_dataset[0], out_dataset[0]],
                                    [out_dataset[1]])
        # out_dataset[0] and out_dataset[1] will continue to alternate for
        # all remaining iterations, i.e. output becomes input and input
        # becomes output.

    def _execute_iteration_0(self, exp, transport):
        '''
        Run plugins for iteration 0
        '''
        start = shift_plugin_index(exp, self.start_index)
        end = shift_plugin_index(exp, self.end_index)

        nPlugin = exp.meta_data.get('nPlugin')
        exp_coll = exp._get_collection()
        if start == end and nPlugin == end:
            # start == end -> group of plugins to iterate over is a single
            # plugin

            plugin_name = \
                self.plugin_runner._PluginRunner__run_plugin(
                    exp_coll['plugin_dict'][nPlugin],
                    clean_up_plugin=False)

            # since the end plugin has now been run, the group of plugins to
            # iterate over has been executed once, and this counts as having
            # done one iteration (ie, at this point, iteration 0 is
            # complete)
            self.increment_ip_iteration()
            # kick off all subsequent iterations
            self._execute_iterations(exp, transport)
            # finished all iterations, set which output dataset to keep, and
            # which to remove
            self._finalise_iterated_datasets(exp)
        else:
            # start != end -> group of plugins to iterate over is more than
            # one plugin
            if nPlugin == start:
                # start plugin is being run, on iteration 0
                print(f"Iteration {self._ip_iteration}...")
                plugin = self.plugin_runner._PluginRunner__run_plugin(
                    exp_coll['plugin_dict'][nPlugin],
                    clean_up_plugin=False)
                plugin_name = plugin.name
                self.set_start_plugin(plugin)
            elif nPlugin == end:
                # end plugin is being run, on iteration 0

                plugin_name = \
                    self.plugin_runner._PluginRunner__run_plugin(
                        exp_coll['plugin_dict'][nPlugin],
                        clean_up_plugin=False)

                # since the end plugin has now been run, the group of plugins
                # to iterate over has been executed once, and this counts as
                # having done one iteration (ie, at this point, iteration 0
                # is complete)
                self.increment_ip_iteration()
                # kick off all subsequent iterations
                self._execute_iterations(exp, transport)
                # finished all iterations, set which output dataset to keep,
                # and which to remove
                self._finalise_iterated_datasets(exp)
            elif start <= nPlugin <= end:
                # a "middle" plugin is being run on iteration 0
                plugin = self.plugin_runner._PluginRunner__run_plugin(
                    exp_coll['plugin_dict'][nPlugin],
                    clean_up_plugin=False)
                plugin_name = plugin.name
            else:
                info_dict = {
                    'start_index': self.start_index,
                    'end_index': self.end_index
                }
                err_str = f"Encountered an unknown case when running inside " \
                    f"an iterative loop. IteratePluginGroup info: {info_dict}"
                raise Exception(err_str)

        return plugin_name

    def _execute_iterations(self, exp, transport):
        '''
        Execute all iterations from iteration 1 onwards (iteration 0 is
        currently handled by methods in PluginRunner).
        '''
        # The docstring of this method in IterativePlugin is the following:
        #
        # Run the pre_process, process, and post_process methods.
        #
        # However, there is no obvious point where those methods are called,
        # so perhaps this docstring isn't quite accurate? (Also note that this
        # sentence has been copied from the docstring of
        # BaseDriver._run_plugin_instances(), so maybe it is just a generic
        # description of what this method SHOULD do, but doesn't yet do,
        # in IterativePlugin)

        while self._ip_iteration < self._ip_fixed_iterations:
            print(f"Iteration {self._ip_iteration}...")
            self.__set_datasets()
            # replace this with the PluginRunner.__run_plugin() method to run
            # the individual plugins in the group of plugins to iterate
            #self._run_plugin_instances(transport, self.get_communicator())

            # clean up the plugins in the group to iterate over IF the last
            # iteration is being executed
            if self._ip_iteration == self._ip_fixed_iterations - 1:
                clean_up_plugin = True
            else:
                clean_up_plugin = False

            start = shift_plugin_index(exp, self.start_index)
            # naughty naughty, to run a double underscore method, but for now,
            # just testing...
            for plugin in self.plugins:
                # reset the slicing of the input data for the plugin, to be
                # what it was on the previous iteration before the plugin was
                # run, as opposed to what it potentially changed to after
                # processing occurred in the last iteration
                #
                # only reset the input PluginData for the plugin if it's not
                # the start plugin of an iterative loop, since this is already
                # done by default for the start plugin on every iteration
                nPlugin = exp.meta_data.get('nPlugin')
                if nPlugin != start:
                    self._reset_input_dataset_slicing(plugin)
                print(f"Running {plugin.name} in iterative group of plugins")
                # TODO: need to pass the plugin dict, or something more than
                # an empty dict...
                self.plugin_runner._PluginRunner__run_plugin({},
                    clean_up_plugin=clean_up_plugin,
                    plugin=plugin)

            # if self._ip_fixed_iterations has been set to something other
            # than its original value of False, and if the current iteration
            # (the one that has just been completed) is the LAST iteration,
            # then processing has been completed
            #
            # Note that _ip_iteration starts counting at 0,
            # but _ip_fixed_iterations starts counting at 1, so if you have
            # reached _ip_iteration=n, then this means that n+1 iterations
            # have been performed
            self.increment_ip_iteration()

    def _reset_input_dataset_slicing(self, plugin):
        """
        Reset the slicing of the input dataset of a plugin in an iterative
        loop, to what it was on the previous iteration
        """
        previous_in_pData = plugin.parameters['plugin_in_datasets'][0]
        plugin.parameters['in_datasets'][0]._set_plugin_data(previous_in_pData)
        plugin._finalise_plugin_datasets()
        plugin._finalise_datasets()

    def increment_ip_iteration(self):
        self._ip_iteration += 1

    def __set_original_datasets(self):
        '''
        Utility function to make the (original) in dataset, and out dataset,
        easier to reference
        '''
        self.in_data = self.start_plugin.parameters['in_datasets']
        self.out_data = self.end_plugin.parameters['out_datasets']

    def get_original_datasets(self):
        '''
        Helper function to get the in and out datasets more easily.
        '''
        return self.in_data, self.out_data

    def get_plugin_datasets(self):
        '''
        Helper function to get the in and out plugin datasets more easily.
        '''
        return self.start_plugin.parameters['plugin_in_datasets'], \
            self.end_plugin.parameters['plugin_out_datasets']

    def create_clone(self, clone, data):
        clone.create_dataset(data)
        clone.data_info.set('clone', data.get_name())
        # alternate a dataset with its clone
        self.set_alternating_datasets(data, clone)

    def set_alternating_datasets(self, d1, d2):
        # NOTE: this definition is shadowed by the zero-argument
        # set_alternating_datasets() defined further down, so the call in
        # create_clone() above would fail at runtime
        names = [d1.get_name(), d2.get_name()]
        if not any('itr_clone' in i for i in names):
            raise Exception('Alternating datasets must contain a clone. These'
                            ' are found at the end of the out_datasets list')
        self._ip_data_dict['iterating'][d1] = d2

    def set_iteration_datasets(self, itr, in_data, out_data):
        self._ip_data_dict[itr] = [in_data, out_data]

    def set_start_plugin(self, plugin):
        '''
        Set the plugin that is at the start of the group to iterate over
        '''
        self.start_plugin = plugin

    def set_end_plugin(self, plugin):
        '''
        Set the plugin that is at the end of the group to iterate over
        '''
        self.end_plugin = plugin

    def add_plugin_to_iterate_group(self, plugin):
        '''
        Append plugin dict to list of plugins that are part of the group to
        iterate over
        '''
        self.plugins.append(plugin)

    def __set_datasets(self):
        '''
        Set the input and output datasets such that
        - the output dataset from the previous iteration is the input dataset
          of the current iteration that is about to be performed
        - the input dataset from the previous iteration is used to write the
          output of the current iteration that is about to be performed
        '''
        # TODO: perhaps the pattern should be changed here, to make use of
        #  the same logic that is being used to switch the original & cloned
        #  data?

        # Only the 0th and 1st iterations are set in _ip_data_dict, there is
        # NOT a key for every iteration in _ip_data_dict, hence this if/elif
        # block
        if self._ip_iteration in list(self._ip_data_dict.keys()):
            # If on the 0th or 1st iteration, set the in_datasets and
            # out_datasets according to the structure defined in _ip_data_dict
            #
            # The body of this if statement is essentially a way to "set up"
            # the input and output datasets so that for iterations after the
            # 0th and 1st, the two datasets that are swapped between being
            # used for input or output (depending on the particular iteration)
            # can be swapped WITHOUT having to define a key-value pair in
            # _ip_data_dict for EVERY SINGLE ITERATION
            self.start_plugin.parameters['in_datasets'] = \
                [self._ip_data_dict[self._ip_iteration][0][-1]]
            self.end_plugin.parameters['out_datasets'] = \
                self._ip_data_dict[self._ip_iteration][1]
        elif self._ip_iteration > 0:
            # If on an iteration greater than 1 (since the if statement
            # catches both iteration 0 and 1), then there is some (fiddly...)
            # logic here to essentially SWAP the out dataset from the previous
            # iteration with the in dataset of the previous iteration
            #
            # Practically speaking, this means that:
            # - the out dataset from the previous iteration is used as the
            #   input for the current iteration that is about to be performed
            # - the in dataset from the previous iteration is free to be used
            #   to write the output of the current iteration that is about to
            #   be performed
            p = [
                self.start_plugin.parameters['in_datasets'],
                self.end_plugin.parameters['out_datasets']
            ]

            for s1, s2 in self._ip_data_dict['iterating'].items():
                a = [0, p[0].index(s1)] if s1 in p[0] else [1, p[1].index(s1)]
                b = [0, p[0].index(s2)] if s2 in p[0] else [1, p[1].index(s2)]
                p[a[0]][a[1]], p[b[0]][b[1]] = p[b[0]][b[1]], p[a[0]][a[1]]

        if self.start_index != self.end_index:
            self.set_plugin_datasets()
        else:
            info_str = f"Not setting plugin datasets for " \
                       f"{self.start_plugin.name}, since iterating only a " \
                       f"single plugin"
            logging.debug(info_str)

    def set_plugin_datasets(self):
        """
        Set the PluginData objects for the original and cloned Data objects,
        based on the current iteration.
        """
        p = [
            self.start_plugin.parameters['in_datasets'],
            self.end_plugin.parameters['out_datasets']
        ]

        for s1, s2 in self._ip_data_dict['iterating'].items():
            # change the PluginData objects for the in and out datasets, to
            # take care of the potential switching of patterns
            if s1 in p[0]:
                new_original_data_pData = \
                    self._ip_plugin_data_dict['original']['start_plugin']
                self.start_plugin.parameters['plugin_in_datasets'] = \
                    [new_original_data_pData]
                p[0][0]._set_plugin_data(new_original_data_pData)
            elif s1 in p[1]:
                new_original_data_pData = \
                    self._ip_plugin_data_dict['original']['end_plugin']
                self.end_plugin.parameters['plugin_out_datasets'] = \
                    [new_original_data_pData]
                p[1][0]._set_plugin_data(new_original_data_pData)
            else:
                info_str = f"s1 {s1.backing_file} wasn't in either the start " \
                           f"plugin's plugin_in_datasets, nor the end " \
                           f"plugin's plugin_out_datasets"
                logging.debug(info_str)

            if s2 in p[0]:
                new_cloned_data_pData = \
                    self._ip_plugin_data_dict['clone']['start_plugin']
                self.start_plugin.parameters['plugin_in_datasets'] = \
                    [new_cloned_data_pData]
                p[0][0]._set_plugin_data(new_cloned_data_pData)
            elif s2 in p[1]:
                new_cloned_data_pData = \
                    self._ip_plugin_data_dict['clone']['end_plugin']
                self.end_plugin.parameters['plugin_out_datasets'] = \
                    [new_cloned_data_pData]
                p[1][0]._set_plugin_data(new_cloned_data_pData)
            else:
                info_str = f"s2 {s2.backing_file} wasn't in either the start " \
                           f"plugin's plugin_in_datasets, nor the end " \
                           f"plugin's plugin_out_datasets"
                logging.debug(info_str)

        # reset the values inside Data.data_info that have an effect on how
        # the value of core_slice in SliceLists comes out
        self.start_plugin._finalise_datasets()
        self.start_plugin._finalise_plugin_datasets()
        self.end_plugin._finalise_datasets()
        self.end_plugin._finalise_plugin_datasets()

    def _finalise_iterated_datasets(self, exp):
        '''
        Inspect the two Data objects that are used to contain the input and
        output data for iterations over the course of the iterative processing
        (input/output depending on which particular iteration was being done).

        Mark one of them as the "final dataset" to be added to the output
        NeXuS file, and mark the other as "obsolete/to be removed".

        The decision between which one is kept and which one is removed
        depends on which Data object contains the OUTPUT of the very last
        iteration.

        For an odd number of iterations, this is the "original" Data object.
        For an even number of iterations, this is the "clone" Data object.
        '''
        for s1, s2 in self._ip_data_dict['iterating'].items():
            name = s1.get_name()
            name = name if 'itr_clone' not in name else s2.get_name()
            final_dataset = s1 \
                if s1 in self.end_plugin.parameters['out_datasets'] else s2
            obsolete = s1 if s1 is not final_dataset else s2
            obsolete.remove = True

            # switch names if necessary
            if final_dataset.get_name() != name:
                # If this is true, then the output dataset of the last
                # iteration is the clone Data object (hence, the mismatched
                # names).
                #
                # So then:
                # - obsolete = original
                # - final_dataset = clone
                #
                # which means that the CLONED dataset needs to be set in the
                # Experiment object (self.exp) as the "out data", but under
                # the name of the ORIGINAL dataset.
                # And also, the ORIGINAL dataset is set in the Experiment
                # object, but under the name of the CLONED/OBSOLETE dataset
                temp = obsolete
                exp.index['out_data'][name] = final_dataset
                exp.index['out_data'][s2.get_name()] = temp
                # One last thing to do in this case is to set the "name"
                # inside the Data object that final_result is set to.
                #
                # This is because, in this case, the CLONED dataset is in
                # final_result, and the "name" within the Data object will
                # be some value like "itr_0".
                #
                # However, the name within the Data object needs to be the
                # name of the ORIGINAL Data object in order for the creation
                # of the output NeXuS file to work.
                final_dataset._set_name(name)

    def set_alternating_datasets(self):
        # NOTE: this zero-argument definition shadows the earlier
        # set_alternating_datasets(d1, d2) defined above
        d1 = self.end_plugin.parameters['out_datasets'][0]
        d2 = self.end_plugin.parameters['out_datasets'][1]
        names = [d1.get_name(), d2.get_name()]
        if not any('itr_clone' in i for i in names):
            raise Exception('Alternating datasets must contain a clone. These'
                            ' are found at the end of the out_datasets list')
        self._ip_data_dict['iterating'][d1] = d2

    def set_alternating_plugin_datasets(self):
        """
        Setup the PluginData objects for the original and cloned Data objects
        """
        if self.start_index != self.end_index:
            self._ip_plugin_data_dict['original']['start_plugin'] = \
                self.start_plugin.parameters['plugin_in_datasets'][1]
            self._ip_plugin_data_dict['clone']['start_plugin'] = \
                self.start_plugin.parameters['plugin_in_datasets'][2]

            self._ip_plugin_data_dict['original']['end_plugin'] = \
                self.end_plugin.parameters['plugin_out_datasets'][0]
            self._ip_plugin_data_dict['clone']['end_plugin'] = \
                self.end_plugin.parameters['plugin_out_datasets'][1]
        else:
            info_str = f"Not setting up alternating plugin datasets for " \
                       f"{self.start_plugin.name}, since iterating only a " \
                       f"single plugin"
            logging.debug(info_str)
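
The in/out swap that __set_datasets() performs from iteration 2 onwards is, in essence, a ping-pong buffer between the original and cloned datasets. A standalone sketch of just that swap, with plain strings standing in for Data objects (hypothetical stand-ins, for illustration only):

# The previous iteration's output becomes the next input, and vice versa.
in_datasets = ['original']               # start plugin's inputs
out_datasets = ['itr_clone_0']           # end plugin's outputs
iterating = {'original': 'itr_clone_0'}  # original -> clone pairing

p = [in_datasets, out_datasets]
for s1, s2 in iterating.items():
    a = [0, p[0].index(s1)] if s1 in p[0] else [1, p[1].index(s1)]
    b = [0, p[0].index(s2)] if s2 in p[0] else [1, p[1].index(s2)]
    p[a[0]][a[1]], p[b[0]][b[1]] = p[b[0]][b[1]], p[a[0]][a[1]]

print(in_datasets, out_datasets)  # ['itr_clone_0'] ['original']

Running the block a second time returns both lists to their original state, which is exactly the alternation that setup_datasets() describes for out_dataset[0] and out_dataset[1].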