Passed
Pull Request — dev (#32)
by Konstantinos
03:59 queued 02:15
created

so_magic.data.discretization.BaseBinner.bin()   A

Complexity

Conditions 2

Size

Total Lines 8
Code Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
eloc 6
nop 3
dl 0
loc 8
rs 10
c 0
b 0
f 0
1
from abc import ABC, abstractmethod
2
import inspect
3
import attr
4
from so_magic.utils import SubclassRegistry
5
6
7
class DiscretizerInterface(ABC):
    """Interface of objects that can discretize data.

    The method is not registered as abstract; this default implementation
    simply raises when called.
    """

    def discretize(self, *args, **kwargs):
        """Discretize the given input.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError
10
11
12
class AbstractDiscretizer(DiscretizerInterface):
    """Intermediate base class for discretizers; adds no behaviour of its own."""

    def discretize(self, *args, **kwargs):
        """Discretize the given input.

        Raises:
            NotImplementedError: always; concrete subclasses override this method.
        """
        raise NotImplementedError
15
16
17
@attr.s
class BaseDiscretizer(AbstractDiscretizer):
    """Discretizer that delegates the binning of a table column to a binner."""

    # Object whose ``bin(values, bins, **kwargs)`` method performs the binning.
    binner = attr.ib()

    def discretize(self, *args, **kwargs):
        """Bin the values of one column of a dataset.

        Positional arguments are read by index:
            args[0]: dataset object providing a ``column(attribute)`` method.
            args[1]: attribute (column) whose values are binned.
            args[2]: bins specification handed to the binner.
        Keyword arguments are forwarded to ``binner.bin``.

        Returns:
            Whatever ``binner.bin`` returns.

        Raises:
            IndexError: if fewer than three positional arguments are given.
            TypeError: re-raised (chained) with the processed column mentioned in the message.
        """
        dataset, column_name, bins_spec = args[0], args[1], args[2]
        try:
            return self.binner.bin(dataset.column(column_name), bins_spec, **kwargs)
        except TypeError as type_error:
            msg = f'Table column being processed: {column_name}. Exception text: {str(type_error)}'
            raise TypeError(msg) from type_error
33
34
35
@attr.s
class FeatureDiscretizer(BaseDiscretizer):
    """Discretizer bound to one specific feature."""

    # Feature (column identifier) that every ``discretize`` call operates on.
    feature = attr.ib(init=True)

    def discretize(self, *args, **kwargs):
        """Bin the bound feature of a dataset.

        Positional arguments are read by index: ``args[0]`` is the dataset and
        ``args[1]`` the bins specification. Keyword arguments are accepted but
        not forwarded to the parent implementation.
        """
        dataset, nb_bins = args[0], args[1]
        return super().discretize(dataset, self.feature, nb_bins)
42
43
@attr.s
class FeatureDiscretizerFactory:
    """Builds ``FeatureDiscretizer`` objects using binners obtained from a binner factory."""

    # Object exposing ``create_binner(binner_type)``.
    binner_factory = attr.ib(init=True)

    def categorical(self, feature, **kwargs) -> FeatureDiscretizer:
        """Create a discretizer for ``feature``; pass ``quantisized=True`` for a 'quantisized' binner."""
        return self._discretizer_for(feature, kwargs)

    def numerical(self, feature, **kwargs) -> FeatureDiscretizer:
        """Create a discretizer for ``feature``; pass ``quantisized=True`` for a 'quantisized' binner."""
        return self._discretizer_for(feature, kwargs)

    def _discretizer_for(self, feature, options):
        """Pick the binner type from ``options`` and wrap the created binner in a FeatureDiscretizer."""
        # 'same-length' is used unless the caller passes a truthy 'quantisized' option.
        binner_type = 'quantisized' if options.get('quantisized', False) else 'same-length'
        return FeatureDiscretizer(self.binner_factory.create_binner(binner_type), feature)
58
59
60
#########################################
61
62
class BinnerInterface(ABC):
    """Interface of objects able to place values into bins."""

    @abstractmethod
    def bin(self, values, bins):
        """Place ``values`` into ``bins``; must be overridden by concrete binners.

        Raises:
            NotImplementedError: when this base implementation is invoked.
        """
        raise NotImplementedError
66
67
68
@attr.s
class BaseBinner(BinnerInterface):
    """Binner that delegates the actual work to an algorithm object."""

    # Object exposing ``run(values, bins)``.
    algorithm = attr.ib()

    def bin(self, values, bins):
        """Run the algorithm on ``values`` with the given ``bins``.

        The values are assumed to come from a numerical (ratio or interval)
        variable or from an ordinal (not nominal) categorical variable.

        Returns:
            Whatever ``algorithm.run`` returns.

        Raises:
            TypeError: re-raised (chained) with a hint about likely causes.
        """
        try:
            return self.algorithm.run(values, bins)
        except TypeError as type_error:
            hint = (f'Exception text: {str(type_error)}. Possible reasons: preprocessing is needed to make sure'
                    f' suitable values are places in missing entries and/or all entries are of the same type')
            raise TypeError(hint) from type_error
80
81
82
class BinnerClass(metaclass=SubclassRegistry):
    """Root class for binners; all of its behaviour comes from the ``SubclassRegistry`` metaclass."""
83
84
85
class BinnerFactory:
    """Base factory of binner objects.

    Every creation method here raises ``NotImplementedError``; subclasses are
    expected to provide the implementations.
    """

    # Class under which concrete binner classes are looked up.
    parent_class = BinnerClass

    def equal_length_binner(self, *args, **kwargs) -> BaseBinner:
        """Create a binner whose bins all have equal size (max_value - min_value).

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    def quantisized_binner(self, *args, **kwargs) -> BaseBinner:
        """Create a binner that adjusts bin sizes so observations are evenly distributed over the bins.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    def create_binner(self, *args, **kwargs) -> BaseBinner:
        """Create a binner from the given arguments.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError
105
106
107
class AlgorithmInterface(ABC):
    """Interface of runnable algorithms."""

    @abstractmethod
    def run(self, *args, **kwargs):
        """Execute the algorithm; must be overridden by concrete algorithms.

        Raises:
            NotImplementedError: when this base implementation is invoked.
        """
        raise NotImplementedError
111
112
113
@attr.s
class AlgorithmArguments:
    """An algorithm's expected positional arguments.

    Attributes are documented inline; ``values`` completes a partial list of
    runtime arguments with trailing defaults and type-checks the result.
    """
    # Sequence of types, one per expected positional argument.
    arg_types = attr.ib()
    # Default values; the last ones are used to fill in arguments not supplied at runtime.
    default_values = attr.ib()
    # Number of expected positional arguments (an int derived from arg_types).
    _required_args = attr.ib(init=False, default=attr.Factory(lambda self: len(self.arg_types), takes_self=True))

    def values(self, *args):
        """Return the full list of argument values for a call.

        Args:
            *args: the positional arguments supplied at runtime (possibly fewer than expected).

        Returns:
            list: the supplied arguments followed by the trailing defaults needed to
            reach the expected number of arguments.

        Raises:
            AlgorithmArgumentsError: if too many arguments are given, if there are
                not enough defaults to fill the missing ones, or if a value does
                not match its expected type.
        """
        expected_count = self._required_args  # already an int; do not call len() on it
        given_count = len(args)
        if given_count > expected_count:
            raise AlgorithmArgumentsError(f'Given more than the supported naumber of arguments. '
                                          f'{given_count} > {expected_count}')
        missing = expected_count - given_count
        defaults = list(self.default_values)
        if missing > len(defaults):
            raise AlgorithmArgumentsError(f'Missing {missing} arguments but only {len(defaults)} '
                                          f'default values are available.')
        # Slice from an explicit start index: `defaults[-0:]` would wrongly return every default.
        computed_args_list = list(args) + defaults[len(defaults) - missing:]
        if not all(isinstance(arg_value, arg_type)
                   for arg_value, arg_type in zip(computed_args_list, self.arg_types)):
            raise AlgorithmArgumentsError('Type missmatch')
        return computed_args_list
129
130
131
@attr.s
class AbstractAlgorithm(AlgorithmInterface, ABC):
    """State shared by algorithms: a callback plus its positional argument types and parameters."""

    # Callable executed by the algorithm.
    callback: callable = attr.ib()
    # Expected positional arguments (defaults to an empty list).
    arguments: list = attr.ib(default=attr.Factory(list))
    # Mapping of parameter name -> mapping holding at least a 'value' entry (defaults to an empty dict).
    parameters: dict = attr.ib(default=attr.Factory(dict))
    # Snapshot {parameter name: value} taken when the instance is created.
    default_parameter_values = attr.ib(init=False)
    # Positional arguments storage; starts as an empty list.
    _args = attr.ib(init=False, default=attr.Factory(list))

    @default_parameter_values.default
    def _initial_parameter_values(self):
        """Compute the initial ``default_parameter_values`` from ``parameters``."""
        return {name: spec['value'] for name, spec in self.parameters.items()}
139
140
141
@attr.s
class MagicAlgorithm(AbstractAlgorithm):
    """Algorithm that validates its inputs, runs the callback and records settings and result."""

    # Signature of the callback, computed once at construction.
    _signature = attr.ib(init=False,
                         default=attr.Factory(lambda self: inspect.signature(self.callback), takes_self=True))
    # Dict updated in place (keys 'settings' and 'result') and returned by every run.
    _output = attr.ib(init=False, default=attr.Factory(dict))

    def run(self, *args, **kwargs):
        """Validate the inputs, execute the callback and return the output dict.

        Args:
            *args: positional arguments for the callback; their number and types
                must match ``self.arguments``.
            **kwargs: parameter values to update before running.

        Returns:
            dict: the instance's output dict with keys 'settings' and 'result'.

        Raises:
            MagicAlgorithmError: on a wrong number or type of positional arguments.
            MagicAlgorithmParametersError: on an unknown or badly typed parameter.
        """
        if len(args) != len(self.arguments):
            raise MagicAlgorithmError(
                f'Number of runtime positional arguments do not match the expected number of positional argumnets. '
                f'Given {len(args)} arguments: [{", ".join(str(_) for _ in args)}]. Expected {len(self.arguments)} '
                f'arguments: [{", ".join(str(_) for _ in self.arguments)}].')
        if not all(isinstance(argument, expected_type) for argument, expected_type in zip(args, self.arguments)):
            # self.arguments holds types, so they must be converted to str before joining;
            # report the types actually received rather than repeating the expected ones.
            expected_types = ", ".join(str(_) for _ in self.arguments)
            received_types = ", ".join(str(type(_)) for _ in args)
            raise MagicAlgorithmError(f'Bad positional argument for algorithm. Expected arguments with types '
                                      f'[{expected_types}]. Instead got [{received_types}].')
        self._args = list(args)
        self.update_parameters(**kwargs)
        result = self._run_callback()
        self._output['settings'] = self._get_settings(result)
        self._output['result'] = self._get_result(result)
        return self._output

    def _run_callback(self):
        """Call the callback with the stored arguments and the current parameter values."""
        return self.callback(*self._args, **{k: v['value'] for k, v in self.parameters.items()})

    @property
    def output(self):
        """The output dict written by the most recent ``run``."""
        return self._output

    def _get_result(self, result):
        """Return the part of the callback's return value to expose as 'result' (here: all of it)."""
        return result

    def _get_settings(self, result):
        """Return the arguments and parameter values used for the run."""
        return {
            'arguments': self._args,
            'parameters': {
                param_name: param_data['value'] for param_name, param_data in self.parameters.items()
            },
        }

    def update_parameters(self, **kwargs):
        """Validate and store new parameter values.

        Raises:
            MagicAlgorithmParametersError: if a name is not a known parameter or a
                value is not an instance of the parameter's declared type.
        """
        parameters_ok = all(
            parameter_name in self.parameters
            and isinstance(parameter_value, self.parameters[parameter_name]['type'])
            for parameter_name, parameter_value in kwargs.items()
        )
        if not parameters_ok:
            raise MagicAlgorithmParametersError(
                f'Bad algorithm parameters. Allowed parameters with types '
                f'[{", ".join(f"{k}: {v}" for k, v in self.parameters.items())}]. '
                f'Instead got [{", ".join(f"{k}: {v}" for k, v in kwargs.items())}].')
        self._update_params(**kwargs)

    def set_default_parameters(self):
        """Reset every parameter to the value captured at construction."""
        self._update_params(**self.default_parameter_values)

    def _update_params(self, **kwargs):
        """Store the given values without validation."""
        for k, v in kwargs.items():
            self.parameters[k]['value'] = v
196
197
198
class MagicAlgorithmError(Exception):
    """Raised by ``MagicAlgorithm.run`` when the positional arguments are invalid."""
199
class MagicAlgorithmParametersError(Exception):
    """Raised by ``MagicAlgorithm.update_parameters`` when the given parameters are rejected."""
200
class AlgorithmArgumentsError(Exception):
    """Raised by ``AlgorithmArguments.values`` when the supplied arguments are rejected."""
201
202
203
204
def call_method(a_callable):
    """Adapt a plain callable to a method-like signature.

    Args:
        a_callable: the callable to wrap.

    Returns:
        A function taking an instance as first argument, which it ignores,
        and passing every remaining argument on to ``a_callable``.
    """
    def _call(self, *args, **kwargs):
        # `self` is deliberately dropped: the wrapped callable knows nothing about the instance.
        return a_callable(*args, **kwargs)

    return _call
208
209
210
@attr.s
class Discretizer(BaseDiscretizer):
    """Discretizer exposing the algorithm used by its binner."""

    @property
    def algorithm(self):
        """The algorithm object held by the binner."""
        return self.binner.algorithm

    @classmethod
    def from_algorithm(cls, alg):
        """Alternate constructor: wrap ``alg`` in a ``BaseBinner`` and build a discretizer around it.

        Instantiates ``cls`` so that subclasses get an instance of their own type.
        """
        binner = BaseBinner(alg)
        return cls(binner)
221
222
223
class BinningAlgorithm(metaclass=SubclassRegistry):
    """Registry of binning algorithm classes, keyed by algorithm id."""

    @classmethod
    def from_built_in(cls, algorithm_id):
        """Create the registered algorithm identified by ``algorithm_id``.

        The instance is created through ``cls.create`` with the registered
        class's ``_callback``, two positional arguments typed ``object`` and a
        fixed set of parameters with their types and initial values.
        """
        callback = cls.subclasses[algorithm_id]._callback
        # TODO replace with call to dataclass
        # (parameter name, accepted type, initial value)
        parameter_table = (
            ('right', bool, True),
            ('labels', object, None),
            ('retbins', bool, True),
            ('precision', int, 3),
            ('include_lowest', bool, False),
            ('duplicates', str, 'raise'),
        )
        parameters = {
            name: {'type': accepted_type, 'value': initial_value}
            for name, accepted_type, initial_value in parameter_table
        }
        return cls.create(algorithm_id, callback, [object, object], parameters)
257
258
259
import pandas as pd
260
261
@BinningAlgorithm.register_as_subclass('pd.cut')
class PDCutBinningAlgorithm(MagicAlgorithm):
    """Binning algorithm backed by ``pandas.cut``."""

    _callback = pd.cut

    def _bins_requested(self):
        """Tell whether the current 'retbins' parameter value asks pandas to also return the bins.

        Reads the parameter's 'value' entry (the parameter record itself is a
        non-empty dict and therefore always truthy). A missing 'retbins'
        parameter counts as False, which is the pandas default.
        """
        return bool(self.parameters.get('retbins', {}).get('value', False))

    def _get_settings(self, result):
        """Extend the base settings with 'used_bins'.

        'used_bins' holds the bins returned by ``pd.cut`` (second item of the
        result) when they were requested, otherwise None.
        """
        used_bins = result[1] if self._bins_requested() else None
        return dict(super()._get_settings(result), **{'used_bins': used_bins})

    def _get_result(self, result):
        """Return only the binned values, dropping the bins when ``pd.cut`` returned them as well."""
        if self._bins_requested():
            return super()._get_result(result)[0]
        return super()._get_result(result)
275