patm.modeling.regularization.trajectory   F
last analyzed

Complexity

Total Complexity 80

Size/Duplication

Total Lines 321
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 221
dl 0
loc 321
rs 2
c 0
b 0
f 0
wmc 80

43 Methods

Rating   Name   Duplication   Size   Complexity  
A TrajectoryBuilder._interpolate() 0 6 1
A TrajectoryBuilder.steady_new() 0 4 1
A TrajectoryBuilder.create_tau_trajectory() 0 3 1
A IterationChunks.__init__() 0 10 5
A IterSingle.right() 0 3 1
A IterationChunks.__getitem__() 0 2 1
A IterSingle.span() 0 3 1
A ParameterTrajectory.__str__() 0 2 1
A IterationChunks.__str__() 0 2 1
C IterationChunks._overlap() 0 19 9
A IterChunk.__eq__() 0 4 2
A TrajectoryBuilder.steady_prev() 0 4 1
A IterationChunks._check_end() 0 3 2
A IterationChunks._cand_left_smaller_than_ref_left() 0 4 2
A TrajectoryBuilder.interpolate_to() 0 13 2
A TrajectoryBuilder.__init__() 0 4 1
A IterationChunks.__len__() 0 2 1
A IterationChunks.common_chunks() 0 7 2
A IterSingle.__init__() 0 2 1
A IterSingle.left() 0 3 1
A TrajectoryBuilder.name() 0 3 1
A IterationChunks.__eq__() 0 7 4
A ParameterTrajectory.__getattr__() 0 5 3
A IterChunk.span() 0 3 1
A IterChunk.__ne__() 0 2 1
A IterationChunks._cand_left_bigger_than_ref_left() 0 4 2
A ParameterTrajectory.__len__() 0 2 1
A IterationChunks.to_training_chunks() 0 15 3
A ParameterTrajectory.__getitem__() 0 2 1
A IterationChunks.__iter__() 0 3 2
A IterDuo.__str__() 0 2 1
A ParameterTrajectory.__iter__() 0 3 2
A IterChunk.right() 0 3 1
A TrajectoryBuilder.deactivate() 0 4 1
A TrajectoryBuilder.create() 0 2 1
A IterationChunks._insert_iterations() 0 6 2
A IterChunk.__init__() 0 2 1
A TrajectoryBuilder.begin_trajectory() 0 4 1
A IterationChunks.__ne__() 0 2 1
A IterChunk.__str__() 0 2 1
A IterChunk.left() 0 3 1
A IterationChunks._cand_left_equal_to_ref_left() 0 3 1
A IterDuo.__init__() 0 5 1

3 Functions

Rating   Name   Duplication   Size   Complexity  
A _group_iterations() 0 18 3
A get_fit_iteration_chunks() 0 10 3
A _steady_iteration_ranges() 0 19 3

How to fix   Complexity   

Complexity

Complex classes like patm.modeling.regularization.trajectory often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
import numpy as np
2
from functools import reduce
3
import attr
4
5
6
def _group_iterations(self):
7
    """
8
    This [0, 0, 0, 0, 1, 1, 0.8, 0.6, 0.4, 0.2, 0.2, 0.1] leads to this [4, 2, 1, 1, 1, 2, 1].\n
9
    :return: steady_chunks of fit calls that have to be made to satisfy the dynamic change of the parameter value
10
    :rtype: list
11
    """
12
    iters = []
13
    pv = self._values[0]
14
    to_put = 1
15
    for cv in self._values[1:]:
16
        if cv == pv:
17
            to_put += 1
18
        else:
19
            iters.append(to_put)
20
            to_put = 1
21
        pv = cv
22
    iters.append(to_put)
23
    return iters
24
25
26
def _steady_iteration_ranges(iterations_groups):
27
    """
28
    This [5, 2, 1, 1, 1, 1] leads to this [[1,5], [6,7]]\n
29
    This [5, 2, 1, 1, 4, 1] leads to this [[1,5], [6,7], [10,13]].\n
30
    :param list iterations_groups:
31
    :return:
32
    :rtype: list indeces start from 1 (not 0) !
33
    """
