lorentzian()   rating A, last analyzed

Complexity:   1 condition (cyclomatic complexity 1)
Size:         3 total lines, 3 code lines
Duplication:  0 lines (0 %)
Importance:   0 changes

Raw metrics:  cc 1, eloc 3, nop 4, dl 0, loc 3, rs 10, c 0, b 0, f 0
# Copyright 2014 Diamond Light Source Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
.. module:: base_fluo_fitter
   :platform: Unix
   :synopsis: a base fitting plugin

.. moduleauthor:: Aaron Parsons <[email protected]>

"""

import logging
from savu.plugins.plugin import Plugin
from savu.plugins.driver.cpu_plugin import CpuPlugin
import peakutils as pe
import numpy as np
import xraylib as xl
from flupy.algorithms.xrf_calculations.transitions_and_shells import \
    shells, transitions
# explicit import (rather than the original star import) so the one name used
# from the escape module in this file is visible to readers and static analysis
from flupy.algorithms.xrf_calculations.escape import calc_escape_energy
from flupy.xrf_data_handling import XRFDataset
from copy import deepcopy


class BaseFluoFitter(Plugin, CpuPlugin):
    def __init__(self, name="BaseFluoFitter"):
        super(BaseFluoFitter, self).__init__(name)

    def base_pre_process(self):
        in_meta_data = self.get_in_meta_data()[0]
        try:
            _foo = in_meta_data.get("PeakIndex")[0]
            logging.debug('Using the positions in the peak index')
        except KeyError:
            logging.debug("No Peak Index in the metadata")
            logging.debug("Calculating the positions from energy")
#             idx = self.setPositions(in_meta_data)
            logging.debug("The index is"+str(self.idx))
            in_meta_data.set('PeakIndex', self.idx)
            in_meta_data.set('PeakEnergy', self.axis[self.idx])

    def setup(self):
        # set up the output datasets that are created by the plugin
        logging.debug('setting up the fluorescence fitting')
        in_dataset, out_datasets = self.get_datasets()
        in_pData, out_pData = self.get_plugin_datasets()
        in_meta_data = in_dataset[0].meta_data

        shape = in_dataset[0].get_shape()
        in_pData[0].plugin_data_setup('SPECTRUM', self.get_max_frames())

        axis_labels = ['-1.PeakIndex.pixel.unit']
        pattern_list = ['SINOGRAM', 'PROJECTION']

        fitAreas = out_datasets[0]
        fitWidths = out_datasets[1]
        fitHeights = out_datasets[2]
        self.length = shape[-1]
        idx = self.setPositions(in_meta_data)
        logging.debug("in the setup the index is"+str(idx))
        numpeaks = len(idx)
        new_shape = shape[:-1] + (numpeaks,)

        channel = {'core_dims': (-1,), 'slice_dims': list(range(len(shape)-1))}
        fitAreas.create_dataset(patterns={in_dataset[0]: pattern_list},
                                axis_labels={in_dataset[0]: axis_labels},
                                shape=new_shape)
        fitAreas.add_pattern("CHANNEL", **channel)
        out_pData[0].plugin_data_setup('CHANNEL', self.get_max_frames())

        fitWidths.create_dataset(patterns={in_dataset[0]: pattern_list},
                                 axis_labels={in_dataset[0]: axis_labels},
                                 shape=new_shape)
        fitWidths.add_pattern("CHANNEL", **channel)
        out_pData[1].plugin_data_setup('CHANNEL', self.get_max_frames())

        fitHeights.create_dataset(patterns={in_dataset[0]: pattern_list},
                                  axis_labels={in_dataset[0]: axis_labels},
                                  shape=new_shape)
        fitHeights.add_pattern("CHANNEL", **channel)
        out_pData[2].plugin_data_setup('CHANNEL', self.get_max_frames())

        residuals = out_datasets[3]
        residuals.create_dataset(in_dataset[0])
        residuals.set_shape(shape[:-1]+(len(self.axis),))
        out_pData[3].plugin_data_setup('SPECTRUM', self.get_max_frames())

        for i in range(len(out_datasets)):
            out_datasets[i].meta_data = deepcopy(in_meta_data)
            mData = out_datasets[i].meta_data
            mData.set("PeakEnergy", self.axis[self.idx])
            mData.set('PeakIndex', self.idx)

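The three fitted-parameter datasets created above replace the spectrum axis with one entry per fitted peak, while the residuals dataset keeps the full spectrum length. A quick illustration of the shape arithmetic, with purely invented dimensions (assuming the energy axis has the same length as the spectrum axis):

    shape = (91, 120, 2048)               # hypothetical (rows, columns, spectrum channels)
    numpeaks = 7                          # hypothetical number of fitted lines
    new_shape = shape[:-1] + (numpeaks,)  # (91, 120, 7) for areas, widths and heights
    residual_shape = shape[:-1] + (2048,) # residuals keep the spectrum axis length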
    def setPositions(self, in_meta_data):
        paramdict = XRFDataset().paramdict
        paramdict["FitParams"]["pileup_cutoff_keV"] = \
            self.parameters["pileup_cutoff_keV"]
        paramdict["FitParams"]["include_pileup"] = \
            self.parameters["include_pileup"]
        paramdict["FitParams"]["include_escape"] = \
            self.parameters["include_escape"]
        paramdict["FitParams"]["fitted_energy_range_keV"] = \
            self.parameters["fitted_energy_range_keV"]
        if self.parameters['mono_energy'] is None:
            paramdict["Experiment"]["incident_energy_keV"] = \
                in_meta_data.get("mono_energy")
        else:
            paramdict["Experiment"]["incident_energy_keV"] = \
                self.parameters['mono_energy']
        paramdict["Experiment"]["elements"] = \
            self.parameters["elements"]
        engy = self.findLines(paramdict)
        # make it an index since this is what find peak will also give us
#         print 'basefluo meta is:'+str(in_meta_data.get_dictionary().keys())
        axis = self.axis = in_meta_data.get("energy")
        dq = axis[1]-axis[0]
        logging.debug("the peak energies are:"+str(engy))
        logging.debug("the offset is"+str(axis[0]))
        self.idx = np.round((engy-axis[0])/dq).astype(int)

        return self.idx

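setPositions assumes a uniformly spaced energy axis and rounds each calculated line energy to the nearest channel index. A standalone sketch of that rounding, with an invented axis and a couple of representative K-alpha energies:

    import numpy as np

    axis = np.arange(0.0, 20.0, 0.01)   # hypothetical energy axis, 10 eV channels (keV)
    engy = np.array([6.40, 7.48])       # roughly Fe and Ni K-alpha energies (keV)
    dq = axis[1] - axis[0]
    idx = np.round((engy - axis[0]) / dq).astype(int)
    # idx -> array([640, 748]); axis[idx] recovers each energy to within half a channel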
    def findLines(self, paramdict=XRFDataset().paramdict):
        """
        Calculates the line energies to fit
        """
        # Incident energy used in the experiment
        # Energy range to use for fitting
        pileup_cut_off = paramdict["FitParams"]["pileup_cutoff_keV"]
        include_pileup = paramdict["FitParams"]["include_pileup"]
        include_escape = paramdict["FitParams"]["include_escape"]
        fitting_range = paramdict["FitParams"]["fitted_energy_range_keV"]
#         x = paramdict["FitParams"]["mca_energies_used"]
        energy = paramdict["Experiment"]["incident_energy_keV"]
        detectortype = 'Vortex_SDD_Xspress'
        fitelements = paramdict["Experiment"]["elements"]
        peakpos = []
        escape_peaks = []
        for _j, el in enumerate(fitelements):
            z = xl.SymbolToAtomicNumber(str(el))
            for i, shell in enumerate(shells):
                if(xl.EdgeEnergy(z, shell) < energy - 0.5):
                    linepos = 0.0
                    count = 0.0
                    for line in transitions[i]:
                        en = xl.LineEnergy(z, line)
                        if(en > 0.0):
                            linepos += en
                            count += 1.0
                    if(count == 0.0):
                        break
                    # mean line energy for this shell (true division; floor
                    # division would round the energy down to whole keV)
                    linepos = linepos / count
                    if(linepos > fitting_range[0] and
                            linepos < fitting_range[1]):
                        peakpos.append(linepos)
        peakpos = np.array(peakpos)
        too_low = set(list(peakpos[peakpos > fitting_range[0]]))
        too_high = set(list(peakpos[peakpos < fitting_range[1]]))
        # set intersection keeps only the peaks inside the fitting range;
        # `too_low and too_high` would simply return the second set
        bar = list(too_low & too_high)
        bar = np.unique(bar)
        peakpos = list(bar)
        peaks = []
        peaks.extend(peakpos)
        if(include_escape):
            for i in range(len(peakpos)):
                escape_energy = calc_escape_energy(peakpos[i], detectortype)[0]
                if (escape_energy > fitting_range[0]):
                    if (escape_energy < fitting_range[1]):
                        escape_peaks.extend([escape_energy])
    #         print escape_peaks
            peaks.extend(escape_peaks)

        if(include_pileup):  # applies just to the fluorescence lines
            pileup_peaks = []
            peakpos1 = np.array(peakpos)
            peakpos_high = peakpos1[peakpos1 > pileup_cut_off]
            peakpos_high = list(peakpos_high)
            for i in range(len(peakpos_high)):
                foo = [peakpos_high[i] + x for x in peakpos_high[i:]]
                foo = np.array(foo)
                pileup_peaks.extend(foo)
            pileup_peaks = np.unique(sorted(pileup_peaks))
            peaks.extend(pileup_peaks)
        # final pass: restrict everything, escape and pile-up peaks included,
        # to the fitted energy range with a 0.5 keV margin at the top end
        peakpos = peaks
        peakpos = np.array(peakpos)
        too_low = set(list(peakpos[peakpos > fitting_range[0]]))
        too_high = set(list(peakpos[peakpos < fitting_range[1] - 0.5]))
        bar = list(too_low & too_high)
        bar = np.unique(bar)
        peakpos = list(bar)
        peakpos = np.unique(peakpos)
#         print peakpos
        return peakpos

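The pile-up block above forms every pairwise sum (including a line summed with itself) of the lines that sit above the cut-off energy. A standalone illustration of that construction, with invented line energies:

    import numpy as np

    peakpos_high = [6.40, 7.06, 7.48]   # hypothetical line energies above the cut-off (keV)
    pileup_peaks = []
    for i in range(len(peakpos_high)):
        pileup_peaks.extend([peakpos_high[i] + x for x in peakpos_high[i:]])
    pileup_peaks = np.unique(sorted(pileup_peaks))
    # -> approximately [12.80, 13.46, 13.88, 14.12, 14.54, 14.96]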
    def getAreas(self, fun, x, positions, fitmatrix):
        rest = fitmatrix
        # 2 parameters per peak for both lorentzian and gaussian: weight and width
        numargsinp = self.getFitFunctionNumArgs(str(fun.__name__))
        npts = len(fitmatrix) // numargsinp
        weights = rest[:npts]
        widths = rest[npts:2*npts]
        areas = []
        for ii in range(len(weights)):
            areas.append(np.sum(fun(weights[ii],
                                    widths[ii],
                                    x,
                                    positions[ii],
                                    )))
        return weights, widths, np.array(areas)

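The helpers below all assume the flat fit-parameter vector is laid out as all of the peak weights followed by all of the peak widths, with the peak centres passed separately. For example, with invented values:

    import numpy as np

    # hypothetical three-peak fit: p = [a1, a2, a3, w1, w2, w3]
    p = np.array([10.0, 5.0, 2.0, 0.10, 0.12, 0.15])
    npts = len(p) // 2
    weights, widths = p[:npts], p[npts:2 * npts]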
    def getFitFunctionNumArgs(self, key):
        self.lookup = {
                       "lorentzian": 2,
                       "gaussian": 2
                       }
        return self.lookup[key]

    def get_max_frames(self):
        return 'single'

    def nOutput_datasets(self):
        return 4

    def getFitFunction(self, key):
        self.lookup = {
                       "lorentzian": lorentzian,
                       "gaussian": gaussian
                       }
        return self.lookup[key]

    def _resid(self, p, fun, y, x, pos):
        r = y - self._spectrum_sum(fun, x, pos, *p)
        return r

    def dfunc(self, p, fun, y, x, pos):
        op = None
        if fun.__name__ == 'gaussian' or fun.__name__ == 'lorentzian':  # took the lorentzian out. Weird
            rest = p
            npts = len(p) // 2
            a = rest[:npts]
            sig = rest[npts:2*npts]
            mu = pos
            if fun.__name__ == 'gaussian':
                da = self.spectrum_sum_dfun(fun, 1./a, x, mu, *p)
                dsig_mult = np.zeros((npts, len(x)))
                for i in range(npts):
                    dsig_mult[i] = ((x-mu[i])**2) / sig[i]**3
                dsig = self.spectrum_sum_dfun(fun, dsig_mult, x, mu, *p)
                op = np.concatenate([-da, -dsig])
            elif fun.__name__ == 'lorentzian':
                da = self.spectrum_sum_dfun(fun, 1./a, x, mu, *p)
                dsig = np.zeros((npts, len(x)))
                for i in range(npts):
                    nom = 8 * a[i] * sig[i] * (x - mu[i]) ** 2
                    denom = (sig[i]**2 + 4.0 * (x - mu[i])**2)**2
                    dsig[i] = nom / denom
                op = np.concatenate([-da, -dsig])
        return op

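_resid and dfunc share the (p, *args) signature used by leastsq-style optimisers: _resid returns the residual vector and dfunc the analytic Jacobian with one row per parameter. The concrete fitter plugins that drive them are not shown on this page; a hedged sketch of the intended use from inside such a plugin, assuming scipy.optimize.leastsq and with y, x, positions and p0 standing in for the frame data and starting guess:

    from scipy.optimize import leastsq

    # p0 laid out as [weights..., widths...]; positions come from setPositions()
    p_fit, success = leastsq(self._resid, p0,
                             args=(lorentzian, y, x, positions),
                             Dfun=self.dfunc, col_deriv=1)
    weights, widths, areas = self.getAreas(lorentzian, x, positions, p_fit)
    # col_deriv=1 because dfunc returns the transposed Jacobian, shape (2*npts, len(x))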
    def _spectrum_sum(self, fun, x, positions, *p):
        rest = np.abs(p)
        npts = len(p) // 2
        weights = rest[:npts]
        widths = rest[npts:2*npts]
        spec = np.zeros((len(x),))
        for ii in range(len(weights)):
            spec += fun(weights[ii], widths[ii], x, positions[ii])
        return spec

    def spectrum_sum_dfun(self, fun, multiplier, x, pos, *p):
        rest = p
        npts = len(p) // 2
        weights = rest[:npts]
        widths = rest[npts:2*npts]
        positions = pos
    #    print(len(positions))
        spec = np.zeros((npts, len(x)))
        #print "len x is "+str(len(spec))
    #    print len(spec), type(spec)
    #    print len(positions), type(positions)
    #    print len(weights), type(weights)
        for ii in range(len(weights)):
            spec[ii] = multiplier[ii]*fun(weights[ii],
                                          widths[ii],
                                          x, positions[ii])
        return spec


def lorentzian(a, w, x, c):
    # a is the peak height, w the full width at half maximum and c the centre
    y = a / (1.0 + (2.0 * (c - x) / w) ** 2)
    return y


def gaussian(a, w, x, c):
    # peakutils.gaussian(x, ampl, center, dev): a is the amplitude, c the centre
    # and w the standard deviation
    return pe.gaussian(x, a, c, w)
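As a standalone sanity check on the lorentzian convention above (a as the peak height, w as the full width at half maximum), independent of the Savu framework and with invented numbers:

    import numpy as np

    def lorentzian(a, w, x, c):
        return a / (1.0 + (2.0 * (c - x) / w) ** 2)

    x = np.linspace(0.0, 20.0, 20001)              # invented energy axis (keV)
    peak = lorentzian(10.0, 0.2, x, 6.4)           # height 10, FWHM 0.2 keV, centred at 6.4 keV
    print(peak.max())                              # ~10.0, at x = 6.4
    print(lorentzian(10.0, 0.2, 6.4 + 0.1, 6.4))   # 5.0: half the height, half a FWHM away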