Passed
Push — master ( 96da92...a1b572 )
by Konstantinos
37s queued 14s
created

so_magic.data.discretization.call_method()   A

Complexity

Conditions 1

Size

Total Lines 4
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 1
dl 0
loc 4
rs 10
c 0
b 0
f 0
1
from abc import ABC, abstractmethod
2
import inspect
3
import attr
4
import pandas as pd
5
from so_magic.utils import SubclassRegistry
6
7
8
class DiscretizerInterface(ABC):
    """Root interface of every object able to discretize data."""

    def discretize(self, *args, **kwargs):
        """Discretize the given input.

        This base implementation accepts any arguments and performs no work.

        Raises:
            NotImplementedError: always; subclasses provide the behaviour.
        """
        raise NotImplementedError()
11
12
13
class AbstractDiscretizer(DiscretizerInterface):
    """Intermediate base class for discretizers; adds no behaviour of its own."""

    def discretize(self, *args, **kwargs):
        """Placeholder that concrete discretizers override.

        Raises:
            NotImplementedError: always.
        """
        raise NotImplementedError()
16
17
18
@attr.s
class BaseDiscretizer(AbstractDiscretizer):
    """Discretizer that delegates the actual binning to its `binner`."""
    # Object exposing `bin(values, bins, **kwargs)`.
    binner = attr.ib()

    def discretize(self, *args, **kwargs):
        """Bin one column of a dataset.

        Positional args (read by index): the dataset, which must expose
        `column(name)`; the name of the feature/column to bin; and the bins
        specification handed to the binner. Keyword arguments are forwarded
        to `binner.bin` unchanged.

        Raises:
            TypeError: re-raised with the processed column name prepended
                whenever fetching or binning the column raises TypeError.
        """
        dataset, feature, nb_bins = args[0], args[1], args[2]
        try:
            return self.binner.bin(dataset.column(feature), nb_bins, **kwargs)
        except TypeError as type_error:
            # Add the column name so the failing feature can be identified.
            raise TypeError(
                f'Table column being processed: {feature}. Exception text: {str(type_error)}'
            ) from type_error
34
35
36
@attr.s
class FeatureDiscretizer(BaseDiscretizer):
    """Discretizer bound to one specific feature of a dataset."""
    # Name of the feature this discretizer always operates on.
    feature = attr.ib(init=True)

    def discretize(self, *args, **kwargs):
        """Bin the bound feature.

        Positional args (read by index): the dataset and the bins
        specification. Keyword arguments are accepted but not forwarded.
        """
        dataset = args[0]
        nb_bins = args[1]
        return super().discretize(dataset, self.feature, nb_bins)
43
44
@attr.s
class FeatureDiscretizerFactory:
    """Builds `FeatureDiscretizer` objects using binners from `binner_factory`."""
    # Object exposing `create_binner(binner_type)`.
    binner_factory = attr.ib(init=True)

    def categorical(self, feature, **kwargs) -> FeatureDiscretizer:
        """Create a discretizer for a categorical feature.

        A truthy `quantisized` keyword selects the 'quantisized' binner;
        otherwise the 'same-length' binner is used.
        """
        binner_type = 'quantisized' if kwargs.get('quantisized', False) else 'same-length'
        binner = self.binner_factory.create_binner(binner_type)
        return FeatureDiscretizer(binner, feature)

    def numerical(self, feature, **kwargs) -> FeatureDiscretizer:
        """Create a discretizer for a numerical feature.

        A truthy `quantisized` keyword selects the 'quantisized' binner;
        otherwise the 'same-length' binner is used.
        """
        binner_type = 'quantisized' if kwargs.get('quantisized', False) else 'same-length'
        binner = self.binner_factory.create_binner(binner_type)
        return FeatureDiscretizer(binner, feature)
59
60
61
#########################################
62
63
class BinnerInterface(ABC):
    """Interface of objects that place values into bins."""

    @abstractmethod
    def bin(self, values, bins):
        """Bin `values` according to `bins`; must be overridden.

        Raises:
            NotImplementedError: when reached through `super()`.
        """
        raise NotImplementedError()