34
    res = []
35
    accumulated_iters = 1
36
    for iter_chunk in iterations_groups:
37
        left_iter_count = accumulated_iters
38
        if iter_chunk > 1:
39
            right_iter_count = left_iter_count + iter_chunk - 1
40
            res.append([left_iter_count, right_iter_count])
41
            accumulated_iters += iter_chunk
42
        else:
43
            accumulated_iters += 1
44
    return res
45
46
47
@attr.s
48
class ParameterTrajectory(object):
49
    """
50
    This class encapsulates a parameter's value trajectory. This is basically the value the parameter shall have in
51
    every consecutive train iteration (pass through the whole collection.\n
52
    For example we may want to deactivate a regularizer for the first 5 iterations and then activating it while downgrading
53
     its coefficient from 1 to 0.2 over another 4 iterations. Then we need the tau value to follow trajectory:\n
54
      [0, 0, 0, 0, 0, 1, 0.8, 0.6, 0.4, 0.2]. \nThis class encapsulates this bevaviour.
55
    """
56
    _param = attr.ib(init=True, converter=str, repr=True)
57
    _values = attr.ib(init=True, converter=list, repr=True)
58
    group_iterations = attr.ib(init=True, default=attr.Factory(lambda self: _group_iterations(self), takes_self=True), repr=True)
59
    steady_chunks = attr.ib(init=False, default=attr.Factory(lambda self: IterationChunks([IterDuo(x) for x in _steady_iteration_ranges(self.group_iterations)]), takes_self=True))
60
61
    def __getattr__(self, item):
62
        if item == self._param:
63
            return self._values
64
        elif item == 'last_' + self._param:
65
            return self._values[-1]
66
67
    def __len__(self):
68
        return len(self._values)
69
70
    def __str__(self):
71
        return str(self._values)
72
73
    def __getitem__(self, item):
74
        return self._values[item]
75
76
    def __iter__(self):
77
        for v in self._values:
78
            yield v
79
80
81
class IterationChunks(object):
82
    def __init__(self, chunks_ref_list):
83
        if chunks_ref_list and (type(chunks_ref_list[0]) == list or chunks_ref_list[0] == 1):
84
            self.chunks = [IterSingle() if x == 1 else IterDuo(x) for x in chunks_ref_list]
85
        else:
86
            self.chunks = chunks_ref_list
87
        self._res = []
88
        self._ref = []
89
        self._toput_left = 0
90
        self._cond = None
91
        self._done = False
92
93
    def to_training_chunks(self, collection_passes):
94
        if not self.chunks:
95
            return IterationChunks([IterSingle() for _ in range(collection_passes)])
96
        covered = 0
97
        res = []
98
        for ind, ch in enumerate(self.chunks):
99
            to_add = [IterSingle() for _ in range(ch.left-covered-1)]
100
            res.extend(to_add)
101
            res.append(ch)
102
            covered = ch.right
103
        to_add = [IterSingle() for _ in range(collection_passes-covered)]
104
        res.extend(to_add)
105
        covered += len(to_add)
106
        assert covered == collection_passes
107
        return IterationChunks(res)
108
109
    def __str__(self):
110
        return '[{}]'.format(', '.join((str(_) for _ in self.chunks)))
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable _ does not seem to be defined.
Loading history...
111
        # return '[{}]'.format(', '.join(map(lambda x: '[{},{}]'.format(x.left, x.right), self.chunks)))
112
113
    def __len__(self):
114
        return len(self.chunks)
115
116
    def __getitem__(self, item):
117
        return self.chunks[item]
118
119
    def __iter__(self):
120
        for v in self.chunks:
121
            yield v
122
123
    def __eq__(self, other):
