Passed
Pull Request — master (#500) by Osma, created 02:13

annif.backend.nn_ensemble   A

Complexity

Total Complexity 29

Size/Duplication

Total Lines 217
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 162
dl 0
loc 217
rs 10
c 0
b 0
f 0
wmc 29

14 Methods

Rating   Name   Duplication   Size   Complexity  
A LMDBSequence.__len__() 0 3 1
A LMDBSequence.__getitem__() 0 14 3
A LMDBSequence.add_sample() 0 10 1
A LMDBSequence.__init__() 0 8 2
A NNEnsembleBackend._create_model() 0 31 2
A MeanLayer.call() 0 2 1
A NNEnsembleBackend.default_params() 0 4 1
A NNEnsembleBackend._corpus_to_vectors() 0 17 3
A NNEnsembleBackend.initialize() 0 12 3
A NNEnsembleBackend._open_lmdb() 0 5 3
A NNEnsembleBackend._merge_hits_from_sources() 0 9 1
A NNEnsembleBackend._learn() 0 3 1
A NNEnsembleBackend._train() 0 4 1
A NNEnsembleBackend._fit_model() 0 18 4

2 Functions

Rating   Name   Duplication   Size   Complexity  
A key_to_idx() 0 3 1
A idx_to_key() 0 3 1
"""Neural network based ensemble backend that combines results from multiple
projects."""


from io import BytesIO
import shutil
import os.path
import numpy as np
from scipy.sparse import csr_matrix, csc_matrix
import joblib
import lmdb
from tensorflow.keras.layers import Input, Dense, Add, Flatten, Dropout, Layer
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.utils import Sequence
import tensorflow.keras.backend as K
import annif.corpus
import annif.util
from annif.exception import NotInitializedException
from annif.suggestion import VectorSuggestionResult
from . import backend
from . import ensemble


def idx_to_key(idx):
    """convert an integer index to a binary key for use in LMDB"""
    return b'%08d' % idx


def key_to_idx(key):
    """convert a binary LMDB key to an integer index"""
    return int(key)
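# Note: the two key helpers are inverses of each other, e.g. (illustrative
# values) idx_to_key(42) == b'00000042' and key_to_idx(b'00000042') == 42.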


class LMDBSequence(Sequence):
    """A sequence of samples stored in a LMDB database."""

    def __init__(self, txn, batch_size):
        self._txn = txn
        cursor = txn.cursor()
        if cursor.last():
            # counter is the number of stored samples, i.e. the last key + 1
            self._counter = key_to_idx(cursor.key()) + 1
        else:  # empty database
            self._counter = 0
        self._batch_size = batch_size

    def add_sample(self, inputs, targets):
        # use zero-padded 8-digit key
        key = idx_to_key(self._counter)
        self._counter += 1
        # convert the sample into a sparse matrix and serialize it as bytes
        sample = (csc_matrix(inputs), csr_matrix(targets))
        buf = BytesIO()
        joblib.dump(sample, buf)
        buf.seek(0)
        self._txn.put(key, buf.read())

    def __getitem__(self, idx):
        """get a particular batch of samples"""
        cursor = self._txn.cursor()
        first_key = idx * self._batch_size
        cursor.set_key(idx_to_key(first_key))
        input_arrays = []
        target_arrays = []
        for key, value in cursor.iternext():
            if key_to_idx(key) >= (first_key + self._batch_size):
                break
            input_csr, target_csr = joblib.load(BytesIO(value))
            input_arrays.append(input_csr.toarray())
            target_arrays.append(target_csr.toarray().flatten())
        return np.array(input_arrays), np.array(target_arrays)

    def __len__(self):
        """return the number of available batches"""
        return int(np.ceil(self._counter / self._batch_size))
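    # As a keras.utils.Sequence, an LMDBSequence can be passed directly to
    # Model.fit() (see _fit_model below); batches are loaded lazily from the
    # LMDB database, so the full training set never has to fit in memory.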


class MeanLayer(Layer):
    """Custom Keras layer that calculates mean values along the 2nd axis."""
    def call(self, inputs):
        return K.mean(inputs, axis=2)
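    # The input is a score tensor of shape (batch, n_subjects, n_sources);
    # the output of shape (batch, n_subjects) is the average over the source
    # axis (axis=2), i.e. the per-subject mean of the source scores.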


class NNEnsembleBackend(
        backend.AnnifLearningBackend,
        ensemble.BaseEnsembleBackend):
    """Neural network ensemble backend that combines results from multiple
    projects"""

    name = "nn_ensemble"

    MODEL_FILE = "nn-model.h5"
    LMDB_FILE = 'nn-train.mdb'
    LMDB_MAP_SIZE = 1024 * 1024 * 1024

    DEFAULT_PARAMETERS = {
        'nodes': 100,
        'dropout_rate': 0.2,
        'optimizer': 'adam',
        'epochs': 10,
        'learn-epochs': 1,
    }

    # defaults for uninitialized instances
    _model = None

    def default_params(self):
        params = backend.AnnifBackend.DEFAULT_PARAMETERS.copy()
        params.update(self.DEFAULT_PARAMETERS)
        return params

    def initialize(self):
        super().initialize()
        if self._model is not None:
            return  # already initialized
        model_filename = os.path.join(self.datadir, self.MODEL_FILE)
        if not os.path.exists(model_filename):
            raise NotInitializedException(
                'model file {} not found'.format(model_filename),
                backend_id=self.backend_id)
        self.debug('loading Keras model from {}'.format(model_filename))
        self._model = load_model(model_filename,
                                 custom_objects={'MeanLayer': MeanLayer})

    def _merge_hits_from_sources(self, hits_from_sources, params):
        score_vector = np.array([np.sqrt(hits.as_vector(subjects))
                                 * weight * len(hits_from_sources)
                                 for hits, weight, subjects
                                 in hits_from_sources],
                                dtype=np.float32)
        results = self._model.predict(
            np.expand_dims(score_vector.transpose(), 0))
        return VectorSuggestionResult(results[0])
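    # The np.sqrt(score) * weight * number-of-sources scaling above mirrors
    # the preprocessing in _corpus_to_vectors below, so the model sees the
    # same input representation at prediction time as during training.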

    def _create_model(self, sources):
        self.info("creating NN ensemble model")

        inputs = Input(shape=(len(self.project.subjects), len(sources)))

        flat_input = Flatten()(inputs)
        drop_input = Dropout(
            rate=float(
                self.params['dropout_rate']))(flat_input)
        hidden = Dense(int(self.params['nodes']),
                       activation="relu")(drop_input)
        drop_hidden = Dropout(rate=float(self.params['dropout_rate']))(hidden)
        delta = Dense(len(self.project.subjects),
                      kernel_initializer='zeros',
                      bias_initializer='zeros')(drop_hidden)

        mean = MeanLayer()(inputs)

        predictions = Add()([mean, delta])

        self._model = Model(inputs=inputs, outputs=predictions)
        self._model.compile(optimizer=self.params['optimizer'],
                            loss='binary_crossentropy',
                            metrics=['top_k_categorical_accuracy'])
        if 'lr' in self.params:
            self._model.optimizer.learning_rate.assign(
                float(self.params['lr']))

        summary = []
        self._model.summary(print_fn=summary.append)
        self.debug("Created model: \n" + "\n".join(summary))
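    # The prediction is the per-subject mean of the source scores plus a
    # learned "delta" correction. Because the delta layer starts from zero
    # weights and biases, a freshly created model behaves like a plain mean
    # ensemble until it has been trained.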

    def _train(self, corpus, params):
        sources = annif.util.parse_sources(self.params['sources'])
        self._create_model(sources)
        self._fit_model(corpus, epochs=int(params['epochs']))

    def _corpus_to_vectors(self, corpus, seq):
        # pass corpus through all source projects
        sources = [(self.project.registry.get_project(project_id), weight)
                   for project_id, weight
                   in annif.util.parse_sources(self.params['sources'])]

        for doc in corpus.documents:
            doc_scores = []
            for source_project, weight in sources:
                hits = source_project.suggest(doc.text)
                vector = hits.as_vector(source_project.subjects)
                doc_scores.append(np.sqrt(vector) * weight * len(sources))
            score_vector = np.array(doc_scores,
                                    dtype=np.float32).transpose()
            subjects = annif.corpus.SubjectSet((doc.uris, doc.labels))
            true_vector = subjects.as_vector(self.project.subjects)
            seq.add_sample(score_vector, true_vector)

    def _open_lmdb(self, cached):
        lmdb_path = os.path.join(self.datadir, self.LMDB_FILE)
        if not cached and os.path.exists(lmdb_path):
            shutil.rmtree(lmdb_path)
        return lmdb.open(lmdb_path, map_size=self.LMDB_MAP_SIZE, writemap=True)
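    # _fit_model below passes cached=True when it receives the string
    # 'cached' instead of a document corpus, in which case the existing LMDB
    # file is reused; otherwise any stale database is removed before new
    # training vectors are written.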

    def _fit_model(self, corpus, epochs):
        env = self._open_lmdb(corpus == 'cached')
        if corpus != 'cached':
            with env.begin(write=True, buffers=True) as txn:
                seq = LMDBSequence(txn, batch_size=32)
                self._corpus_to_vectors(corpus, seq)
        else:
            self.info("Reusing cached training data from previous run.")

        # fit the model using a read-only view of the LMDB
        with env.begin(buffers=True) as txn:
            seq = LMDBSequence(txn, batch_size=32)
            self._model.fit(seq, verbose=True, epochs=epochs)

        annif.util.atomic_save(
            self._model,
            self.datadir,
            self.MODEL_FILE)

    def _learn(self, corpus, params):
        self.initialize()
        self._fit_model(corpus, int(params['learn-epochs']))
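
For reference, this backend is configured through a section in Annif's projects.cfg. The sketch below is illustrative only: the section name, vocabulary and source project identifiers are invented, and only the parameter names are taken from DEFAULT_PARAMETERS and the params used in the code above.

    [nn-ensemble-en]
    name=NN ensemble (English)
    language=en
    backend=nn_ensemble
    vocab=my-vocab
    sources=my-tfidf-en,my-fasttext-en
    nodes=100
    dropout_rate=0.2
    optimizer=adam
    epochs=10
    learn-epochs=1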
217