67
68
69
@attr.s
class BaseBinner(BinnerInterface):
    """Binner that runs a wrapped algorithm to do the binning."""
    # Object exposing `run(values, bins)`.
    algorithm = attr.ib()

    def bin(self, values, bins):
        """Run the wrapped algorithm on `values` with `bins` and return its output.

        The variable is assumed to be numerical (ratio or interval) or an
        ordinal (not nominal) categorical one.

        Raises:
            TypeError: re-raised with a hint about likely causes when the
                algorithm raises TypeError.
        """
        try:
            outcome = self.algorithm.run(values, bins)
        except TypeError as type_error:
            hint = (f'Exception text: {str(type_error)}. Possible reasons: preprocessing is needed to make sure'
                    f' suitable values are places in missing entries and/or all entries are of the same type')
            raise TypeError(hint) from type_error
        return outcome
81
82
83
class BinnerClass(metaclass=SubclassRegistry):
    """Root class for binners; its behaviour comes from the `SubclassRegistry` metaclass."""
84
85
86
class BinnerFactory:
    """Factory of binner objects; every creation method here is a stub."""
    parent_class = BinnerClass

    def equal_length_binner(self, *args, **kwargs) -> BaseBinner:
        """Create a binner whose bins all have equal size (max_value - min_value).

        Raises:
            NotImplementedError: always; not implemented here.
        """
        raise NotImplementedError()

    def quantisized_binner(self, *args, **kwargs) -> BaseBinner:
        """Create a binner that adjusts bin sizes so observations are evenly spread over the bins.

        Raises:
            NotImplementedError: always; not implemented here.
        """
        raise NotImplementedError()

    def create_binner(self, *args, **kwargs) -> BaseBinner:
        """Create a binner from the given arguments.

        Raises:
            NotImplementedError: always; not implemented here.
        """
        raise NotImplementedError()
106
107
108
class AlgorithmInterface(ABC):
    """Interface of runnable algorithms."""

    @abstractmethod
    def run(self, *args, **kwargs):
        """Execute the algorithm; must be overridden.

        Raises:
            NotImplementedError: when reached through `super()`.
        """
        raise NotImplementedError()
112
113
114
@attr.s
class AlgorithmArguments:
    """An algorithm's expected positional arguments.

    `arg_types` holds one type per expected positional argument.
    `default_values` holds the defaults used to fill trailing arguments that
    the caller omits.
    """
    arg_types = attr.ib()
    default_values = attr.ib()
    # Number of expected positional arguments (an int, not a sequence).
    _required_args = attr.ib(init=False, default=attr.Factory(lambda self: len(self.arg_types), takes_self=True))

    def values(self, *args):
        """Return the full positional argument list, padded with defaults.

        Args:
            *args: the leading positional arguments supplied by the caller.

        Returns:
            list: `args` followed by the trailing default values needed to
            reach the expected number of arguments.

        Raises:
            AlgorithmArgumentsError: if too many arguments are given, if
                there are not enough default values to fill the missing
                ones, or if any final value does not match its expected type.
        """
        nb_expected = self._required_args
        if len(args) > nb_expected:
            raise AlgorithmArgumentsError(f'Given more than the supported naumber of arguments. '
                                          f'{len(args)} > {nb_expected}')
        missing = nb_expected - len(args)
        defaults = list(self.default_values)
        if missing > len(defaults):
            raise AlgorithmArgumentsError(
                f'Missing {missing} arguments, but only {len(defaults)} default values are available.')
        # Slice from an explicit start index: `defaults[-0:]` would return
        # every default when nothing is missing.
        computed_args_list = list(args) + defaults[len(defaults) - missing:]
        if not all(isinstance(arg_value, arg_type)
                   for arg_value, arg_type in zip(computed_args_list, self.arg_types)):
            raise AlgorithmArgumentsError('Type missmatch')
        return computed_args_list
