Test Failed
Push — master ( 9ba79c...17f3e3 )
by Yousef
01:54 queued 19s
created

CpuIterativePlugin.__set_original_datasets()   A

Complexity

Conditions 1

Size

Total Lines 7
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 3
nop 1
dl 0
loc 7
rs 10
c 0
b 0
f 0
1
# Copyright 2014 Diamond Light Source Ltd.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
"""
16
.. module:: cpu_iterative_plugin
17
   :platform: Unix
18
   :synopsis: Base class for all plugins which use a CPU on the target machine
19
   and have the ability to be iterative
20
21
.. moduleauthor:: Daniil Kazantsev & Yousef Moazzam <[email protected]>
22
23
"""
24
25
from savu.plugins.driver.plugin_driver import PluginDriver
26
from savu.plugins.driver.basic_driver import BasicDriver
27
28
import os
29
_base = BasicDriver if os.environ['savu_mode'] == 'basic' else PluginDriver
30
31
32
class CpuIterativePlugin(_base):
33
34
    def __init__(self):
35
        super(CpuIterativePlugin, self).__init__()
36
        # set any plugin that inherits from CpuIterativePluign to be iterative
37
        self._is_iterative = True
38
        # the current iteration: starts counting at 0 (zero-based)
39
        self._ip_iteration = 0
40
        # the number of iterations to perform: starts counting at 1 (one-based)
41
        # TODO: should this have a starting/default value of 1 rather than
42
        #  False, to signify that the default number of iterations is just 1?
43
        # self._ip_fixed_iterations = False
44
        self._ip_fixed_iterations = 3
45
        # a bool describing if all iterations have been completed
46
        self._ip_complete = False
47
        # The _ip_data_dict value eventually holds 3 keys:
48
        # - 'iterating'
49
        # - 0
50
        # - 1
51
        # The name of the 0 key refers to the 0th iteration, and the name of the
52
        # 1 key refers to the 1st iteration
53
        # The values of the 0 key is a list containing two lists (both with only
54
        # one element in them):
55
        # - a list contining the input NeXuS file
56
        # - a list containing the Data object used as an input AND output dataset
57
        # (depending on the iteration number) with the "original" backing file
58
        # (ie, NOT the "cloned" backing file)
59
        # The value of the 1 key is a list containing two lists (one containing
60
        # one element, one containing two elements):
61
        # - a list containing the input NeXuS file, and also the Data object
62
        #   with the "original" backing file
63
        # - a list containing the Data object with the "clone" backing file
64
        self._ip_data_dict = {}
65
        # The dict value of the 'iterating' key contains only one key-value
66
        # pair throughout all iterations:
67
        # - the key is the "original" Data object
68
        # - the value is the "cloned" Data object
69
        # TODO: figure out how the name of the backing files created is set (ie,
70
        #  where does the "iterative_ccpi_denoising" bit come from?)
71
        self._ip_data_dict['iterating'] = {}
72
73
        # similar to _ip_data_dict, but for the pattern of the original &
74
        # cloned datasets, depending on the current iteration number
75
        self._ip_pattern_dict = {}
76
        self._ip_pattern_dict['iterating'] = {}
77
78
    def setup_iterative_plugin(self):
79
        '''
80
        Run this method instead of the setup() method in the plugin, if the
81
        plugin is being run iteratively.
82
83
        Setup the cloned datasets only if the number of iterations is
84
        greater than 1 (so then the cloned dataset isn't unnecessarily defined
85
        for running just a single iteration).
86
        '''
87
        if self._ip_fixed_iterations and self._ip_fixed_iterations > 1:
88
            self.__set_original_datasets()
89
            # get the in and out datasets, like in IterativeCcpiDenosing.setup()
90
            in_dataset, out_dataset = self.get_original_datasets()
91
92
            # get the PluginData objcts, like in IterativeCcpiDenosing.setup()
93
            in_pData = self.parameters['plugin_in_datasets']
94
            out_pData = self.parameters['plugin_out_datasets']
95
96
            # set the pattern for the single input dataset
97
            in_pData[0].plugin_data_setup(self.parameters['pattern'], 'single')
98
99
            # Cloned datasets are at the end of the out_dataset list
100
            out_dataset[0].create_dataset(in_dataset[0])
101
102
            # What is a cloned dataset?
103
            # Since each dataset in Savu has its own backing hdf5 file, a dataset
104
            # cannot be used for input and output at the same time.  So, in the
105
            # case of iterative plugins, if a dataset is used as output and then
106
            # as input on the next iteration, the subsequent output must be a
107
            # different file.
108
            # A cloned dataset is a copy of another dataset but with a different
