Test Failed
Push — master ( 9ba79c...17f3e3 )
by Yousef
01:54 queued 19s
created

PluginRunner.__run_plugin()   C

Complexity

Conditions 10

Size

Total Lines 71
Code Lines 36

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 10
eloc 36
nop 4
dl 0
loc 71
rs 5.9999
c 0
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

Complexity

Complex classes like savu.core.plugin_runner.PluginRunner.__run_plugin() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Copyright 2014 Diamond Light Source Ltd.
2
#
3
# Licensed under the Apache License, Version 2.0 (the "License");
4
# you may not use this file except in compliance with the License.
5
# You may obtain a copy of the License at
6
#
7
#     http://www.apache.org/licenses/LICENSE-2.0
8
#
9
# Unless required by applicable law or agreed to in writing, software
10
# distributed under the License is distributed on an "AS IS" BASIS,
11
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
# See the License for the specific language governing permissions and
13
# limitations under the License.
14
15
"""
16
.. module:: plugin_runner
17
   :platform: Unix
18
   :synopsis: Plugin list runner, which passes control to the transport layer.
19
.. moduleauthor:: Nicola Wadeson <[email protected]>
20
"""
21
22
import logging
23
24
import savu.core.utils as cu
25
import savu.plugins.utils as pu
26
from savu.data.experiment_collection import Experiment
27
from savu.core.iterative_plugin_runner import IteratePluginGroup
28
from savu.core.iterate_plugin_group_utils import check_if_in_iterative_loop, \
29
    check_if_end_plugin_in_iterate_group
30
31
32
class PluginRunner(object):
33
    """ Plugin list runner, which passes control to the transport layer.
34
    """
35
36
    def __init__(self, options, name='PluginRunner'):
37
        class_name = "savu.core.transports." + options["transport"] \
38
                     + "_transport"
39
        cu.add_base(self, cu.import_class(class_name))
40
        super(PluginRunner, self).__init__()
41
42
        #  ********* transport function ***********
43
        self._transport_initialise(options)
44
        self.options = options
45
        # add all relevent locations to the path
46
        pu.get_plugins_paths()
47
        self.exp = Experiment(options)
48
49
    def _run_plugin_list(self):
50
        """ Create an experiment and run the plugin list.
51
        """
52
        self.exp._setup(self)
53
54
        plugin_list = self.exp.meta_data.plugin_list
55
        logging.info('Running the plugin list check')
56
        self._run_plugin_list_setup(plugin_list)
57
58
        exp_coll = self.exp._get_collection()
59
        n_plugins = plugin_list._get_n_processing_plugins()
60
61
        #  ********* transport function ***********
62
        logging.info('Running transport_pre_plugin_list_run()')
63
        self._transport_pre_plugin_list_run()
64
65
        cp = self.exp.checkpoint
66
        checkpoint_plugin = cp.get_checkpoint_plugin()
67
        for i in range(checkpoint_plugin, n_plugins):
68
            self.exp._set_experiment_for_current_plugin(i)
69
            memory_before = cu.get_memory_usage_linux()
70
71
            # now that nPlugin has been reset, need to check if we're at a
72
            # plugin index that corresponds to a plugin inside the group to
73
            # iterate over or not
74
            current_iterate_plugin_group = check_if_in_iterative_loop(self.exp)
75
76
            if current_iterate_plugin_group is None:
77
                # not in an iterative loop, run as normal
78
                plugin = self.__run_plugin(exp_coll['plugin_dict'][i])
79
                plugin_name = plugin.name
80
            else:
81
                # in an iterative loop, run differently
82
                plugin_name = \
83
                    current_iterate_plugin_group._execute_iteration_0(
84
                        self.exp, self)
85
86
            self.exp._barrier(msg='PluginRunner: plugin complete.')
87
88
            memory_after = cu.get_memory_usage_linux()
89
            logging.debug("{} memory usage before: {} MB, after: {} MB, change: {} MB".format(
90
                plugin_name, memory_before, memory_after, memory_after - memory_before))
