Passed
Pull Request — master (#414)
by Osma · created · 01:45

annif.backend.nn_ensemble · Rating: A

Complexity

Total Complexity 27

Size/Duplication

Total Lines 205
Duplicated Lines 0%

Importance

Changes 0
Metric  Value
wmc     27
eloc    153
dl      0
loc     205
rs      10
c       0
b       0
f       0

13 Methods

Rating   Name   Duplication (lines)   Size (lines)   Complexity
A LMDBSequence.__len__() 0 3 1
A LMDBSequence.__getitem__() 0 14 3
A NNEnsembleBackend.default_params() 0 5 1
A LMDBSequence.add_sample() 0 10 1
A LMDBSequence.__init__() 0 8 2
A NNEnsembleBackend._create_model() 0 28 2
A NNEnsembleBackend._corpus_to_vectors() 0 17 3
A NNEnsembleBackend.initialize() 0 11 3
A NNEnsembleBackend._open_lmdb() 0 5 3
A NNEnsembleBackend._merge_hits_from_sources() 0 8 1
A NNEnsembleBackend._learn() 0 3 1
A NNEnsembleBackend._train() 0 4 1
A NNEnsembleBackend._fit_model() 0 16 3

2 Functions

Rating   Name   Duplication (lines)   Size (lines)   Complexity
A key_to_idx() 0 3 1
A idx_to_key() 0 3 1
"""Neural network based ensemble backend that combines results from multiple
projects."""


from io import BytesIO
import shutil
import os.path
import numpy as np
from scipy.sparse import csr_matrix, csc_matrix
import joblib
import lmdb
from tensorflow.keras.layers import Input, Dense, Add, Flatten, Lambda, Dropout
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.utils import Sequence
import tensorflow.keras.backend as K
import annif.corpus
import annif.util
from annif.exception import NotInitializedException
from annif.suggestion import VectorSuggestionResult
from . import backend
from . import ensemble


def idx_to_key(idx):
    """convert an integer index to a binary key for use in LMDB"""
    return b'%08d' % idx


def key_to_idx(key):
    """convert a binary LMDB key to an integer index"""
    return int(key)
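
# Illustration (not part of the module): the zero-padded keys keep LMDB's
# lexicographic byte order aligned with numeric order, e.g.
#   idx_to_key(42)          -> b'00000042'
#   key_to_idx(b'00000042') -> 42
# which is what makes cursor.last() and the batch range scans below work.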


class LMDBSequence(Sequence):
    """A sequence of samples stored in a LMDB database."""

    def __init__(self, txn, batch_size):
        self._txn = txn
        cursor = txn.cursor()
        if cursor.last():
            # the counter holds the number of samples in the database
            self._counter = key_to_idx(cursor.key()) + 1
        else:  # empty database
            self._counter = 0
        self._batch_size = batch_size

    def add_sample(self, inputs, targets):
        # use zero-padded 8-digit key
        key = idx_to_key(self._counter)
        self._counter += 1
        # convert the sample into a sparse matrix and serialize it as bytes
        sample = (csc_matrix(inputs), csr_matrix(targets))
        buf = BytesIO()
        joblib.dump(sample, buf)
        buf.seek(0)
        self._txn.put(key, buf.read())

    def __getitem__(self, idx):
        """get a particular batch of samples"""
        cursor = self._txn.cursor()
        first_key = idx * self._batch_size
        cursor.set_key(idx_to_key(first_key))
        input_arrays = []
        target_arrays = []
        for key, value in cursor.iternext():
            if key_to_idx(key) >= (first_key + self._batch_size):
                break
            input_csr, target_csr = joblib.load(BytesIO(value))
            input_arrays.append(input_csr.toarray())
            target_arrays.append(target_csr.toarray().flatten())
        return np.array(input_arrays), np.array(target_arrays)

    def __len__(self):
        """return the number of available batches"""
        return int(np.ceil(self._counter / self._batch_size))
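
# Illustration (assumed, not part of the module): minimal use of LMDBSequence
# with a hypothetical path and shapes.
#
#   env = lmdb.open('/tmp/nn-train.mdb', map_size=1024 * 1024 * 1024,
#                   writemap=True)
#   with env.begin(write=True, buffers=True) as txn:
#       seq = LMDBSequence(txn, batch_size=32)
#       seq.add_sample(np.zeros((1000, 2), dtype=np.float32),
#                      np.zeros(1000, dtype=np.float32))
#       inputs, targets = seq[0]  # shapes (1, 1000, 2) and (1, 1000)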


class NNEnsembleBackend(
        backend.AnnifLearningBackend,
        ensemble.BaseEnsembleBackend):
    """Neural network ensemble backend that combines results from multiple
    projects"""

    name = "nn_ensemble"

    MODEL_FILE = "nn-model.h5"
    LMDB_FILE = 'nn-train.mdb'
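    # maximum size of the LMDB memory map: 1024**3 bytes = 1 GiB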
    LMDB_MAP_SIZE = 1024 * 1024 * 1024

    DEFAULT_PARAMETERS = {
        'nodes': 100,
        'dropout_rate': 0.2,
        'optimizer': 'adam',
        'epochs': 10,
        'learn-epochs': 1,
    }

    # defaults for uninitialized instances
    _model = None

    def default_params(self):
        params = {}
        params.update(super().default_params())
        params.update(self.DEFAULT_PARAMETERS)
        return params

    def initialize(self):
        super().initialize()
        if self._model is not None:
            return  # already initialized
        model_filename = os.path.join(self.datadir, self.MODEL_FILE)
        if not os.path.exists(model_filename):
            raise NotInitializedException(
                'model file {} not found'.format(model_filename),
                backend_id=self.backend_id)
        self.debug('loading Keras model from {}'.format(model_filename))
        self._model = load_model(model_filename)

    def _merge_hits_from_sources(self, hits_from_sources, params):
        score_vector = np.array([hits.as_vector(subjects) * weight
                                 for hits, weight, subjects
                                 in hits_from_sources],
                                dtype=np.float32)
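        # score_vector has shape (sources, subjects); transposing it and
        # adding a batch axis yields (1, subjects, sources), matching the
        # Input(shape=(subjects, sources)) layer defined in _create_model()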
        results = self._model.predict(
            np.expand_dims(score_vector.transpose(), 0))
        return VectorSuggestionResult(results[0])

    def _create_model(self, sources):
        self.info("creating NN ensemble model")

        inputs = Input(shape=(len(self.project.subjects), len(sources)))

        flat_input = Flatten()(inputs)
        drop_input = Dropout(
            rate=float(
                self.params['dropout_rate']))(flat_input)
        hidden = Dense(int(self.params['nodes']),
                       activation="relu")(drop_input)
        drop_hidden = Dropout(rate=float(self.params['dropout_rate']))(hidden)
        delta = Dense(len(self.project.subjects),
                      kernel_initializer='zeros',
                      bias_initializer='zeros')(drop_hidden)
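        # the model is residual: "mean" is the plain average of the source
        # scores (axis 2 = sources) and "delta", whose weights start at
        # zero, is a learned correction added on top, so an untrained model
        # initially behaves like a simple averaging ensemble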

        mean = Lambda(lambda x: K.mean(x, axis=2))(inputs)

        predictions = Add()([mean, delta])

        self._model = Model(inputs=inputs, outputs=predictions)
        self._model.compile(optimizer=self.params['optimizer'],
                            loss='binary_crossentropy',
                            metrics=['top_k_categorical_accuracy'])

        summary = []
        self._model.summary(print_fn=summary.append)
        self.debug("Created model: \n" + "\n".join(summary))

    def _train(self, corpus, params):
        sources = annif.util.parse_sources(self.params['sources'])
        self._create_model(sources)
        self._fit_model(corpus, epochs=int(params['epochs']))

    def _corpus_to_vectors(self, corpus, seq):
        # pass corpus through all source projects
        sources = [(self.project.registry.get_project(project_id), weight)
                   for project_id, weight
                   in annif.util.parse_sources(self.params['sources'])]

        for doc in corpus.documents:
            doc_scores = []
            for source_project, weight in sources:
                hits = source_project.suggest(doc.text)
                doc_scores.append(
                    hits.as_vector(source_project.subjects) * weight)
            score_vector = np.array(doc_scores,
                                    dtype=np.float32).transpose()
            subjects = annif.corpus.SubjectSet((doc.uris, doc.labels))
            true_vector = subjects.as_vector(self.project.subjects)
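            # each stored sample pairs the (subjects x sources) score
            # matrix with the gold-standard subject vector of the document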
            seq.add_sample(score_vector, true_vector)

    def _open_lmdb(self, cached):
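        # reuse an existing training database when cached=True; otherwise
        # remove any leftover database so training starts from scratch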
        lmdb_path = os.path.join(self.datadir, self.LMDB_FILE)
        if not cached and os.path.exists(lmdb_path):
            shutil.rmtree(lmdb_path)
        return lmdb.open(lmdb_path, map_size=self.LMDB_MAP_SIZE, writemap=True)

    def _fit_model(self, corpus, epochs):
        env = self._open_lmdb(corpus == 'cached')
        with env.begin(write=True, buffers=True) as txn:
            seq = LMDBSequence(txn, batch_size=32)
            if corpus != 'cached':
                self._corpus_to_vectors(corpus, seq)
            else:
                self.info("Reusing cached training data from previous run.")

            # fit the model
            self._model.fit(seq, verbose=True, epochs=epochs)

        annif.util.atomic_save(
            self._model,
            self.datadir,
            self.MODEL_FILE)

    def _learn(self, corpus, params):
        self.initialize()
        self._fit_model(corpus, int(params['learn-epochs']))
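
# Illustration (assumed, not part of the module): a hypothetical projects.cfg
# entry for this backend; project ids and vocab are made up, parameter names
# follow DEFAULT_PARAMETERS above, and source weights are given after a colon.
#
#   [nn-ensemble-en]
#   name=NN ensemble English
#   language=en
#   backend=nn_ensemble
#   vocab=my-vocab
#   sources=tfidf-en:1,fasttext-en:2
#   nodes=100
#   dropout_rate=0.2
#   epochs=10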