| Total Complexity | 42 |
| Total Lines | 308 |
| Duplicated Lines | 18.18 % |
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like savu.plugins.unregistered.fluo_fitters.base_fluo_fitter often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | # Copyright 2014 Diamond Light Source Ltd. |
||
| 2 | # |
||
| 3 | # Licensed under the Apache License, Version 2.0 (the "License"); |
||
| 4 | # you may not use this file except in compliance with the License. |
||
| 5 | # You may obtain a copy of the License at |
||
| 6 | # |
||
| 7 | # http://www.apache.org/licenses/LICENSE-2.0 |
||
| 8 | # |
||
| 9 | # Unless required by applicable law or agreed to in writing, software |
||
| 10 | # distributed under the License is distributed on an "AS IS" BASIS, |
||
| 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||
| 12 | # See the License for the specific language governing permissions and |
||
| 13 | # limitations under the License. |
||
| 14 | |||
| 15 | """ |
||
| 16 | .. module:: base_fluo_fitter |
||
| 17 | :platform: Unix |
||
| 18 | :synopsis: a base fitting plugin |
||
| 19 | |||
| 20 | .. moduleauthor:: Aaron Parsons <[email protected]> |
||
| 21 | |||
| 22 | """ |
||
| 23 | |||
| 24 | import logging |
||
| 25 | from savu.plugins.plugin import Plugin |
||
| 26 | from savu.plugins.driver.cpu_plugin import CpuPlugin |
||
| 27 | import peakutils as pe |
||
| 28 | import numpy as np |
||
| 29 | import xraylib as xl |
||
| 30 | from flupy.algorithms.xrf_calculations.transitions_and_shells import \ |
||
| 31 | shells, transitions |
||
| 32 | from flupy.algorithms.xrf_calculations.escape import * |
||
| 33 | from flupy.xrf_data_handling import XRFDataset |
||
| 34 | from copy import deepcopy |
||
| 35 | |||
| 36 | |||
| 37 | class BaseFluoFitter(Plugin, CpuPlugin): |
||
| 38 | def __init__(self, name="BaseFluoFitter"): |
||
| 39 | super(BaseFluoFitter, self).__init__(name) |
||
| 40 | |||
| 41 | def base_pre_process(self): |
||
| 42 | in_meta_data = self.get_in_meta_data()[0] |
||
| 43 | try: |
||
| 44 | _foo = in_meta_data.get("PeakIndex")[0] |
||
| 45 | logging.debug('Using the positions in the peak index') |
||
| 46 | except KeyError: |
||
| 47 | logging.debug("No Peak Index in the metadata") |
||
| 48 | logging.debug("Calculating the positions from energy") |
||
| 49 | # idx = self.setPositions(in_meta_data) |
||
| 50 | logging.debug("The index is"+str(self.idx)) |
||
| 51 | in_meta_data.set('PeakIndex', self.idx) |
||
| 52 | in_meta_data.set('PeakEnergy', self.axis[self.idx]) |
||
| 53 | |||
| 54 | def setup(self): |
||
| 55 | # set up the output datasets that are created by the plugin |
||
| 56 | logging.debug('setting up the fluorescence fitting') |
||
| 57 | in_dataset, out_datasets = self.get_datasets() |
||
| 58 | in_pData, out_pData = self.get_plugin_datasets() |
||
| 59 | in_meta_data = in_dataset[0].meta_data |
||
| 60 | |||
| 61 | shape = in_dataset[0].get_shape() |
||
| 62 | in_pData[0].plugin_data_setup('SPECTRUM', self.get_max_frames()) |
||
| 63 | |||
| 64 | axis_labels = ['-1.PeakIndex.pixel.unit'] |
||
| 65 | pattern_list = ['SINOGRAM', 'PROJECTION'] |
||
| 66 | |||
| 67 | fitAreas = out_datasets[0] |
||
| 68 | fitWidths = out_datasets[1] |
||
| 69 | fitHeights = out_datasets[2] |
||
| 70 | self.length = shape[-1] |
||
| 71 | idx = self.setPositions(in_meta_data) |
||
| 72 | logging.debug("in the setup the index is"+str(idx)) |
||
| 73 | numpeaks = len(idx) |
||
| 74 | new_shape = shape[:-1] + (numpeaks,) |
||
| 75 | |||
| 76 | channel = {'core_dims': (-1,), 'slice_dims': list(range(len(shape)-1))} |
||
| 77 | fitAreas.create_dataset(patterns={in_dataset[0]: pattern_list}, |
||
| 78 | axis_labels={in_dataset[0]: axis_labels}, |
||
| 79 | shape=new_shape) |
||
| 80 | fitAreas.add_pattern("CHANNEL", **channel) |
||
| 81 | out_pData[0].plugin_data_setup('CHANNEL', self.get_max_frames()) |
||
| 82 | |||
| 83 | fitWidths.create_dataset(patterns={in_dataset[0]: pattern_list}, |
||
| 84 | axis_labels={in_dataset[0]: axis_labels}, |
||
| 85 | shape=new_shape) |
||
| 86 | fitWidths.add_pattern("CHANNEL", **channel) |
||
| 87 | out_pData[1].plugin_data_setup('CHANNEL', self.get_max_frames()) |
||
| 88 | |||
| 89 | fitHeights.create_dataset(patterns={in_dataset[0]: pattern_list}, |
||
| 90 | axis_labels={in_dataset[0]: axis_labels}, |
||
| 91 | shape=new_shape) |
||
| 92 | fitHeights.add_pattern("CHANNEL", **channel) |
||
| 93 | out_pData[2].plugin_data_setup('CHANNEL', self.get_max_frames()) |
||
| 94 | |||
| 95 | residuals = out_datasets[3] |
||
| 96 | residuals.create_dataset(in_dataset[0]) |
||
| 97 | residuals.set_shape(shape[:-1]+(len(self.axis),)) |
||
| 98 | out_pData[3].plugin_data_setup('SPECTRUM', self.get_max_frames()) |
||
| 99 | |||
| 100 | for i in range(len(out_datasets)): |
||
| 101 | out_datasets[i].meta_data = deepcopy(in_meta_data) |
||
| 102 | mData = out_datasets[i].meta_data |
||
| 103 | mData.set("PeakEnergy", self.axis[self.idx]) |
||
| 104 | mData.set('PeakIndex', self.idx) |
||
| 105 | |||
| 106 | def setPositions(self, in_meta_data): |
||
| 107 | paramdict = XRFDataset().paramdict |
||
| 108 | paramdict["FitParams"]["pileup_cutoff_keV"] = \ |
||
| 109 | self.parameters["pileup_cutoff_keV"] |
||
| 110 | paramdict["FitParams"]["include_pileup"] = \ |
||
| 111 | self.parameters["include_pileup"] |
||
| 112 | paramdict["FitParams"]["include_escape"] = \ |
||
| 113 | self.parameters["include_escape"] |
||
| 114 | paramdict["FitParams"]["fitted_energy_range_keV"] = \ |
||
| 115 | self.parameters["fitted_energy_range_keV"] |
||
| 116 | if self.parameters['mono_energy'] is None: |
||
| 117 | paramdict["Experiment"]["incident_energy_keV"] = \ |
||
| 118 | in_meta_data.get("mono_energy") |
||
| 119 | else: |
||
| 120 | paramdict["Experiment"]["incident_energy_keV"] = \ |
||
| 121 | self.parameters['mono_energy'] |
||
| 122 | paramdict["Experiment"]["elements"] = \ |
||
| 123 | self.parameters["elements"] |
||
| 124 | engy = self.findLines(paramdict) |
||
| 125 | # make it an index since this is what find peak will also give us |
||
| 126 | # print 'basefluo meta is:'+str(in_meta_data.get_dictionary().keys()) |
||
| 127 | axis = self.axis = in_meta_data.get("energy") |
||
| 128 | dq = axis[1]-axis[0] |
||
| 129 | logging.debug("the peak energies are:"+str(engy)) |
||
| 130 | logging.debug("the offset is"+str(axis[0])) |
||
| 131 | self.idx = np.round((engy-axis[0])/dq).astype(int) |
||
| 132 | |||
| 133 | return self.idx |
||
| 134 | |||
| 135 | def findLines(self, paramdict=XRFDataset().paramdict): |
||
| 136 | """ |
||
| 137 | Calculates the line energies to fit |
||
| 138 | """ |
||
| 139 | # Incident Energy used in the experiment |
||
| 140 | # Energy range to use for fitting |
||
| 141 | pileup_cut_off = paramdict["FitParams"]["pileup_cutoff_keV"] |
||
| 142 | include_pileup = paramdict["FitParams"]["include_pileup"] |
||
| 143 | include_escape = paramdict["FitParams"]["include_escape"] |
||
| 144 | fitting_range = paramdict["FitParams"]["fitted_energy_range_keV"] |
||
| 145 | # x = paramdict["FitParams"]["mca_energies_used"] |
||
| 146 | energy = paramdict["Experiment"]["incident_energy_keV"] |
||
| 147 | detectortype = 'Vortex_SDD_Xspress' |
||
| 148 | fitelements = paramdict["Experiment"]["elements"] |
||
| 149 | peakpos = [] |
||
| 150 | escape_peaks = [] |
||
| 151 | for _j, el in enumerate(fitelements): |
||
| 152 | z = xl.SymbolToAtomicNumber(str(el)) |
||
| 153 | for i, shell in enumerate(shells): |
||
| 154 | if(xl.EdgeEnergy(z, shell) < energy - 0.5): |
||
| 155 | linepos = 0.0 |
||
| 156 | count = 0.0 |
||
| 157 | for line in transitions[i]: |
||
| 158 | en = xl.LineEnergy(z, line) |
||
| 159 | if(en > 0.0): |
||
| 160 | linepos += en |
||
| 161 | count += 1.0 |
||
| 162 | if(count == 0.0): |
||
| 163 | break |
||
| 164 | linepos = linepos // count |
||
| 165 | if(linepos > fitting_range[0] and |
||
| 166 | linepos < fitting_range[1]): |
||
| 167 | peakpos.append(linepos) |
||
| 168 | peakpos = np.array(peakpos) |
||
| 169 | too_low = set(list(peakpos[peakpos > fitting_range[0]])) |
||
| 170 | too_high = set(list(peakpos[peakpos < fitting_range[1]])) |
||
| 171 | bar = list(too_low and too_high) |
||
| 172 | bar = np.unique(bar) |
||
| 173 | peakpos = list(bar) |
||
| 174 | peaks = [] |
||
| 175 | peaks.extend(peakpos) |
||
| 176 | if(include_escape): |
||
| 177 | for i in range(len(peakpos)): |
||
| 178 | escape_energy = calc_escape_energy(peakpos[i], detectortype)[0] |
||
|
|
|||
| 179 | if (escape_energy > fitting_range[0]): |
||
| 180 | if (escape_energy < fitting_range[1]): |
||
| 181 | escape_peaks.extend([escape_energy]) |
||
| 182 | # print escape_peaks |
||
| 183 | peaks.extend(escape_peaks) |
||
| 184 | |||
| 185 | if(include_pileup): # applies just to the fluorescence lines |
||
| 186 | pileup_peaks = [] |
||
| 187 | peakpos1 = np.array(peakpos) |
||
| 188 | peakpos_high = peakpos1[peakpos1 > pileup_cut_off] |
||
| 189 | peakpos_high = list(peakpos_high) |
||
| 190 | for i in range(len(peakpos_high)): |
||
| 191 | foo = [peakpos_high[i] + x for x in peakpos_high[i:]] |
||
| 192 | foo = np.array(foo) |
||
| 193 | pileup_peaks.extend(foo) |
||
| 194 | pileup_peaks = np.unique(sorted(pileup_peaks)) |
||
| 195 | peaks.extend(pileup_peaks) |
||
| 196 | peakpos = peaks |
||
| 197 | peakpos = np.array(peakpos) |
||
| 198 | too_low = set(list(peakpos[peakpos > fitting_range[0]])) |
||
| 199 | too_high = set(list(peakpos[peakpos < fitting_range[1] - 0.5])) |
||
| 200 | bar = list(too_low and too_high) |
||
| 201 | bar = np.unique(bar) |
||
| 202 | peakpos = list(bar) |
||
| 203 | peakpos = np.unique(peakpos) |
||
| 204 | # print peakpos |
||
| 205 | return peakpos |
||
| 206 | |||
| 207 | View Code Duplication | def getAreas(self, fun, x, positions, fitmatrix): |
|
| 208 | rest = fitmatrix |
||
| 209 | numargsinp = self.getFitFunctionNumArgs(str(fun.__name__)) # 2 in |
||
| 210 | npts = len(fitmatrix) // numargsinp |
||
| 211 | weights = rest[:npts] |
||
| 212 | widths = rest[npts:2*npts] |
||
| 213 | areas = [] |
||
| 214 | for ii in range(len(weights)): |
||
| 215 | areas.append(np.sum(fun(weights[ii], |
||
| 216 | widths[ii], |
||
| 217 | x, |
||
| 218 | positions[ii], |
||
| 219 | ))) |
||
| 220 | return weights, widths, np.array(areas) |
||
| 221 | |||
| 222 | def getFitFunctionNumArgs(self,key): |
||
| 223 | self.lookup = { |
||
| 224 | "lorentzian": 2, |
||
| 225 | "gaussian": 2 |
||
| 226 | } |
||
| 227 | return self.lookup[key] |
||
| 228 | |||
| 229 | def get_max_frames(self): |
||
| 230 | return 'single' |
||
| 231 | |||
| 232 | def nOutput_datasets(self): |
||
| 233 | return 4 |
||
| 234 | |||
| 235 | def getFitFunction(self,key): |
||
| 236 | self.lookup = { |
||
| 237 | "lorentzian": lorentzian, |
||
| 238 | "gaussian": gaussian |
||
| 239 | } |
||
| 240 | return self.lookup[key] |
||
| 241 | |||
| 242 | def _resid(self, p, fun, y, x, pos): |
||
| 243 | r = y-self._spectrum_sum(fun, x, pos, *p) |
||
| 244 | |||
| 245 | return r |
||
| 246 | |||
| 247 | View Code Duplication | def dfunc(self, p, fun, y, x, pos): |
|
| 248 | if fun.__name__ == 'gaussian' or fun.__name__ == 'lorentzian': # took the lorentzian out. Weird |
||
| 249 | rest = p |
||
| 250 | npts = len(p) // 2 |
||
| 251 | a = rest[:npts] |
||
| 252 | sig = rest[npts:2*npts] |
||
| 253 | mu = pos |
||
| 254 | if fun.__name__ == 'gaussian': |
||
| 255 | da = self.spectrum_sum_dfun(fun, 1./a, x, mu, *p) |
||
| 256 | dsig_mult = np.zeros((npts, len(x))) |
||
| 257 | for i in range(npts): |
||
| 258 | dsig_mult[i] = ((x-mu[i])**2) / sig[i]**3 |
||
| 259 | dsig = self.spectrum_sum_dfun(fun, dsig_mult, x, mu, *p) |
||
| 260 | op = np.concatenate([-da, -dsig]) |
||
| 261 | elif fun.__name__ == 'lorentzian': |
||
| 262 | da = self.spectrum_sum_dfun(fun, 1./a, x, mu, *p) |
||
| 263 | dsig = np.zeros((npts, len(x))) |
||
| 264 | for i in range(npts): |
||
| 265 | nom = 8 * a[i] * sig[i] * (x - mu[i]) ** 2 |
||
| 266 | denom = (sig[i]**2 + 4.0 * (x - mu[i])**2)**2 |
||
| 267 | dsig[i] = nom / denom |
||
| 268 | op = np.concatenate([-da, -dsig]) |
||
| 269 | else: |
||
| 270 | op = None |
||
| 271 | return op |
||
| 272 | |||
| 273 | def _spectrum_sum(self, fun, x, positions, *p): |
||
| 274 | rest = np.abs(p) |
||
| 275 | npts = len(p) // 2 |
||
| 276 | weights = rest[:npts] |
||
| 277 | widths = rest[npts:2*npts] |
||
| 278 | spec = np.zeros((len(x),)) |
||
| 279 | for ii in range(len(weights)): |
||
| 280 | spec += fun(weights[ii], widths[ii], x, positions[ii]) |
||
| 281 | return spec |
||
| 282 | |||
| 283 | View Code Duplication | def spectrum_sum_dfun(self, fun, multiplier, x, pos, *p): |
|
| 284 | rest = p |
||
| 285 | npts = len(p) // 2 |
||
| 286 | weights = rest[:npts] |
||
| 287 | widths = rest[npts:2*npts] |
||
| 288 | positions = pos |
||
| 289 | # print(len(positions)) |
||
| 290 | spec = np.zeros((npts, len(x))) |
||
| 291 | #print "len x is "+str(len(spec)) |
||
| 292 | # print len(spec), type(spec) |
||
| 293 | # print len(positions), type(positions) |
||
| 294 | # print len(weights), type(weights) |
||
| 295 | for ii in range(len(weights)): |
||
| 296 | spec[ii] = multiplier[ii]*fun(weights[ii], |
||
| 297 | widths[ii], |
||
| 298 | x, positions[ii]) |
||
| 299 | return spec |
||
| 300 | |||
| 301 | def lorentzian(a, w, x, c): |
||
| 302 | y = a / (1.0 + (2.0 * (c - x) / w) ** 2) |
||
| 303 | return y |
||
| 304 | |||
| 305 | |||
| 306 | def gaussian(a, w, x, c): |
||
| 307 | return pe.gaussian(x, a, c, w) |
||
| 308 | |||
| 310 |