130
131
132
@attr.s
class AbstractAlgorithm(AlgorithmInterface, ABC):
    """Data holder shared by algorithms: a callback, its argument types and its parameters."""
    # Callable that performs the actual computation.
    callback: callable = attr.ib()
    # Expected positional arguments (one entry per argument).
    arguments: list = attr.ib(default=attr.Factory(list))
    # Maps a parameter name to a dict holding at least a 'value' entry.
    parameters: dict = attr.ib(default=attr.Factory(dict))
    # Snapshot of each parameter's 'value', taken at construction time.
    default_parameter_values = attr.ib(
        init=False,
        default=attr.Factory(
            lambda self: {name: spec['value'] for name, spec in self.parameters.items()},
            takes_self=True,
        ),
    )
    # Positional arguments of the most recent run.
    _args = attr.ib(init=False, default=attr.Factory(list))
140
141
142
@attr.s
class MagicAlgorithm(AbstractAlgorithm):
    """Algorithm that validates its inputs, runs the callback and records the outcome.

    `arguments` lists the expected type of each positional argument and
    `parameters` maps each keyword parameter name to a dict with 'type' and
    'value' entries.
    """
    _signature = attr.ib(init=False,
                         default=attr.Factory(lambda self: inspect.signature(self.callback), takes_self=True))
    _output = attr.ib(init=False, default=attr.Factory(dict))

    def run(self, *args, **kwargs):
        """Validate inputs, run the callback and return the output dict.

        Args:
            *args: positional arguments for the callback; their number and
                types must match `self.arguments`.
            **kwargs: parameter overrides, applied through `update_parameters`.

        Returns:
            dict: the instance's output dict with 'settings' and 'result' keys.

        Raises:
            MagicAlgorithmError: on a wrong number or type of positional arguments.
            MagicAlgorithmParametersError: on invalid keyword parameters.
        """
        if len(args) != len(self.arguments):
            raise MagicAlgorithmError(
                f'Number of runtime positional arguments do not match the expected number of positional argumnets. '
                f'Given {len(args)} arguments: [{", ".join(str(_) for _ in args)}]. Expected {len(self.arguments)} '
                f'arguments: [{", ".join(str(_) for _ in self.arguments)}].')
        if not all(isinstance(argument, expected_type) for argument, expected_type in zip(args, self.arguments)):
            # Types must be stringified before joining, and the "got" part
            # must describe the received arguments, not the expected ones.
            raise MagicAlgorithmError(f'Bad positional argument for algorithm. Expected arguments with types '
                                      f'[{", ".join(str(_) for _ in self.arguments)}]. '
                                      f'Instead got [{", ".join(str(type(_)) for _ in args)}].')
        self._args = list(args)
        self.update_parameters(**kwargs)
        result = self._run_callback()
        self._output['settings'] = self._get_settings(result)
        self._output['result'] = self._get_result(result)
        return self._output

    def _run_callback(self):
        """Call the callback with the stored arguments and current parameter values."""
        return self.callback(*self._args, **{k: v['value'] for k, v in self.parameters.items()})

    @property
    def output(self):
        """The output dict filled in by `run`."""
        return self._output

    def _get_result(self, result):
        """Return the part of the callback's return value reported as 'result'."""
        return result

    def _get_settings(self, _result):
        """Return the arguments and parameter values used for the run."""
        return {
            'arguments': self._args,
            'parameters': {
                param_name: param_data['value'] for param_name, param_data in self.parameters.items()
            },
        }

    def update_parameters(self, **kwargs):
        """Validate the given parameter values and store them.

        Raises:
            MagicAlgorithmParametersError: if a name is not a known parameter
                or a value is not an instance of the parameter's 'type'.
        """
        # Unknown names are rejected here; storing them would raise a bare
        # KeyError in `_update_params`.
        unknown = [name for name in kwargs if name not in self.parameters]
        mistyped = [name for name, value in kwargs.items()
                    if name in self.parameters and not isinstance(value, self.parameters[name]['type'])]
        if unknown or mistyped:
            raise MagicAlgorithmParametersError(
                f'Bad algorithm parameters. Allowed parameters with types '
                f'[{", ".join(f"{k}: {v}" for k, v in self.parameters.items())}]. '
                f'Instead got [{", ".join(f"{k}: {v}" for k, v in kwargs.items())}].')
        self._update_params(**kwargs)

    def set_default_parameters(self):
        """Reset every parameter to the value it had at construction time."""
        self._update_params(**self.default_parameter_values)

    def _update_params(self, **kwargs):
        """Store the given values without validation."""
        for key, value in kwargs.items():
            self.parameters[key]['value'] = value
