Test Failed
Pull Request — master (#878)
by
unknown
03:29
created

PluginRunner.__run_plugin()   C

Complexity

Conditions 10

Size

Total Lines 71
Code Lines 36

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 10
eloc 36
nop 4
dl 0
loc 71
rs 5.9999
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

Complexity

Complex methods like savu.core.plugin_runner.PluginRunner.__run_plugin() often do a lot of different things. To break such a method down, we need to identify a cohesive component within it. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Copyright 2014 Diamond Light Source Ltd.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
"""
16
.. module:: plugin_runner
17
   :platform: Unix
18
   :synopsis: Plugin list runner, which passes control to the transport layer.
19
.. moduleauthor:: Nicola Wadeson <[email protected]>
20
"""
21
22
import logging
23
24
import savu.core.utils as cu
25
import savu.plugins.utils as pu
26
from savu.data.experiment_collection import Experiment
27
from savu.plugins.stats.statistics import Statistics
28
from savu.core.iterative_plugin_runner import IteratePluginGroup
29
from savu.core.iterate_plugin_group_utils import check_if_in_iterative_loop, \
30
    check_if_end_plugin_in_iterate_group
31
32
33
class PluginRunner(object):
34
    """ Plugin list runner, which passes control to the transport layer.
35
    """
36
37
    def __init__(self, options, name='PluginRunner'):
        """ Attach the requested transport class as a base of this runner and
        create the experiment.

        :param options: run options; ``options["transport"]`` selects the
            ``savu.core.transports.<transport>_transport`` class to import.
        :param name: not referenced in this method.
        """
        transport_path = "".join(
            ("savu.core.transports.", options["transport"], "_transport"))
        transport_class = cu.import_class(transport_path)
        cu.add_base(self, transport_class)
        super(PluginRunner, self).__init__()

        #  ********* transport function ***********
        self._transport_initialise(options)
        self.options = options
        # add all relevant locations to the path
        pu.get_plugins_paths()
        self.exp = Experiment(options)
49
50
    def _run_plugin_list(self):
        """ Set up the experiment, run each processing plugin in turn and
        return the experiment.

        The loop starts at the index given by the checkpoint and stops early
        when the transport layer reports a kill signal.

        :returns: ``self.exp`` after all post-run steps have been called.
        """
        self.exp._setup(self)
        Statistics._setup_class(self.exp)

        plugin_list = self.exp.meta_data.plugin_list
        logging.info('Running the plugin list check')
        self._run_plugin_list_setup(plugin_list)

        collection = self.exp._get_collection()
        total_plugins = plugin_list._get_n_processing_plugins()

        #  ********* transport function ***********
        logging.info('Running transport_pre_plugin_list_run()')
        self._transport_pre_plugin_list_run()

        checkpoint = self.exp.checkpoint
        first_plugin = checkpoint.get_checkpoint_plugin()
        for index in range(first_plugin, total_plugins):
            self.exp._set_experiment_for_current_plugin(index)
            mem_start = cu.get_memory_usage_linux()

            # now that nPlugin has been reset, find out whether this plugin
            # index belongs to a group of plugins to iterate over
            iterate_group = check_if_in_iterative_loop(self.exp)

            if iterate_group is not None:
                # inside an iterative loop: the group runs its own iteration 0
                plugin_name = iterate_group._execute_iteration_0(
                    self.exp, self)
            else:
                # not in an iterative loop, run as normal
                plugin_dict = collection['plugin_dict'][index]
                plugin_name = self.__run_plugin(plugin_dict).name

            self.exp._barrier(msg='PluginRunner: plugin complete.')

            mem_end = cu.get_memory_usage_linux()
            mem_change = mem_end - mem_start
            logging.debug(
                f"{plugin_name} memory usage before: {mem_start} MB, "
                f"after: {mem_end} MB, change: {mem_change} MB")

            #  ********* transport functions ***********
            # end the plugin run if savu has been killed
            if self._transport_kill_signal():
                self._transport_cleanup(index + 1)
                break
            self.exp._barrier(msg='PluginRunner: No kill signal... continue.')
            Statistics._count()
            checkpoint.output_plugin_checkpoint()

        #  ********* transport function ***********
        logging.info('Running transport_post_plugin_list_run')
        self._transport_post_plugin_list_run()

        # terminate any remaining datasets (iterating over a copy of the
        # values, taken before any dataset is terminated)
        remaining = list(self.exp.index['in_data'].values())
        for data in remaining:
            self._transport_terminate_dataset(data)

        self.__output_final_message()

        if self.exp.meta_data.get('email'):
            cu.send_email(self.exp.meta_data.get('email'))

        Statistics._post_chain()
        return self.exp
118
119
    def __output_final_message(self):
        """ Send a starred banner through ``cu.user_message`` stating whether
        processing completed or was interrupted by a kill signal.

        The run counts as interrupted when the experiment meta data
        dictionary contains the key ``'killsignal'``.
        """
        meta_keys = self.exp.meta_data.get_dictionary().keys()
        if 'killsignal' in meta_keys:
            status, width = "interrupted by killsignal", 40
        else:
            status, width = "Complete", 23

        border = "*" * width
        cu.user_message(border)
        cu.user_message("* Processing " + status + " *")
        cu.user_message(border)
127
128
    def __run_plugin(self, plugin_dict, clean_up_plugin=True, plugin=None):
        """ Load (if required) and run a single plugin, then finalise the
        experiment for it.

        :param plugin_dict: plugin list entry handed to
            ``_transport_load_plugin`` when no plugin object is supplied.
        :param clean_up_plugin: when True ``plugin._clean_up()`` is called
            after the run; when False the plugin is left as it is.
        :param plugin: an existing plugin object to reuse; when None a new
            one is loaded from ``plugin_dict``.
        :returns: the plugin object that was run.
        """
        # allow plugin objects to be reused for running iteratively
        if plugin is None:
            plugin = self._transport_load_plugin(self.exp, plugin_dict)

        self.__register_with_iterate_group(plugin)

        #  ********* transport function ***********
        self._transport_pre_plugin()
        cu.user_message("*Running the %s plugin*" % plugin.name)

        #  ******** transport 'process' function is called inside here ********
        plugin._run_plugin(self.exp, self)  # plugin driver

        self.exp._barrier(msg="Plugin returned from driver in Plugin Runner")
        cu._output_summary(self.exp.meta_data.get("mpi"), plugin)

        self.__clean_up_after_run(plugin, clean_up_plugin)
        self.__finalise_current_plugin()

        return plugin

    def __register_with_iterate_group(self, plugin):
        """ On iteration 0 of an iterative loop, add the plugin to its
        iterate group and, if it is the end plugin, set up the group's
        datasets.
        """
        iterate_plugin_group = check_if_in_iterative_loop(self.exp)

        if iterate_plugin_group is not None and \
                iterate_plugin_group._ip_iteration == 0:
            iterate_plugin_group.add_plugin_to_iterate_group(plugin)

        # evaluated unconditionally, as in the original code path
        is_end_plugin_in_iterative_loop = check_if_end_plugin_in_iterate_group(
            self.exp)

        if iterate_plugin_group is None or \
                not is_end_plugin_in_iterative_loop or \
                iterate_plugin_group._ip_iteration != 0:
            return

        # check if this end plugin is ALSO the start plugin
        if iterate_plugin_group.start_index == \
                iterate_plugin_group.end_index:
            iterate_plugin_group.set_start_plugin(plugin)

        # set the end plugin in IteratePluginGroup
        iterate_plugin_group.set_end_plugin(plugin)
        # setup the 'iterating' key in IteratePluginGroup._ip_data_dict
        iterate_plugin_group.set_alternating_datasets()
        # setup the PluginData objects
        iterate_plugin_group.set_alternating_plugin_datasets()
        # setup the datasets for iteration 0 and 1 inside the
        # IteratePluginGroup object
        iterate_plugin_group.setup_datasets()
        # set the output datasets of the end plugin
        iterate_plugin_group._IteratePluginGroup__set_datasets()

    def __clean_up_after_run(self, plugin, clean_up_plugin):
        """ Clean up the plugin after its run, or report that clean-up was
        skipped.
        """
        # if NOT in an iterative loop, clean up the PluginData associated with
        # the Data objects in the plugin object as normal
        #
        # if in an iterative loop, do not clean up the PluginData object
        # associated with the Data objects of the plugin, apart from for the
        # last iteration
        if clean_up_plugin:
            print(f"Cleaning up plugin {plugin.name}")
            plugin._clean_up()
        else:
            info_msg = f"Not cleaning up plugin {plugin.name}, as it is in a " \
                f"group to iterate over"
            print(info_msg)

    def __finalise_current_plugin(self):
        """ Finalise the experiment for the current plugin, terminate the
        datasets marked for removal or replacement and reorganise the rest.
        """
        finalise = self.exp._finalise_experiment_for_current_plugin()

        #  ********* transport function ***********
        self._transport_post_plugin()

        for data in finalise['remove'] + finalise['replace']:
            #  ********* transport function ***********
            self._transport_terminate_dataset(data)

        self.exp._reorganise_datasets(finalise)
199
200
    def _run_plugin_list_setup(self, plugin_list):
        """ Pass every entry of the plugin list through the framework's setup
        steps without executing the main processing.

        Loaders are loaded first, then each processing plugin is set up,
        then any savers added by ``_add_missing_savers`` are set up too.

        :param plugin_list: the plugin list object to check.
        """
        plugin_list._check_loaders()
        self.__check_gpu()

        n_loaders = self.exp.meta_data.plugin_list._get_n_loaders()
        n_plugins = plugin_list._get_n_processing_plugins()
        plist = plugin_list.plugin_list

        # set loaders
        for loader_idx in range(n_loaders):
            pu.plugin_loader(self.exp, plist[loader_idx])
            self.exp._set_initial_datasets()

        # run all plugin setup methods and store information in experiment
        # collection
        processing = plist[n_loaders:n_loaders + n_plugins]
        for position, plugin_dict in enumerate(processing):
            self.__plugin_setup(plugin_dict, position)
        n_setup = len(processing)

        plugin_list._add_missing_savers(self.exp)

        #  ********* transport function ***********
        self._transport_update_plugin_list()

        # check added savers: everything after the entries already set up,
        # numbered on from where the first pass stopped
        added = plist[n_loaders + n_setup:]
        for position, plugin_dict in enumerate(added, start=n_setup):
            self.__plugin_setup(plugin_dict, position)

        self.exp._reset_datasets()
        self.exp._finalise_setup(plugin_list)
        cu.user_message("Plugin list check complete!")
237
238
    def __plugin_setup(self, plugin_dict, count):
        """ Load one plugin in check mode, record its citations in
        ``plugin_dict`` and merge its output data into the input data.

        :param plugin_dict: plugin list entry; its ``'cite'`` key is set here.
        :param count: value stored as ``nPlugin`` in the experiment meta data
            before the plugin is loaded.
        """
        self.exp.meta_data.set("nPlugin", count)
        plugin = pu.plugin_loader(self.exp, plugin_dict, check=True)

        in_datasets = plugin.get_in_datasets()
        plugin._revert_preview(in_datasets)

        citations = plugin.tools.get_citations()
        plugin_dict['cite'] = citations

        plugin._clean_up()
        self.exp._merge_out_data_to_in(plugin_dict)
245
246
    def __check_gpu(self):
247
        """ Check if the process list contains GPU processes and determine if
248
        GPUs exists. Add GPU processes to the processes list if required."""
249
        if not self.exp.meta_data.plugin_list._contains_gpu_processes():
250
            return
251
252
        try:
253
            import pynvml as pv
254
        except Exception:
255
            logging.debug("pyNVML module not found")
256
            raise Exception("pyNVML module not found")
257
258
        try:
259
            pv.nvmlInit()
260
            count = int(pv.nvmlDeviceGetCount())
261
            if count == 0:
262
                raise Exception("No GPUs found")
263
            logging.debug("%s GPUs have been found.", count)
264
265
            if not self.exp.meta_data.get('test_state'):
266
                for i in range(count):
267
                    handle = pv.nvmlDeviceGetHandleByIndex(i)
268
                    if pv.nvmlDeviceGetComputeRunningProcesses(handle):
269
                        raise Exception("Unfortunately, GPU %i is busy. Try \
270
                            resubmitting the job to the queue." % i)
271
        except Exception as e:
272
            raise Exception("Unable to run GPU plugins: %s", str(e))
273
        self.__set_gpu_processes(count)
274
275
    def __set_gpu_processes(self, count):
276
        processes = self.exp.meta_data.get('processes')
277
        if not [i for i in processes if 'GPU' in i]:
278
            logging.debug("GPU processes missing. GPUs found so adding them.")
279
            cpus = ['CPU' + str(i) for i in range(count)]
280
            gpus = ['GPU' + str(i) for i in range(count)]
281
            for i in range(min(count, len(processes))):
282
                processes[processes.index(cpus[i])] = gpus[i]
283
            self.exp.meta_data.set('processes', processes)
284