Passed
Push — dev ( 5b3715...a42717 )
by Konstantinos
01:24
created

green_magic.som.som_proxy   A

Complexity

Total Complexity 29

Size/Duplication

Total Lines 130
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 29
eloc 90
dl 0
loc 130
rs 10
c 0
b 0
f 0

19 Methods

Rating   Name   Duplication   Size   Complexity  
A SomTrainer.infer_map() 0 10 2
A SomFactory.create_som() 0 8 2
A SelfOrganizingMapFactory.create() 0 2 1
A SomFactory.unregister_all() 0 3 2
A SomFactory.register() 0 3 2
A SomFactory.update_observers() 0 3 2
A SomFactory.unregister() 0 3 2
A SelfOrganizingMap.grid_type() 0 3 1
A SelfOrganizingMap.neurons_coordinates() 0 6 2
A SelfOrganizingMap.height() 0 3 1
A SelfOrganizingMap.get_map_id() 0 5 2
A SelfOrganizingMap.datapoint_coordinates() 0 4 1
A SelfOrganizingMap.__getattr__() 0 4 2
A SelfOrganizingMap.project() 0 3 1
A SelfOrganizingMap.type() 0 3 1
A SelfOrganizingMap.cluster() 0 2 1
A SelfOrganizingMap.visual_umatrix() 0 7 2
A SelfOrganizingMap.width() 0 3 1
A SelfOrganizingMap.nb_clusters() 0 3 1
1
import attr
2
import numpy as np
3
import somoclu
4
from sklearn.cluster import KMeans
5
6
import logging
7
logger = logging.getLogger(__name__)
8
9
10
class SomTrainer:
    """Train somoclu self-organizing maps from datasets that expose feature vectors."""

    def infer_map(self, nb_cols, nb_rows, dataset, **kwargs):
        """Train and return a somoclu map of nb_cols x nb_rows on the dataset's feature vectors.

        The dataset must expose a 'feature_vectors' attribute; its content is
        converted to a float32 numpy array before training.

        Extra keyword arguments are forwarded untouched to somoclu.Somoclu, i.e.:
        initialcodebook = None, kerneltype = 0, maptype = 'planar', gridtype = 'rectangular',
        compactsupport = False, neighborhood = 'gaussian', std_coeff = 0.5, initialization = None

        Raises NoFeatureVectorsError if the dataset has no 'feature_vectors' attribute.
        """
        if hasattr(dataset, 'feature_vectors'):
            # Build the (untrained) map first, then convert the vectors and train on them.
            trained_map = somoclu.Somoclu(nb_cols, nb_rows, **kwargs)
            vectors = np.array(dataset.feature_vectors, dtype=np.float32)
            trained_map.train(data=vectors)
            return trained_map
        raise NoFeatureVectorsError("Attempted to train a Som model, but did not find feature vectors in the dataset.")
22
23
24
@attr.s
class SomFactory:
    """Create trained self-organizing maps and notify registered observers about them.

    Observers are objects exposing an ``update(*args, **kwargs)`` method. Every map
    successfully created through ``create_som`` is broadcast to all registered observers.
    """
    trainer = attr.ib(init=True, default=SomTrainer())
    # Use a factory so that each instance owns its list; a literal ``[]`` default is a
    # single list object that every SomFactory instance would share.
    observers = attr.ib(init=False, default=attr.Factory(list))

    def register(self, observer):
        """Subscribe the observer, unless it is already subscribed."""
        if observer not in self.observers:
            self.observers.append(observer)

    def unregister(self, observer):
        """Unsubscribe the observer; do nothing if it is not subscribed."""
        if observer in self.observers:
            self.observers.remove(observer)

    def unregister_all(self):
        """Unsubscribe every observer, emptying the list in place."""
        del self.observers[:]

    def update_observers(self, *args, **kwargs):
        """Call ``update(*args, **kwargs)`` on every subscribed observer, in subscription order."""
        for observer in self.observers:
            observer.update(*args, **kwargs)

    def create_som(self, nb_cols, nb_rows, dataset, **kwargs):
        """Train a map through the trainer, notify the observers and return the map.

        Extra keyword arguments are forwarded to ``trainer.infer_map``.

        Raises NoFeatureVectorsError (after logging it at info level) when the
        trainer reports that the dataset has no feature vectors.
        """
        try:
            map_obj = self.trainer.infer_map(nb_cols, nb_rows, dataset, **kwargs)
        except NoFeatureVectorsError as e:
            logger.info("%s. Fire up an 'encode' command.", e)
            raise  # bare raise keeps the original traceback intact
        # Note the argument order: observers receive the rows first, then the columns.
        self.update_observers(nb_rows, nb_cols, map_object=map_obj)
        return map_obj
55
56
57
class NoFeatureVectorsError(Exception):
    """Signals that a dataset carries no feature vectors to train a self-organizing map on."""