197
198
199
class MagicAlgorithmError(Exception):
    """Raised when an algorithm receives bad positional arguments."""
200
class MagicAlgorithmParametersError(Exception):
    """Raised when an algorithm receives bad keyword parameters."""
201
class AlgorithmArgumentsError(Exception):
    """Raised when computing an algorithm's positional argument values fails."""
202
203
204
def call_method(a_callable):
    """Wrap `a_callable` so it can be invoked like a method.

    The returned function takes an extra leading argument (the would-be
    instance), discards it and forwards every other positional and keyword
    argument to `a_callable`, returning its result.
    """
    def _call(_self, *positional, **named):
        # `_self` is intentionally ignored.
        outcome = a_callable(*positional, **named)
        return outcome

    return _call
208
209
210
@attr.s
class Discretizer(BaseDiscretizer):
    """Discretizer that exposes the algorithm used by its binner."""

    @property
    def algorithm(self):
        """The algorithm object held by the binner."""
        return self.binner.algorithm

    @classmethod
    def from_algorithm(cls, alg):
        """Build a discretizer whose binner is a `BaseBinner` wrapping `alg`.

        Instantiates `cls` rather than a hard-coded class, so calling this on
        a subclass returns an instance of that subclass.
        """
        binner = BaseBinner(alg)
        return cls(binner)
221
222
223
class BinningAlgorithm(metaclass=SubclassRegistry):
    """Entry point for creating binning algorithms registered under an id."""

    @classmethod
    def from_built_in(cls, algorithm_id):
        """Create the algorithm registered as `algorithm_id`, backed by `pandas.cut`.

        The algorithm is configured with two positional arguments of any type
        and with the typed keyword parameters listed below.
        NOTE(review): `cls.create` is not defined in this class; presumably it
        is supplied by the `SubclassRegistry` metaclass — confirm.
        """
        # TODO replace with call to dataclass
        positional_types = [object, object]
        cut_parameters = {
            'right': {'type': bool, 'value': True},
            'labels': {'type': object, 'value': None},
            'retbins': {'type': bool, 'value': True},
            'precision': {'type': int, 'value': 3},
            'include_lowest': {'type': bool, 'value': False},
            'duplicates': {'type': str, 'value': 'raise'},
        }
        return cls.create(algorithm_id, pd.cut, positional_types, cut_parameters)
258
259
260
@BinningAlgorithm.register_as_subclass('pd.cut')
class PDCutBinningAlgorithm(MagicAlgorithm):
    """`MagicAlgorithm` specialisation registered under the 'pd.cut' id.

    When the 'retbins' parameter is enabled the callback's return value is
    treated as a pair: the binned data first, the bins second.
    """

    def _returns_bins(self):
        """Tell whether the 'retbins' parameter value is currently truthy."""
        # Read the 'value' entry: the parameter spec itself is a non-empty
        # dict and therefore always truthy.
        return bool(self.parameters['retbins']['value'])

    def _get_settings(self, result):
        """Extend the base settings with the bins used ('used_bins').

        'used_bins' is None when 'retbins' is disabled, since `result` is
        then not treated as a pair.
        """
        used_bins = result[1] if self._returns_bins() else None
        return dict(super()._get_settings(result), used_bins=used_bins)

    def _get_result(self, result):
        """Return only the binned data, dropping the bins when they were returned."""
        if self._returns_bins():
            return super()._get_result(result)[0]
        return super()._get_result(result)
270