Test Failed
Pull Request — master (#888), created by Daniil at 05:08

savu.core.iterative_plugin_runner — rating: B

Complexity

Total Complexity 49

Size/Duplication

Total Lines 456
Duplicated Lines 18.86%

Importance

Changes 0

Metric                            Value
eloc (effective lines of code)    186
dl (duplicated lines)             86
loc (lines of code)               456
rs                                8.48
c                                 0
b                                 0
f                                 0
wmc (weighted method count)       49

18 Methods

Rating   Name   Duplication (lines)   Size (lines)   Complexity
A IteratePluginGroup.set_iteration_datasets() 0 2 1
A IteratePluginGroup.set_alternating_plugin_datasets() 0 19 2
A IteratePluginGroup._execute_iterations() 0 49 4
B IteratePluginGroup.__set_datasets() 0 56 7
A IteratePluginGroup.setup_datasets() 0 14 1
B IteratePluginGroup.set_plugin_datasets() 34 55 6
B IteratePluginGroup._execute_iteration_0() 0 72 7
A IteratePluginGroup.set_alternating_datasets() 0 6 3
A IteratePluginGroup.__init__() 0 50 1
A IteratePluginGroup.set_start_plugin() 0 5 1
A IteratePluginGroup.add_plugin_to_iterate_group() 0 6 1
A IteratePluginGroup.increment_ip_iteration() 0 2 1
A IteratePluginGroup.get_plugin_datasets() 0 6 1
A IteratePluginGroup.__set_original_datasets() 0 7 1
A IteratePluginGroup.get_original_datasets() 0 5 1
A IteratePluginGroup.create_clone() 0 5 1
A IteratePluginGroup.set_end_plugin() 0 5 1
B IteratePluginGroup._finalise_iterated_datasets() 52 52 6

How to fix

Duplicated Code

Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.

Common duplication problems have well-known corresponding solutions; the usual fix is to extract the repeated logic into a shared helper, as sketched below.
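For illustration, here is a minimal, hypothetical Python sketch (the helper, the stub class, and all names are invented for this example, not code from this project) of collapsing two near-identical branches into one parameterised helper:

class _Plugin:
    # Minimal stand-in for a plugin object, just enough to run the sketch
    def __init__(self):
        self.parameters = {}

def set_single_plugin_dataset(plugin, param_name, pdata):
    '''Shared helper replacing two near-identical blocks that differed only
    in which plugin and which parameter list they updated.'''
    plugin.parameters[param_name] = [pdata]

start_plugin, end_plugin = _Plugin(), _Plugin()
# before: one near-identical block per plugin; after: two calls to the helper
set_single_plugin_dataset(start_plugin, 'plugin_in_datasets', 'original_pData')
set_single_plugin_dataset(end_plugin, 'plugin_out_datasets', 'cloned_pData')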

Complexity

 Tip: Before tackling complexity, make sure to eliminate any duplication first. This can often reduce the size of classes significantly.

Complex classes like savu.core.iterative_plugin_runner often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster. A sketch of Extract Class applied to this class follows.
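As a minimal sketch, assuming the _ip_* fields visible in the code below form the cohesive component (the class name and method names here are hypothetical, not a drop-in refactoring):

class IterationState:
    '''Hypothetical extracted class grouping the _ip_* fields of
    IteratePluginGroup into one cohesive component.'''
    def __init__(self, fixed_iterations):
        self.iteration = 0                         # was _ip_iteration
        self.fixed_iterations = fixed_iterations   # was _ip_fixed_iterations
        self.data_dict = {'iterating': {}}         # was _ip_data_dict
        self.plugin_data_dict = {'original': {}, 'clone': {}}  # was _ip_plugin_data_dict

    def increment(self):
        self.iteration += 1

    def on_last_iteration(self):
        return self.iteration == self.fixed_iterations - 1

IteratePluginGroup would then hold a single field such as self.state = IterationState(iterations) and delegate to it.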

from savu.core.iterate_plugin_group_utils import shift_plugin_index


class IteratePluginGroup():
    '''
    Class for iterating a set/group of plugins in a process list
    '''

    def __init__(self, plugin_runner, start_index, end_index, iterations):
        self.in_data = None
        self.out_data = None
        # PluginRunner object for running the individual plugins in the
        # group of plugins to iterate over
        self.plugin_runner = plugin_runner

        # nPlugin index of plugin that is at the start of group to iterate over
        self.start_index = start_index
        # nPlugin index of plugin that is at the end of group to iterate over
        self.end_index = end_index

        # list of plugins needed to run the plugins using
        # PluginRunner.__run_plugin()
        self.plugins = []

        # Note: copied from CpuIterativePlugin

        # the current iteration: starts counting at 0 (zero-based)
        self._ip_iteration = 0
        # the number of iterations to perform: starts counting at 1 (one-based)
        self._ip_fixed_iterations = iterations
        # The _ip_data_dict value eventually holds 3 keys:
        # - 'iterating'
        # - 0
        # - 1
        # The name of the 0 key refers to the 0th iteration, and the name of
        # the 1 key refers to the 1st iteration.
        # The value of the 0 key is a list containing two lists (both with
        # only one element in them):
        # - a list containing the input NeXuS file
        # - a list containing the Data object used as an input AND output
        #   dataset (depending on the iteration number) with the "original"
        #   backing file (ie, NOT the "cloned" backing file)
        # The value of the 1 key is a list containing two lists (one containing
        # one element, one containing two elements):
        # - a list containing the input NeXuS file, and also the Data object
        #   with the "original" backing file
        # - a list containing the Data object with the "clone" backing file
        self._ip_data_dict = {}
        # The dict value of the 'iterating' key contains only one key-value
        # pair throughout all iterations:
        # - the key is the "original" Data object
        # - the value is the "cloned" Data object
        self._ip_data_dict['iterating'] = {}

        # dict for holding the different PluginData objects involved
        self._ip_plugin_data_dict = {
            'original': {},
            'clone': {}
        }

    def setup_datasets(self):
        '''
        Setup the cloned datasets in the start and end plugins in the group to
        iterate over
        '''
        self.__set_original_datasets()
        # get the in and out datasets, like in IterativeCcpiDenosing.setup()
        in_dataset, out_dataset = self.get_original_datasets()

        # set the input and output datasets for the first iteration
        self.set_iteration_datasets(0, [in_dataset[0]], [out_dataset[0]])
        # set the input and output datasets for subsequent iterations
        self.set_iteration_datasets(1, [in_dataset[0], out_dataset[0]],
                                    [out_dataset[1]])
        # out_dataset[0] and out_dataset[1] will continue to alternate for
        # all remaining iterations, i.e. output becomes input and input
        # becomes output.

    def _execute_iteration_0(self, exp, transport):
        '''
        Run plugins for iteration 0
        '''
        start = shift_plugin_index(exp, self.start_index)
        end = shift_plugin_index(exp, self.end_index)

        nPlugin = exp.meta_data.get('nPlugin')
        exp_coll = exp._get_collection()
        if start == end and nPlugin == end:
            # start == end -> group of plugins to iterate over is a single
            # plugin

            plugin_name = \
                self.plugin_runner._PluginRunner__run_plugin(
                    exp_coll['plugin_dict'][nPlugin],
                    clean_up_plugin=False)

            # since the end plugin has now been run, the group of plugins to
            # iterate over has been executed once, and this counts as having
            # done one iteration (ie, at this point, iteration 0 is
            # complete)
            self.increment_ip_iteration()
            # kick off all subsequent iterations
            self._execute_iterations(exp, transport)
            # finished all iterations, set which output dataset to keep, and
            # which to remove
            self._finalise_iterated_datasets(exp)
        else:
            # start != end -> group of plugins to iterate over is more than one
            # plugin
            if nPlugin == start:
                # start plugin is being run, on iteration 0
                print(f"Iteration {self._ip_iteration}")
                plugin = self.plugin_runner._PluginRunner__run_plugin(
                    exp_coll['plugin_dict'][nPlugin],
                    clean_up_plugin=False)
                plugin_name = plugin.name
                self.set_start_plugin(plugin)
            elif nPlugin == end:
                # end plugin is being run, on iteration 0

                plugin_name = \
                    self.plugin_runner._PluginRunner__run_plugin(
                        exp_coll['plugin_dict'][nPlugin],
                        clean_up_plugin=False)

                # since the end plugin has now been run, the group of plugins
                # to iterate over has been executed once, and this counts as
                # having done one iteration (ie, at this point, iteration 0 is
                # complete)
                self.increment_ip_iteration()
                # kick off all subsequent iterations
                self._execute_iterations(exp, transport)
                # finished all iterations, set which output dataset to keep,
                # and which to remove
                self._finalise_iterated_datasets(exp)
            elif nPlugin >= start and nPlugin <= end:
                # a "middle" plugin is being run on iteration 0
                plugin = self.plugin_runner._PluginRunner__run_plugin(
                    exp_coll['plugin_dict'][nPlugin])
                plugin_name = plugin.name
            else:
                info_dict = {
                    'start_index': self.start_index,
                    'end_index': self.end_index
                }
                err_str = f"Encountered an unknown case when running inside " \
                    f"an iterative loop. IteratePluginGroup info: {info_dict}"
                raise Exception(err_str)

        return plugin_name

    def _execute_iterations(self, exp, transport):
        '''
        Execute all iterations from iteration 1 onwards (iteration 0 is
        currently handled by methods in PluginRunner).
        '''
        # The docstring of this method in IterativePlugin is the following:
        #
        # Run the pre_process, process, and post_process methods.
        #
        # However, there is no obvious point where those methods are called,
        # so perhaps this docstring isn't quite accurate? (Also note that this
        # sentence has been copied from the docstring of
        # BaseDriver._run_plugin_instances(), so maybe it is just a generic
        # description of what this method SHOULD do, but doesn't yet do,
        # in IterativePlugin)

        while self._ip_iteration < self._ip_fixed_iterations:
            print(f"Iteration {self._ip_iteration}...")
            self.__set_datasets()
            # replace this with the PluginRunner.__run_plugin() method to run
            # the individual plugins in the group of plugins to iterate
            #self._run_plugin_instances(transport, self.get_communicator())

            # clean up the plugins in the group to iterate over IF the last
            # iteration is being executed
            if self._ip_iteration == self._ip_fixed_iterations - 1:
                clean_up_plugin = True
            else:
                clean_up_plugin = False
            # naughty naughty, to run a double underscore method, but for now,
            # just testing...
            for plugin in self.plugins:
                print(f"Running {plugin.name} in iterative group of plugins")
                # TODO: need to pass the plugin dict, or something more than
                # an empty dict...
                self.plugin_runner._PluginRunner__run_plugin({},
                    clean_up_plugin=clean_up_plugin,
                    plugin=plugin)

            # if self._ip_fixed_iterations has been set to something other
            # than its original value of False, and if the current iteration
            # (the one that has just been completed) is the LAST iteration,
            # then processing has been completed
            #
            # Note that _ip_iteration starts counting at 0,
            # but _ip_fixed_iterations starts counting at 1, so if you have
            # reached _ip_iteration=n, then this means that n+1 iterations
            # have been performed
            self.increment_ip_iteration()

    def increment_ip_iteration(self):
        self._ip_iteration += 1

    def __set_original_datasets(self):
        '''
        Utility function to make the (original) in dataset, and out dataset,
        easier to reference
        '''
        self.in_data = self.start_plugin.parameters['in_datasets']
        self.out_data = self.end_plugin.parameters['out_datasets']

    def get_original_datasets(self):
        '''
        Helper function to get the in and out datasets more easily.
        '''
        return self.in_data, self.out_data

    def get_plugin_datasets(self):
        '''
        Helper function to get the in and out plugin datasets more easily.
        '''
        return self.start_plugin.parameters['plugin_in_datasets'], \
            self.end_plugin.parameters['plugin_out_datasets']

    def create_clone(self, clone, data):
        clone.create_dataset(data)
        clone.data_info.set('clone', data.get_name())
        # alternate a dataset with its clone
        self.set_alternating_datasets(data, clone)

    def set_alternating_datasets(self, d1, d2):
        names = [d1.get_name(), d2.get_name()]
        if not any('itr_clone' in i for i in names):
            raise Exception('Alternating datasets must contain a clone. These'
                            ' are found at the end of the out_datasets list')
        self._ip_data_dict['iterating'][d1] = d2

    def set_iteration_datasets(self, itr, in_data, out_data):
        self._ip_data_dict[itr] = [in_data, out_data]

    def set_start_plugin(self, plugin):
        '''
        Set the plugin that is at the start of the group to iterate over
        '''
        self.start_plugin = plugin

    def set_end_plugin(self, plugin):
        '''
        Set the plugin that is at the end of the group to iterate over
        '''
        self.end_plugin = plugin

    def add_plugin_to_iterate_group(self, plugin):
        '''
        Append a plugin to the list of plugins that are part of the group to
        iterate over
        '''
        self.plugins.append(plugin)

    def __set_datasets(self):
        '''
        Set the input and output datasets such that
        - the output dataset from the previous iteration is the input dataset
          of the current iteration that is about to be performed
        - the input dataset from the previous iteration is used to write the
          output of the current iteration that is about to be performed
        '''
        # TODO: perhaps the pattern should be changed here, to make use of
        #  the same logic that is being used to switch the original & cloned
        #  data?

        # Only the 0th and 1st iterations are set in _ip_data_dict, there is
        # NOT a key for every iteration in _ip_data_dict, hence this if/elif
        # block
        if self._ip_iteration in list(self._ip_data_dict.keys()):
            # If on the 0th or 1st iteration, set the in_datasets and
            # out_datasets according to the structure defined in _ip_data_dict
            #
            # The body of this if statement is essentially a way to "set up"
            # the input and output datasets so that for iterations after the
            # 0th and 1st, the two datasets that are swapped between being
            # used for input or output (depending on the particular iteration)
            # can be swapped WITHOUT having to define a key-value pair in
            # _ip_data_dict for EVERY SINGLE ITERATION
            self.start_plugin.parameters['in_datasets'] = \
                [self._ip_data_dict[self._ip_iteration][0][-1]]
            self.end_plugin.parameters['out_datasets'] = \
                self._ip_data_dict[self._ip_iteration][1]
        elif self._ip_iteration > 0:
            # If on an iteration greater than 1 (since the if statement catches
            # both iteration 0 and 1), then there is some (fiddly...) logic
            # here to essentially SWAP the out dataset from the previous
            # iteration with the in dataset of the previous iteration
            #
            # Practically speaking, this means that:
            # - the out dataset from the previous iteration is used as the
            #   input for the current iteration that is about to be performed
            # - the in dataset from the previous iteration is free to be used
            #   to write the output of the current iteration that is about to
            #   be performed
            p = [
                self.start_plugin.parameters['in_datasets'],
                self.end_plugin.parameters['out_datasets']
            ]

            for s1, s2 in self._ip_data_dict['iterating'].items():
                a = [0, p[0].index(s1)] if s1 in p[0] else [1, p[1].index(s1)]
                b = [0, p[0].index(s2)] if s2 in p[0] else [1, p[1].index(s2)]
                p[a[0]][a[1]], p[b[0]][b[1]] = p[b[0]][b[1]], p[a[0]][a[1]]

        if self.start_index != self.end_index:
            self.set_plugin_datasets()
        else:
            info_str = f"Not setting plugin datasets for " \
                       f"{self.start_plugin.name}, since iterating only a " \
                       f"single plugin"
            print(info_str)

    def set_plugin_datasets(self):
        """
        Set the PluginData objects for the original and cloned Data objects,
        based on the current iteration.
        """
        p = [
            self.start_plugin.parameters['in_datasets'],
            self.end_plugin.parameters['out_datasets']
        ]

        for s1, s2 in self._ip_data_dict['iterating'].items():
            # change the PluginData objects for the in and out datasets, to
            # take care of the potential switching of patterns
            # [analyzer: duplicated code flagged for the block below]
            if s1 in p[0]:
                new_original_data_pData = \
                    self._ip_plugin_data_dict['original']['start_plugin']
                self.start_plugin.parameters['plugin_in_datasets'] = \
                    [new_original_data_pData]
                p[0][0]._set_plugin_data(new_original_data_pData)
            elif s1 in p[1]:
                new_original_data_pData = \
                    self._ip_plugin_data_dict['original']['end_plugin']
                self.end_plugin.parameters['plugin_out_datasets'] = \
                    [new_original_data_pData]
                p[1][0]._set_plugin_data(new_original_data_pData)
            else:
                info_str = f"s1 {s1.backing_file} wasn't in either the start " \
                           f"plugin's plugin_in_datasets, nor the end " \
                           f"plugin's plugin_out_datasets"
                print(info_str)

            # [analyzer: duplicated code flagged for the block below]
            if s2 in p[0]:
                new_cloned_data_pData = \
                    self._ip_plugin_data_dict['clone']['start_plugin']
                self.start_plugin.parameters['plugin_in_datasets'] = \
                    [new_cloned_data_pData]
                p[0][0]._set_plugin_data(new_cloned_data_pData)
            elif s2 in p[1]:
                new_cloned_data_pData = \
                    self._ip_plugin_data_dict['clone']['end_plugin']
                self.end_plugin.parameters['plugin_out_datasets'] = \
                    [new_cloned_data_pData]
                p[1][0]._set_plugin_data(new_cloned_data_pData)
            else:
                info_str = f"s2 {s2.backing_file} wasn't in either the start " \
                           f"plugin's plugin_in_datasets, nor the end " \
                           f"plugin's plugin_out_datasets"
                print(info_str)

        # reset the values inside Data.data_info that have an effect on how
        # the value of core_slice in SliceLists comes out
        self.start_plugin._finalise_datasets()
        self.start_plugin._finalise_plugin_datasets()
        self.end_plugin._finalise_datasets()
        self.end_plugin._finalise_plugin_datasets()

    # [analyzer: duplicated code flagged for the method below]
    def _finalise_iterated_datasets(self, exp):
        '''
        Inspect the two Data objects that are used to contain the input and
        output data for iterations over the course of the iterative processing
        (input/output depending on which particular iteration was being done).

        Mark one of them as the "final dataset" to be added to the output
        NeXuS file, and mark the other as "obsolete/to be removed".

        The decision between which one is kept and which one is removed
        depends on which Data object contains the OUTPUT of the very last
        iteration.

        For an odd number of iterations, this is the "original" Data object.
        For an even number of iterations, this is the "clone" Data object.
        '''
        for s1, s2 in self._ip_data_dict['iterating'].items():
            name = s1.get_name()
            name = name if 'itr_clone' not in name else s2.get_name()
            final_dataset = s1 \
                if s1 in self.end_plugin.parameters['out_datasets'] else s2
            obsolete = s1 if s1 is not final_dataset else s2
            obsolete.remove = True

            # switch names if necessary
            if final_dataset.get_name() != name:
                # If this is true, then the output dataset of the last
                # iteration is the clone Data object (hence, the mismatched
                # names).
                #
                # So then:
                # - obsolete = original
                # - final_dataset = clone
                #
                # which means that the CLONED dataset needs to be set in the
                # Experiment object (exp) as the "out data", but under
                # the name of the ORIGINAL dataset.
                # And also, the ORIGINAL dataset is set in the Experiment
                # object, but under the name of the CLONED/OBSOLETE dataset
                temp = obsolete
                exp.index['out_data'][name] = final_dataset
                exp.index['out_data'][s2.get_name()] = temp
                # One last thing to do in this case is to set the "name"
                # inside the Data object that final_result is set to.
                #
                # This is because, in this case, the CLONED dataset is in
                # final_result, and the "name" within the Data object will
                # be some value like "itr_0".
                #
                # However, the name within the Data object needs to be the
                # name of the ORIGINAL Data object in order for the creation
                # of the output NeXuS file to work.
                final_dataset._set_name(name)

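    # Note: this redefines the set_alternating_datasets method declared
    # earlier with a (d1, d2) signature; the definition below shadows it.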
    def set_alternating_datasets(self):
        d1 = self.end_plugin.parameters['out_datasets'][0]
        d2 = self.end_plugin.parameters['out_datasets'][1]
        names = [d1.get_name(), d2.get_name()]
        if not any('itr_clone' in i for i in names):
            raise Exception('Alternating datasets must contain a clone. These'
                            ' are found at the end of the out_datasets list')
        self._ip_data_dict['iterating'][d1] = d2

    def set_alternating_plugin_datasets(self):
        """
        Setup the PluginData objects for the original and cloned Data objects
        """
        if self.start_index != self.end_index:
            self._ip_plugin_data_dict['original']['start_plugin'] = \
                self.start_plugin.parameters['plugin_in_datasets'][1]
            self._ip_plugin_data_dict['clone']['start_plugin'] = \
                self.start_plugin.parameters['plugin_in_datasets'][2]

            self._ip_plugin_data_dict['original']['end_plugin'] = \
                self.end_plugin.parameters['plugin_out_datasets'][0]
            self._ip_plugin_data_dict['clone']['end_plugin'] = \
                self.end_plugin.parameters['plugin_out_datasets'][1]
        else:
            info_str = f"Not setting up alternating plugin datasets for " \
                       f"{self.start_plugin.name}, since iterating only a " \
                       f"single plugin"
            print(info_str)
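For orientation, a rough usage sketch inferred from the methods above; the surrounding objects (plugin_runner, exp, transport, and the plugin instances) are assumptions about the framework driving this class, not part of this module:

# Rough sketch, inferred from the code above; surrounding objects are assumed.
group = IteratePluginGroup(plugin_runner, start_index=3, end_index=5,
                           iterations=4)
# As the process list advances, the framework appears to call
# _execute_iteration_0 for each plugin index inside [start, end]; once the
# end plugin has run, the group itself drives the remaining iterations via
# _execute_iterations and then finalises the iterated datasets.
plugin_name = group._execute_iteration_0(exp, transport)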