124
        if len(other) != len(self.chunks):
125
            return False
126
        for ext_el, self_el in zip(other, self.chunks):
127
            if ext_el != self_el:
128
                return False
129
        return True
130
131
    def __ne__(self, other):
132
        return not self.__eq__(other)
133
134
    def _overlap(self, duo, duo_list):
135
        self._res = []
136
        self._ref = duo
137
        self._done = False
138
        ind = 0
139
        ll = [_f for _f in [x if x.right > self._ref.left else None for x in duo_list] if _f]
140
        while not self._done and ind < len(ll):
141
            cand = ll[ind]
142
            if self._cand_left_smaller_than_ref_left(cand) and self._cond(cand) or self._cand_left_equal_to_ref_left(cand):
143
                self._insert_iterations(cand)
144
            elif self._cand_left_bigger_than_ref_left(cand):
145
                if self._cond(cand):
146
                    self._insert_iterations(cand)
147
                else:
148
                    self._done = True
149
            else:
150
                raise NoneConditionSatisfiedException("Candidate/external tuple {} to consider against the 'ref' tuple {}, was not found to be neither small, nor equal, nor bigger than 'ref'".format(cand, self._ref))
151
            ind += 1
152
        return self._res
153
154
    def _check_end(self, cand):
155
        if cand.left == self._ref.right - 1:
156
            self._done = True
157
158
    def _cand_left_smaller_than_ref_left(self, cand):
159
        self._cond = lambda x: self._ref.left < x.right  # since cand.left < ref_left then if also ref.left < cand.right then there is overlap
160
        self._toput_left = self._ref.left
161
        return cand.left < self._ref.left
162
163
    def _cand_left_equal_to_ref_left(self, cand): # since ref.left = cand.left, there is overlap by default. no need for secondary condition
164
        self._toput_left = self._ref.left
165
        return cand.left == self._ref.left
166
167
    def _cand_left_bigger_than_ref_left(self, cand):
168
        self._cond = lambda x: x.left < self._ref.right  # since ref_left < cand.left then if also can.left < ref.right then there is overlap
169
        self._toput_left = cand.left
170
        return self._ref.left < cand.left
171
172
    def _insert_iterations(self, cand):
173
        if self._ref.right < cand.right:
174
            self._res.append([self._toput_left, self._ref.right])
175
        else:
176
            self._res.append([self._toput_left, cand.right])
177
        self._check_end(cand)
178
179
    def common_chunks(self, chunks):
180
        """
181
        :param IterationChunks chunks:
182
        :return:
183
        :rtype: IterationChunks
184
        """
185
        return IterationChunks([IterDuo(item) for sublist in map(lambda x: self._overlap(x, chunks), self.chunks) for item in sublist])
186
187
class NoneConditionSatisfiedException(Exception): pass
188
189
190
class IterChunk(object):
191
    def __init__(self, data):
192
        self._data = data
193
194
    @property
195
    def left(self):
196
        return self._data[0]
197
    @property
198
    def right(self):
199
        return self._data[1]
200
    @property
201
    def span(self):
202
        return self._data[1] - self._data[0] + 1
203
204
    def __str__(self):
205
        return str(self._data)
206
207
    def __eq__(self, other):
208
        if self._data == other:
209
            return True
210
        return False
211
212
    def __ne__(self, other):
213
        return not self.__eq__(other)
214
215
216
class IterDuo(IterChunk):
217
    def __init__(self, iters_tuple):
218
        super(IterDuo, self).__init__(iters_tuple)
219
        assert len(self._data) == 2
220
        assert self._data[0] < self._data[1]
221
        assert type(self._data[0]) == type(self._data[1]) == int
222
223
    def __str__(self):
224
        return '[{}, {}]'.format(self._data[0], self._data[1])
225
226
class IterSingle(IterChunk):
227
    def __init__(self):
228
        super(IterSingle, self).__init__(1)
229
    @property
230
    def left(self):