91
92
            #  ********* transport functions ***********
93
            # end the plugin run if savu has been killed
94
            if self._transport_kill_signal():
95
                self._transport_cleanup(i + 1)
96
                break
97
            self.exp._barrier(msg='PluginRunner: No kill signal... continue.')
98
            cp.output_plugin_checkpoint()
99
100
        #  ********* transport function ***********
101
        logging.info('Running transport_post_plugin_list_run')
102
        self._transport_post_plugin_list_run()
103
104
        # terminate any remaining datasets
105
        for data in list(self.exp.index['in_data'].values()):
106
            self._transport_terminate_dataset(data)
107
108
        self.__output_final_message()
109
110
        if self.exp.meta_data.get('email'):
111
            cu.send_email(self.exp.meta_data.get('email'))
112
113
        return self.exp
114
115
    def __output_final_message(self):
116
        kill = True if 'killsignal' in \
117
                       self.exp.meta_data.get_dictionary().keys() else False
118
        msg = "interrupted by killsignal" if kill else "Complete"
119
        stars = 40 if kill else 23
120
        cu.user_message("*" * stars)
121
        cu.user_message("* Processing " + msg + " *")
122
        cu.user_message("*" * stars)
123
124
    def __run_plugin(self, plugin_dict, clean_up_plugin=True, plugin=None):
125
        # allow plugin objects to be reused for running iteratively
126
        if plugin is None:
127
            plugin = self._transport_load_plugin(self.exp, plugin_dict)
128
129
        iterate_plugin_group = check_if_in_iterative_loop(self.exp)
130
131
        if iterate_plugin_group is not None and \
132
            iterate_plugin_group._ip_iteration == 0:
133
                iterate_plugin_group.add_plugin_to_iterate_group(plugin)
134
135
        is_end_plugin_in_iterative_loop = check_if_end_plugin_in_iterate_group(
136
            self.exp)
137
138
        if iterate_plugin_group is not None and \
139
            is_end_plugin_in_iterative_loop and \
140
            iterate_plugin_group._ip_iteration == 0:
141
142
            # check if this end plugin is ALSO the start plugin
143
            if iterate_plugin_group.start_index == \
144
                iterate_plugin_group.end_index:
145
                iterate_plugin_group.set_start_plugin(plugin)
146
147
            # set the end plugin in IteratePluginGroup
148
            iterate_plugin_group.set_end_plugin(plugin)
149
            # setup the 'iterating' key in IteratePluginGroup._ip_data_dict
150
            iterate_plugin_group.set_alternating_datasets()
151
            # setup the PluginData objects
152
            iterate_plugin_group.set_alternating_plugin_datasets()
153
            # setup the datasets for iteration 0 and 1 inside the
154
            # IteratePluginGroup object
155
            iterate_plugin_group.setup_datasets()
156
            # set the output datasets of the end plugin
157
            iterate_plugin_group._IteratePluginGroup__set_datasets()
158
159
        #  ********* transport function ***********
160
        self._transport_pre_plugin()
161
        cu.user_message("*Running the %s plugin*" % plugin.name)
162
163
        #  ******** transport 'process' function is called inside here ********
164
        plugin._run_plugin(self.exp, self)  # plugin driver
165
166
        self.exp._barrier(msg="Plugin returned from driver in Plugin Runner")
167
        cu._output_summary(self.exp.meta_data.get("mpi"), plugin)
168
169
        # if NOT in an iterative loop, clean up the PluginData associated with
170
        # the Data objects in the plugin object as normal
171
        #
172
        # if in an iterative loop, do not clean up the PluginData object
173
        # associated with the Data objects of the plugin, apart from for the
174
        # last iteration
175
        if clean_up_plugin:
176
            logging.debug(f"Cleaning up plugin {plugin.name}")
177
            plugin._clean_up()
178
        else:
179
            info_msg = f"Not cleaning up plugin {plugin.name}, as it is in a " \
180
                f"group to iterate over"