109
            # backing file.  It doesn't have a name, is not accessible as a dataset
110
            # in the framework and is only used in alternation with another
111
            # dataset to allow it to be used as both input and output
112
            # simultaneously.
113
114
            # This is a cloned dataset (of out_dataset[0])
115
            self.create_clone(out_dataset[1], out_dataset[0])
116
117
            # set the pattern for the PluginData objects associated with the two
118
            # ouptut datasets (original and clone)
119
            out_pData[0].plugin_data_setup(self.parameters['pattern'], 'single')
120
            out_pData[1].plugin_data_setup(self.parameters['pattern'], 'single')
121
122
            # set the input and output datasets for the first iteration
123
            self.set_iteration_datasets(0, [in_dataset[0]], [out_dataset[0]],
124
                                        self.parameters['pattern'])
125
            # set the input and output datasets for subsequent iterations
126
            self.set_iteration_datasets(1, [in_dataset[0], out_dataset[0]],
127
                                        [out_dataset[1]],
128
                                        self.parameters['pattern'])
129
            # out_dataset[0] and out_dataset[1] will continue to alternate for
130
            # all remaining iterations i.e. output becomes input and input becomes
131
            # output.
132
133 View Code Duplication
    def _run_plugin(self, exp, transport):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
134
        '''
135
        Execute the iterative processing.
136
        '''
137
        # The docstring of this method in IterativePlugin is the following:
138
        #
139
        # Run the pre_process, process, and post_process methods.
140
        #
141
        # However, there is no obvious point where those methods are called,
142
        # so perhaps this docstring isn't quite accurate? (Also note that this
143
        # sentence has been copied from the docstring
144
        # BaseDriver._run_plugin_instances(), so maybe it is just a generic
145
        # description of what this method SHOULD do, but doesn't yet do,
146
        # in IterativePlugin)
147
148
        if self._ip_complete:
149
            self.__finalise_datasets()
150
            return
151
        else:
152
            print(f"Iteration {self._ip_iteration}...")
153
            self.__set_datasets()
154
            self._run_plugin_instances(transport, self.get_communicator())
155
            # transport.no_processing is related to the nTrans variable that
156
            # is seen in various places in the "transport layer"
157
            # TODO: figure out what nTrans is/means
158
            if transport.no_processing:
159
                self.set_processing_complete()
160
161
            # if self._ip_fixed_iterations has been set to something other
162
            # than its original value of False, and if the current iteration
163
            # (the one that has just been completed) is the LAST iteration,
164
            # then processing has been completed
165
            #
166
            # Note that _ip_iteration starts counting at 0,
167
            # but _ip_fixed_iterations starts counting at 1, so if you have
168
            # reached _ip_iteration=n, then this means that n+1 iterations
169
            # have been performed
170
            if self._ip_fixed_iterations and \
171
                    self._ip_iteration == self._ip_fixed_iterations - 1:
172
                self.set_processing_complete()
173
            self._ip_iteration += 1
174
            # start another iteration
175
            self._run_plugin(exp, transport)
176
177
    def create_clone(self, clone, data):
178
        clone.create_dataset(data)
179
        clone.data_info.set('clone', data.get_name())
180
        # alternate a dataset with its clone
181
        self.set_alternating_datasets(data, clone)