58
59
60
@attr.s
class SelfOrganizingMapFactory:
    """Build SelfOrganizingMap wrappers around maps produced by a SomFactory."""
    # The default SomFactory() is instantiated once, when this class is defined, so every
    # factory built without an explicit 'som_factory' uses that same instance.
    som_factory = attr.ib(init=True, default=SomFactory())

    def create(self, dataset, nb_cols, nb_rows, **kwargs):
        """Train a map on the dataset and wrap it together with the dataset's name.

        Extra keyword arguments are forwarded to ``som_factory.create_som``.
        """
        trained_som = self.som_factory.create_som(nb_cols, nb_rows, dataset, **kwargs)
        return SelfOrganizingMap(trained_som, dataset.name)
66
67
68
@attr.s
class SelfOrganizingMap:
    """Wrapper around a trained som object, tagged with the name of its training dataset.

    Attributes not found on the wrapper itself are looked up on the wrapped
    ``som`` object (see ``__getattr__``).
    """
    som = attr.ib(init=True)
    dataset_name = attr.ib(init=True)

    @property
    def height(self):
        """Number of rows of the wrapped map."""
        return self.som._n_rows

    @property
    def width(self):
        """Number of columns of the wrapped map."""
        return self.som._n_columns

    @property
    def type(self):
        """Map type of the wrapped map."""
        return self.som._map_type

    @property
    def grid_type(self):
        """Grid type of the wrapped map."""
        return self.som._grid_type

    def __getattr__(self, item):
        """Delegate unresolved attribute lookups to the wrapped som object.

        A handful of public names are translated to the underscore-prefixed
        attributes under which the wrapped object stores them.
        """
        if item == 'som':
            # Only reached when 'som' has not been set on the instance (e.g. while the
            # object is being copied or unpickled); reading self.som below would
            # re-enter this method forever.
            raise AttributeError(item)
        if item in ('n_rows', 'n_columns', 'initialization', 'map_type', 'grid_type'):
            item = f'_{item}'
        return getattr(self.som, item)

    def get_map_id(self):
        """Return an identifier built from the dataset name and the map's settings.

        The parts are joined with underscores; when the map has been clustered a
        '_cl<number of clusters>' suffix is appended.
        """
        attributes = ('dataset_name', 'n_rows', 'n_columns', 'initialization', 'map_type', 'grid_type')
        # str() is required: several of these values (e.g. the grid dimensions) are not strings.
        map_id = '_'.join(str(getattr(self, attribute)) for attribute in attributes)
        # Compare against None explicitly: the truth value of a multi-element array is ambiguous.
        # NOTE(review): assumes the wrapped object keeps 'clusters' as None until clustered -- confirm.
        if self.som.clusters is not None:
            return f'{map_id}_cl{self.nb_clusters}'
        return map_id

    @property
    def nb_clusters(self):
        """Number of clusters on the map; 0 when the map has not been clustered.

        Cluster ids start at 0, so the count is the highest id plus one.
        """
        if self.som.clusters is None:
            return 0
        return int(np.max(self.som.clusters)) + 1

    def neurons_coordinates(self, dataset=None):
        """Group the training datapoints by the cluster of their best-matching unit.

        Args:
            dataset: optional indexable collection aligned with the rows of ``som.bmus``;
                when given, members are identified by ``dataset[i].id``, otherwise by
                their row index ``i``.

        Returns:
            dict mapping each cluster id to the set of its member identifiers.
        """
        id2members = {}
        # Each row of bmus holds the coordinates of the neuron the datapoint is attributed to.
        for i, arr in enumerate(self.som.bmus):
            # NOTE(review): verify that the bmus coordinate order matches the axis order of 'clusters'.
            attributed_cluster = self.som.clusters[arr[0], arr[1]]  # >= 0
            member = i if dataset is None else dataset[i].id
            id2members.setdefault(attributed_cluster, set()).add(member)
        return id2members

    def datapoint_coordinates(self, index):
        """Return the best-matching unit (bmu) coordinates of the datapoint at the given index.

        The bmu is the neuron on the som grid that is closest to the datapoint.
        """
        bmu = self.som.bmus[index]
        return bmu[0], bmu[1]

    def project(self, datapoint):
        """Compute the coordinates of a (potentially unseen) datapoint.

        Not implemented yet: currently does nothing and returns None.
        """
        pass

    def cluster(self, nb_clusters, random_state=None):
        """Cluster the wrapped map with KMeans into nb_clusters clusters."""
        self.som.cluster(algorithm=KMeans(n_clusters=nb_clusters, random_state=random_state))

    @property
    def visual_umatrix(self):
        """Text rendering of the cluster id of every neuron, one grid row per line.

        Ids are right-aligned to the width of the largest id and separated by
        single spaces; every row, including the last, ends with a newline.
        """
        # i.e. a clustering of 11 clusters with ids 0, 1, .., 10 has a max_len = 2
        max_len = len(str(np.max(self.som.clusters)))
        return ''.join(
            ' '.join(str(i).rjust(max_len) for i in self.som.clusters[j, :]) + '\n'
            for j in range(self.som.umatrix.shape[0])
        )
130
131
132