181
            logging.debug(info_msg)
182
183
        finalise = self.exp._finalise_experiment_for_current_plugin()
184
185
        #  ********* transport function ***********
186
        self._transport_post_plugin()
187
188
        for data in finalise['remove'] + finalise['replace']:
189
            #  ********* transport function ***********
190
            self._transport_terminate_dataset(data)
191
192
        self.exp._reorganise_datasets(finalise)
193
194
        return plugin
195
196
    def _run_plugin_list_setup(self, plugin_list):
197
        """ Run the plugin list through the framework without executing the
198
        main processing.
199
        """
200
        plugin_list._check_loaders()
201
        self.__check_gpu()
202
203
        n_loaders = self.exp.meta_data.plugin_list._get_n_loaders()
204
        n_plugins = plugin_list._get_n_processing_plugins()
205
        plist = plugin_list.plugin_list
206
207
        # set loaders
208
        for i in range(n_loaders):
209
            pu.plugin_loader(self.exp, plist[i])
210
            self.exp._set_initial_datasets()
211
212
        # run all plugin setup methods and store information in experiment
213
        # collection
214
        count = 0
215
216
        for plugin_dict in plist[n_loaders:n_loaders + n_plugins]:
217
            self.__plugin_setup(plugin_dict, count)
218
            count += 1
219
220
        plugin_list._add_missing_savers(self.exp)
221
222
        #  ********* transport function ***********
223
        self._transport_update_plugin_list()
224
225
        # check added savers
226
        for plugin_dict in plist[n_loaders + count:]:
227
            self.__plugin_setup(plugin_dict, count)
228
            count += 1
229
230
        self.exp._reset_datasets()
231
        self.exp._finalise_setup(plugin_list)
232
        cu.user_message("Plugin list check complete!")
233
234
    def __plugin_setup(self, plugin_dict, count):
235
        self.exp.meta_data.set("nPlugin", count)
236
        plugin = pu.plugin_loader(self.exp, plugin_dict, check=True)
237
        plugin._revert_preview(plugin.get_in_datasets())
238
        plugin_dict['cite'] = plugin.tools.get_citations()
239
        plugin._clean_up()
240
        self.exp._merge_out_data_to_in(plugin_dict)
241
242
    def __check_gpu(self):
243
        """ Check if the process list contains GPU processes and determine if
244
        GPUs exists. Add GPU processes to the processes list if required."""
245
        if not self.exp.meta_data.plugin_list._contains_gpu_processes():
246
            return
247
248
        try:
249
            import pynvml as pv
250
        except Exception:
251
            logging.debug("pyNVML module not found")
252
            raise Exception("pyNVML module not found")
253
254
        try:
255
            pv.nvmlInit()
256
            count = int(pv.nvmlDeviceGetCount())
257
            if count == 0:
258
                raise Exception("No GPUs found")
259
            logging.debug("%s GPUs have been found.", count)
260
261
            if not self.exp.meta_data.get('test_state'):
262
                for i in range(count):
263
                    handle = pv.nvmlDeviceGetHandleByIndex(i)
264
                    if pv.nvmlDeviceGetComputeRunningProcesses(handle):
265
                        raise Exception("Unfortunately, GPU %i is busy. Try \
266
                            resubmitting the job to the queue." % i)
267
        except Exception as e:
268
            raise Exception("Unable to run GPU plugins: %s", str(e))
269
        self.__set_gpu_processes(count)
270
271
    def __set_gpu_processes(self, count):
272
        processes = self.exp.meta_data.get('processes')
273
        if not [i for i in processes if 'GPU' in i]:
274
            logging.debug("GPU processes missing. GPUs found so adding them.")
275
            cpus = ['CPU' + str(i) for i in range(count)]
276
            gpus = ['GPU' + str(i) for i in range(count)]
277
            for i in range(min(count, len(processes))):
278
                processes[processes.index(cpus[i])] = gpus[i]
279
            self.exp.meta_data.set('processes', processes)
280