182
183 View Code Duplication
    def __set_datasets(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
184
        '''
185
        Set the input and output datasets such that
186
        - the output dataset from the previous iteration is the input dataset of
187
          the current iteration that is about to be performed
188
        - the input dataset from the previous iteration is used to write the
189
          output of the current iteration that is about to be performed
190
        '''
191
        # TODO: perhaps the pattern should be changed here, to make use of
192
        #  the same logic that is being used to switch the original & cloned
193
        #  data?
194
        params = self.parameters
195
        # Only the 0th and 1st iterations are set in _ip_data_dicts, there is
196
        # NOT a key for every iteration in _ip_data_dict, hence this if/elif
197
        # block
198
        if self._ip_iteration in list(self._ip_data_dict.keys()):
199
            # If on the 0th or 1st iteration, set the in_datasets and
200
            # out_datasets according to the structure  defined in _ip_data_dict
201
            #
202
            # The body of this if statement is essentially a way to "set up" the
203
            # input and ouput datasets so that for iterations after the 0th and
204
            # 1st, the two datasets that are swapped between being used for
205
            # input or output (depending on the particular iteration) can be
206
            # swapped WITHOUT having to define a key-value pair in
207
            # _ip_data_dict for EVERY SINGLE ITERATION
208
            params['in_datasets'] = self._ip_data_dict[self._ip_iteration][0]
209
            params['out_datasets'] = self._ip_data_dict[self._ip_iteration][1]
210
        elif self._ip_iteration > 0:
211
            # If on an iteration greater than 1 (since the if statement catches
212
            # both iteration 0 and 1), then there is some (fiddly...) logic
213
            # here to essentially SWAP the out dataset from the previous
214
            # iteration with the in dataset of the previous iteration
215
            #
216
            # Practically speaking, this means that:
217
            # - the out dataset from the previous iteration is used as the input
218
            #   for the current iteration that is about to be performed
219
            # - the in dataset from the previous iteration is free to be used to
220
            #   write the output of the current iteration that is about to be
221
            #   performed
222
            p = [params['in_datasets'], params['out_datasets']]
223
            for s1, s2 in self._ip_data_dict['iterating'].items():
224
                a = [0, p[0].index(s1)] if s1 in p[0] else [1, p[1].index(s1)]
225
                b = [0, p[0].index(s2)] if s2 in p[0] else [1, p[1].index(s2)]
226
                p[a[0]][a[1]], p[b[0]][b[1]] = p[b[0]][b[1]], p[a[0]][a[1]]
227
228
    def set_alternating_datasets(self, d1, d2):
229
        names = [d1.get_name(), d2.get_name()]
230
        if not any([True if 'itr_clone' in i else False for i in names]):
231
            raise Exception('Alternating datasets must contain a clone.  These'
232
                            ' are found at the end of the out_datasets list')
233
        self._ip_data_dict['iterating'][d1] = d2
234
235
    def get_alternating_datasets(self):
236
        return self._ip_data_dict['iterating']
237
238
    def set_iteration_datasets(self, itr, in_data, out_data, pattern=None):
239
        self._ip_data_dict[itr] = [in_data, out_data]
240
        self._ip_pattern_dict[itr] = pattern
241
242
    def set_processing_complete(self):
243
        '''
244
        Signal that the final iteration has been executed.
245
        '''
246
        self._ip_complete = True
247
248 View Code Duplication
    def __finalise_datasets(self):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
249
        '''
250
        Inspect the two Data objects that are used to contain the input and
251
        output data for iterations over the course of the iterative processing
252
        (input/output depending on which particular iteration was being done).
253
254
        Mark one of them as the "final dataset" to be added to the output
255
        NeXuS file, and mark the other as "obsolete/to be removed".
256
257
        The decision between which one is kept and which one is removed
258
        depends on which Data object contains the OUTPUT of the very last
259
        iteration.
260
261
        For an odd number of iterations, this is the "original" Data object.
262
        For an even number of iteration, this is the "clone" Data object.
263
        '''
264
        for s1, s2 in self._ip_data_dict['iterating'].items():
265
            name = s1.get_name()
266
            name = name if 'itr_clone' not in name else s2.get_name()
267
            final_dataset = s1 if s1 in self.parameters['out_datasets'] else s2
268
            obsolete = s1 if s1 is not final_dataset else s2
269
            obsolete.remove = True
270
271
            # switch names if necessary
272
            if final_dataset.get_name() != name:
273
                # If this is true, then the output dataset of the last
274
                # iteration is the clone Data object (hence, the mismatched
275
                # names).
276
                #
277
                # So then:
278
                # - obsolete = original
279
                # - final_dataset = clone
280
                #
281
                # which means that the CLONED dataset needs to be set in the
282
                # Experiment object (self.exp) as the "out data", but under
283
                # the name of the ORIGINAL dataset.
284
                # And also, the ORIGINAL dataset is set in the Experiment
285
                # object, but under the name of the CLONED/OBSOLETE dataset
286
                temp = obsolete
287
                self.exp.index['out_data'][name] = final_dataset
288
                self.exp.index['out_data'][s2.get_name()] = temp
289
                # One last thing to do in this case is to set the "name"
290
                # inside the Data object that final_result is set to.
291
                #
292
                # This is because, in this case, the CLONED dataset is in
293
                # final_result, and the "name" within the Data object will
294
                # be some value like "itr_0".
295
                #
296
                # However, the name within the Data object needs to be the
297
                # name of the ORIGINAL Data object in order for the creation
298
                # of the output NeXuS file to work.
299
                final_dataset._set_name(name)
300
301
    def __set_original_datasets(self):
302
        '''
303
        Utility function to make the (original) in dataset, and out dataset,
304
        easier to reference
305
        '''
306
        self.in_data = self.parameters['in_datasets']
307
        self.out_data = self.parameters['out_datasets']
308
309
    def get_original_datasets(self):
310
        '''
311
        Helper function to get the in and out datasets more easily.
312
        '''
313
        return self.in_data, self.out_data