231
        return self._data
232
    @property
233
    def right(self):
234
        return self._data
235
    @property
236
    def span(self):
237
        return self._data
238
239
240
class TrajectoryBuilder(object):
241
    _interpolation_kind2interpolant = {'linear': {'preprocess': lambda x: (x[2], x[0], x[1]),
242
                                                 'process': lambda x: list(np.interp(*x))},
243
                                      'quadratic': {'preprocess': lambda x: (np.polyfit(x[0], x[1], 2), x[2]),
244
                                                    'process': lambda x: np.polyval(*x)},
245
                                      'cubic': {'preprocess': lambda x: (np.polyfit(x[0], x[1], 3), x[2]),
246
                                                 'process': lambda x: np.polyval(*x)}}
247
248
    def __init__(self):
249
        self._values = []
250
        self._name = ''
251
        self._interpolant = None
252
253
    @property
254
    def name(self):
255
        return self._name
256
257
    def steady_prev(self, iters):
258
        """Use regularizer using the latest tau used and keeping it constant for 'iters' train cycles through the collection"""
259
        self._values.extend([self._values[-1]]*iters)
260
        return self
261
262
    def steady_new(self, iters, value):
263
        """Use regularizer with a constant tau for 'iters' train cycles through the collection"""
264
        self._values.extend([value]*iters)
265
        return self
266
267
    def deactivate(self, iters):
268
        """Keep regularizer inactive for 'iters' train cycles through the collection"""
269
        self._values.extend([0]*iters)
270
        return self
271
272
    def interpolate_to(self, iters, value, interpolation='linear', start=None):
273
        """Use regularizer with tau gradually increasing or descreasing up to a value. Interpolates from the latest value
274
        used, (or from 'start' value if specified) to the specified 'value' using 'iters' steps. Each step is a train
275
        cycle through the collection\n
276
        Supports linear interpolation"""
277
        assert iters > 1
278
        prev_iter = len(self._values)
279
        iter_inds = range(prev_iter, prev_iter + iters)
280
        if start is None:
281
            start = self._values[-1]
282
            iter_inds = range(prev_iter + 1, prev_iter + iters + 1)
283
        self._interpolate(prev_iter, iter_inds, start, value, interpolation=interpolation)
284
        return self
285
286
    def _interpolate(self, prev_iter, iter_inds, start_y, end_y, interpolation='linear'):
287
        xs = [prev_iter, iter_inds[-1]]
288
        ys = [start_y, end_y]
289
        prods = self._interpolation_kind2interpolant[interpolation]['preprocess']((xs, ys, iter_inds))
290
        vals = self._interpolation_kind2interpolant[interpolation]['process'](prods)
291
        self._values.extend(vals)
292
293
    def begin_trajectory(self, name):
294
        self._name = name
295
        self._values = []
296
        return self
297
298
    def create(self):
299
        return ParameterTrajectory(self._name, self._values)
300
301
    def create_tau_trajectory(self, values_list):
302
        """Typical factory method"""
303
        return ParameterTrajectory('tau', values_list)
304
305
def get_fit_iteration_chunks(parameter_trajectories):
306
    """
307
    Given a list of parameter trajectories along the iteration count "dimension", returns a list of iterations tuples/steady_chunks.
308
    This steady_chunks indicate slices of the train iterations that the parameters stay constant and therefore fit can be called
309
    "consecutively" for these steady_chunks. It creates an object that encapsulates these steady_chunks\n
310
    :param list parameter_trajectories: the list of ParameterTrajectory objects
311
    :return: the newly created object
312
    :rtype: IterationChunks
313
    """
314
    return reduce(lambda x, y: x.common_chunks(y), map(lambda x: x.steady_chunks, parameter_trajectories))
315
316
317
318
if __name__ == '__main__':
319
    tr_builder = TrajectoryBuilder()
320
    _test(tr_builder)
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable _test does not seem to be defined.
Loading history...
321