Passed
Pull Request — dev (#32)
by Konstantinos
05:57 queued 03:46
created

AlgorithmArguments.values()   A

Complexity

Conditions 3

Size

Total Lines 9
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 8
nop 2
dl 0
loc 9
rs 10
c 0
b 0
f 0
1
from abc import ABC, abstractmethod
2
import inspect
3
import attr
4
from so_magic.utils import SubclassRegistry
5
6
7
class DiscretizerInterface(ABC):
    """Contract for objects able to discretize data.

    The method is not declared abstract, so the class can be instantiated;
    calling ``discretize`` on it simply raises.
    """

    def discretize(self, *args, **kwargs):
        """Discretize the given input; meant to be overridden.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError
10
11
12
class AbstractDiscretizer(DiscretizerInterface):
    """Intermediate base class for discretizers; adds no behaviour of its own."""

    def discretize(self, *args, **kwargs):
        """Placeholder that concrete discretizers override.

        Raises:
            NotImplementedError: always, in this implementation.
        """
        raise NotImplementedError
15
16
17
@attr.s
class BaseDiscretizer(AbstractDiscretizer):
    """Discretizer that delegates the actual binning to its ``binner``."""

    # Object whose ``bin(values, bins, **kwargs)`` method performs the binning.
    binner = attr.ib()

    def discretize(self, *args, **kwargs):
        """Bin one column of a dataset.

        Expects three positional arguments, in this order: the dataset (an
        object exposing ``column(name)``), the attribute (column) name and the
        value handed to the binner as its ``bins`` argument. Keyword arguments
        are forwarded to the binner unchanged.

        Returns:
            Whatever ``self.binner.bin`` returns.

        Raises:
            IndexError: if fewer than three positional arguments are given.
            TypeError: if the binning step raises TypeError; re-raised with
                the processed column name prepended to the message.
        """
        dataset, attribute, nb_bins = args[0], args[1], args[2]
        try:
            return self.binner.bin(dataset.column(attribute), nb_bins, **kwargs)
        except TypeError as type_error:
            msg = f'Table column being processed: {attribute}. Exception text: {str(type_error)}'
            raise TypeError(msg) from type_error
33
34
35
@attr.s
class FeatureDiscretizer(BaseDiscretizer):
    """Discretizer bound to one fixed feature (column)."""

    # The attribute/column this discretizer always operates on.
    feature = attr.ib(init=True)

    def discretize(self, *args, **kwargs):
        """Bin the bound feature of a dataset.

        Expects two positional arguments: the dataset and the bins value.
        Keyword arguments are accepted but not forwarded to the parent call.
        """
        dataset, nb_bins = args[0], args[1]
        return super().discretize(dataset, self.feature, nb_bins)
42
43
@attr.s
class FeatureDiscretizerFactory:
    """Builds FeatureDiscretizer objects backed by binners from ``binner_factory``."""

    # Object exposing ``create_binner(binner_type)``.
    binner_factory = attr.ib(init=True)

    def categorical(self, feature, **kwargs) -> FeatureDiscretizer:
        """Create a discretizer for ``feature``.

        A truthy ``quantisized`` keyword selects the 'quantisized' binner;
        otherwise the 'same-length' binner is used.
        """
        binner_type = 'quantisized' if kwargs.get('quantisized', False) else 'same-length'
        binner = self.binner_factory.create_binner(binner_type)
        return FeatureDiscretizer(binner, feature)

    def numerical(self, feature, **kwargs) -> FeatureDiscretizer:
        """Create a discretizer for ``feature``.

        Currently applies the same selection rule as ``categorical``: a truthy
        ``quantisized`` keyword selects the 'quantisized' binner, otherwise
        'same-length'.
        """
        binner_type = 'quantisized' if kwargs.get('quantisized', False) else 'same-length'
        binner = self.binner_factory.create_binner(binner_type)
        return FeatureDiscretizer(binner, feature)
58
59
60
#########################################
61
62
class BinnerInterface(ABC):
    """Abstract contract for binners: objects that place values into bins."""

    @abstractmethod
    def bin(self, values, bins):
        """Bin ``values`` according to ``bins``; subclasses must implement this.

        Raises:
            NotImplementedError: when reached through ``super()``.
        """
        raise NotImplementedError
66
67
68
@attr.s
class BaseBinner(BinnerInterface):
    """Binner that runs a wrapped algorithm object to do the binning."""

    # Object exposing ``run(values, bins)``.
    algorithm = attr.ib()

    def bin(self, values, bins):
        """Run the wrapped algorithm on ``values`` with ``bins``.

        Per the original note, the variable is assumed to be numerical (ratio
        or interval) or an ordinal (not nominal) categorical one.

        Returns:
            Whatever ``self.algorithm.run`` returns.

        Raises:
            TypeError: if the algorithm raises TypeError; re-raised with a hint
                about possible preprocessing problems.
        """
        try:
            return self.algorithm.run(values, bins)
        except TypeError as type_error:
            message = (f'Exception text: {str(type_error)}. Possible reasons: preprocessing is needed to make sure'
                       f' suitable values are places in missing entries and/or all entries are of the same type')
            raise TypeError(message) from type_error
80
81
82
class BinnerClass(metaclass=SubclassRegistry):
    """Registry root for binner classes; all behaviour comes from the SubclassRegistry metaclass."""
83
84
85
class BinnerFactory:
    """Factory skeleton for binners; every creation method is still unimplemented."""

    # Registry class associated with this factory.
    parent_class = BinnerClass

    def equal_length_binner(self, *args, **kwargs) -> BaseBinner:
        """Binner whose bins all have equal size (max_value - min_value).

        Raises:
            NotImplementedError: always; not implemented yet.
        """
        raise NotImplementedError

    def quantisized_binner(self, *args, **kwargs) -> BaseBinner:
        """Binner that adjusts bin sizes so observations are evenly distributed among the bins.

        Raises:
            NotImplementedError: always; not implemented yet.
        """
        raise NotImplementedError

    def create_binner(self, *args, **kwargs) -> BaseBinner:
        """Generic binner creation entry point.

        Raises:
            NotImplementedError: always; not implemented yet.
        """
        raise NotImplementedError
105
106
107
class AlgorithmInterface(ABC):
    """Abstract contract for runnable algorithms."""

    @abstractmethod
    def run(self, *args, **kwargs):
        """Execute the algorithm; subclasses must implement this.

        Raises:
            NotImplementedError: when reached through ``super()``.
        """
        raise NotImplementedError
111
112
113
@attr.s
class AlgorithmArguments:
    """An algorithm's expected positional arguments."""

    # Expected type of each positional argument, in positional order.
    arg_types = attr.ib()
    # Fallback values; presumably aligned with the trailing positional
    # arguments (the last default fills the last argument) -- TODO confirm.
    default_values = attr.ib()
    # Number of positional arguments the algorithm expects (an int).
    _required_args = attr.ib(init=False, default=attr.Factory(lambda self: len(self.arg_types), takes_self=True))

    def values(self, *args):
        """Complete ``args`` with trailing defaults and validate their types.

        Args:
            *args: the positional values supplied by the caller.

        Returns:
            list: the supplied values followed by as many trailing default
            values as are needed to reach the expected argument count.

        Raises:
            AlgorithmArgumentsError: if too many values are given, if there are
                not enough defaults to fill the gap, or if a value is not an
                instance of its expected type.
        """
        nb_given = len(args)
        # ``_required_args`` already is a count; calling len() on it raised TypeError.
        if nb_given > self._required_args:
            raise AlgorithmArgumentsError(f'Given more than the supported naumber of arguments. '
                                          f'{nb_given} > {self._required_args}')
        missing = self._required_args - nb_given
        defaults = list(self.default_values)
        if missing > len(defaults):
            raise AlgorithmArgumentsError(f'Not enough arguments. {missing} missing, '
                                          f'but only {len(defaults)} default values are available')
        # Slice from an explicit start index: ``defaults[-0:]`` would wrongly
        # append every default when nothing is missing.
        computed_args_list = list(args) + defaults[len(defaults) - missing:]
        if not all(isinstance(arg_value, arg_type)
                   for arg_value, arg_type in zip(computed_args_list, self.arg_types)):
            raise AlgorithmArgumentsError('Type missmatch')
        return computed_args_list
129
130
131
@attr.s
class AbstractAlgorithm(AlgorithmInterface, ABC):
    """Data holder shared by algorithms: a callback plus its argument and parameter specs."""

    # The callable that implements the algorithm.
    callback: callable = attr.ib()
    # Specification of the positional arguments (empty list by default).
    arguments: list = attr.ib(default=attr.Factory(list))
    # Maps a parameter name to a dict that holds at least a 'value' entry.
    parameters: dict = attr.ib(default=attr.Factory(dict))
    # Snapshot of every parameter's 'value', taken at construction time.
    default_parameter_values = attr.ib(
        init=False,
        default=attr.Factory(
            lambda self: {name: spec['value'] for name, spec in self.parameters.items()},
            takes_self=True,
        ),
    )
    # Positional arguments storage; starts as an empty list.
    _args = attr.ib(init=False, default=attr.Factory(list))
138
139
140
@attr.s
class MagicAlgorithm(AbstractAlgorithm):
    """Algorithm that validates its inputs, invokes the callback and records the outcome.

    ``arguments`` holds the expected type of each positional argument and
    ``parameters`` maps each keyword parameter name to a dict with 'type' and
    'value' entries.
    """

    _signature = attr.ib(init=False,
                         default=attr.Factory(lambda self: inspect.signature(self.callback), takes_self=True))
    # Holds the 'settings' and 'result' of the latest run. The same dict object
    # is updated and returned by every call to ``run``.
    _output = attr.ib(init=False, default=attr.Factory(dict))

    def run(self, *args, **kwargs):
        """Validate the inputs, execute the callback and return the output dict.

        Args:
            *args: positional arguments for the callback; count and types must
                match ``self.arguments``.
            **kwargs: parameter values that update ``self.parameters`` before
                the callback is invoked.

        Returns:
            dict: with keys 'settings' and 'result'.

        Raises:
            MagicAlgorithmError: on wrong number or wrong types of positional arguments.
            MagicAlgorithmParametersError: on unknown or badly typed parameters.
        """
        if len(args) != len(self.arguments):
            raise MagicAlgorithmError(
                f'Number of runtime positional arguments do not match the expected number of positional argumnets. '
                f'Given {len(args)} arguments: [{", ".join(str(_) for _ in args)}]. Expected {len(self.arguments)} arguments: [{", ".join(str(_) for _ in self.arguments)}].')
        if not all(isinstance(argument, expected_type) for argument, expected_type in zip(args, self.arguments)):
            # The expected entries are type objects, so they must be converted
            # to str before joining; "Instead got" reports the received types.
            raise MagicAlgorithmError(f'Bad positional argument for algorithm. Expected arguments with types '
                                      f'[{", ".join(str(_) for _ in self.arguments)}]. '
                                      f'Instead got [{", ".join(str(type(_)) for _ in args)}].')
        self._args = list(args)
        self.update_parameters(**kwargs)
        result = self._run_callback()
        self._output['settings'] = self._get_settings(result)
        self._output['result'] = self._get_result(result)
        return self._output

    def _run_callback(self):
        """Invoke the callback with the stored arguments and the current parameter values."""
        return self.callback(*self._args, **{k: v['value'] for k, v in self.parameters.items()})

    @property
    def output(self):
        """The output dict populated by the latest ``run``."""
        return self._output

    def _get_result(self, result):
        """Hook to post-process the callback's return value; identity here."""
        return result

    def _get_settings(self, result):
        """Describe the arguments and parameter values used for the run."""
        return {
            'arguments': self._args,
            'parameters': {
                param_name: v['value'] for param_name, v in self.parameters.items()
            },
        }

    def update_parameters(self, **kwargs):
        """Validate and store new parameter values.

        Every given name must be a known parameter and its value must be an
        instance of that parameter's declared 'type'.

        Raises:
            MagicAlgorithmParametersError: on an unknown name or a value of the wrong type.
        """
        if not all(parameter_name in self.parameters
                   and isinstance(parameter_value, self.parameters[parameter_name]['type'])
                   for parameter_name, parameter_value in kwargs.items()):
            raise MagicAlgorithmParametersError(
                f'Bad algorithm parameters. Allowed parameters with types '
                f'[{", ".join(f"{k}: {v}" for k, v in self.parameters.items())}]. '
                f'Instead got [{", ".join(f"{k}: {v}" for k, v in kwargs.items())}].')
        self._update_params(**kwargs)

    def set_default_parameters(self):
        """Reset every parameter to the value captured at construction time."""
        self._update_params(**self.default_parameter_values)

    def _update_params(self, **kwargs):
        """Write the given values into ``self.parameters`` without validation."""
        for k, v in kwargs.items():
            self.parameters[k]['value'] = v
193
194
195
class MagicAlgorithmError(Exception):
    """Raised by MagicAlgorithm.run when the positional arguments are rejected."""


class MagicAlgorithmParametersError(Exception):
    """Raised by MagicAlgorithm.update_parameters when the parameters are rejected."""


class AlgorithmArgumentsError(Exception):
    """Raised by AlgorithmArguments.values when the arguments are rejected."""
198
199
200
201
def call_method(a_callable):
    """Adapt a plain callable so it can be used as an instance method.

    The returned function accepts ``self`` as its first argument, discards it
    and forwards the remaining arguments to ``a_callable``.
    """
    def _call(self, *args, **kwargs):
        # ``self`` is intentionally dropped; only the remaining arguments are passed on.
        outcome = a_callable(*args, **kwargs)
        return outcome

    return _call
205
206
207
@attr.s
class Discretizer(BaseDiscretizer):
    """BaseDiscretizer that exposes its binner's algorithm and can be built from one."""

    @property
    def algorithm(self):
        """The algorithm object held by this discretizer's binner."""
        return self.binner.algorithm

    @classmethod
    def from_algorithm(cls, alg):
        """Wrap ``alg`` in a BaseBinner and return a Discretizer using it.

        NOTE: the result is always a ``Discretizer`` instance, even when this
        is called on a subclass (``cls`` is not used).
        """
        return Discretizer(BaseBinner(alg))
218
219
220
from so_magic.utils import SubclassRegistry
221
222
class BinningAlgorithm(metaclass=SubclassRegistry):
    """Registry root for binning algorithms.

    ``create`` and ``subclasses`` are not defined in this class; presumably
    they come from the SubclassRegistry metaclass -- TODO confirm.
    """

    @classmethod
    def from_built_in(cls, algorithm_id):
        """Create the registered algorithm ``algorithm_id`` with a fixed specification.

        The specification is two positional arguments of any type plus a
        hard-coded table of keyword parameters with their types and default
        values.
        """
        # TODO replace with call to dataclass
        positional_types = [object, object]
        parameter_specs = {
            'right': {'type': bool, 'value': True},
            'labels': {'type': object, 'value': None},
            'retbins': {'type': bool, 'value': True},
            'precision': {'type': int, 'value': 3},
            'include_lowest': {'type': bool, 'value': False},
            'duplicates': {'type': str, 'value': 'raise'},
        }
        return cls.create(
            algorithm_id,
            cls.subclasses[algorithm_id]._callback,
            positional_types,
            parameter_specs,
        )
256
257
258
import pandas as pd
259
260
@BinningAlgorithm.register_as_subclass('pd.cut')
class PDCutBinningAlgorithm(MagicAlgorithm):
    """Binning algorithm backed by ``pandas.cut``."""

    _callback = pd.cut

    @property
    def _returns_bins(self):
        """Whether the 'retbins' parameter is currently enabled.

        The parameter spec is a dict holding 'type' and 'value', so the flag
        must be read from its 'value' entry; the spec dict itself is always
        truthy. A missing 'retbins' parameter counts as disabled, which matches
        the default of ``pandas.cut``.
        """
        return bool(self.parameters.get('retbins', {}).get('value', False))

    def _get_settings(self, result):
        """Return the run settings, adding the computed bins when they are available.

        With 'retbins' enabled the callback returns a pair whose second item
        holds the bins, stored under 'used_bins'. Otherwise no bins were
        returned and the key is omitted.
        """
        settings = super()._get_settings(result)
        if self._returns_bins:
            return dict(settings, **{'used_bins': result[1]})
        return settings

    def _get_result(self, result):
        """Return only the binned values, dropping the bins when 'retbins' is enabled."""
        if self._returns_bins:
            return super()._get_result(result)[0]
        return super()._get_result(result)
274