Passed
Push — master (02111c...483f04) by Osma
01:55 (queued 12s)

annif.backend.nn_ensemble.MeanLayer.call() (grade A)

Complexity
    Conditions: 1

Size
    Total Lines: 2
    Code Lines: 2

Duplication
    Lines: 0
    Ratio: 0 %

Importance
    Changes: 0

Metric   Value
cc       1
eloc     2
nop      2
dl       0
loc      2
rs       10
c        0
b        0
f        0

Here cc is cyclomatic complexity and loc the line count; the remaining keys
are the analyzer's raw per-function metrics.

Source listing: annif/backend/nn_ensemble.py
"""Neural network based ensemble backend that combines results from multiple
projects."""


from io import BytesIO
import shutil
import os.path
import numpy as np
from scipy.sparse import csr_matrix, csc_matrix
import joblib
import lmdb
from tensorflow.keras.layers import Input, Dense, Add, Flatten, Dropout, Layer
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.utils import Sequence
import tensorflow.keras.backend as K
import annif.corpus
import annif.util
from annif.exception import NotInitializedException, NotSupportedException
from annif.suggestion import VectorSuggestionResult
from . import backend
from . import ensemble


def idx_to_key(idx):
    """convert an integer index to a binary key for use in LMDB"""
    return b'%08d' % idx


def key_to_idx(key):
    """convert a binary LMDB key to an integer index"""
    return int(key)
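
# For example, idx_to_key(42) == b'00000042' and
# key_to_idx(b'00000042') == 42. The zero-padded keys sort lexicographically
# in the same order as their integer values, which is what lets LMDBSequence
# below read batches with a sequential cursor scan.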


class LMDBSequence(Sequence):
    """A sequence of samples stored in a LMDB database."""

    def __init__(self, txn, batch_size):
        self._txn = txn
        cursor = txn.cursor()
        if cursor.last():
            # Counter holds the number of samples in the database
            self._counter = key_to_idx(cursor.key()) + 1
        else:  # empty database
            self._counter = 0
        self._batch_size = batch_size

    def add_sample(self, inputs, targets):
        # use zero-padded 8-digit key
        key = idx_to_key(self._counter)
        self._counter += 1
        # convert the sample into a sparse matrix and serialize it as bytes
        sample = (csc_matrix(inputs), csr_matrix(targets))
        buf = BytesIO()
        joblib.dump(sample, buf)
        buf.seek(0)
        self._txn.put(key, buf.read())

    def __getitem__(self, idx):
        """get a particular batch of samples"""
        cursor = self._txn.cursor()
        first_key = idx * self._batch_size
        cursor.set_key(idx_to_key(first_key))
        input_arrays = []
        target_arrays = []
        for key, value in cursor.iternext():
            if key_to_idx(key) >= (first_key + self._batch_size):
                break
            input_csr, target_csr = joblib.load(BytesIO(value))
            input_arrays.append(input_csr.toarray())
            target_arrays.append(target_csr.toarray().flatten())
        return np.array(input_arrays), np.array(target_arrays)

    def __len__(self):
        """return the number of available batches"""
        return int(np.ceil(self._counter / self._batch_size))
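
# Hypothetical usage sketch (path and shapes invented for illustration; in
# the backend itself the environment is opened by
# NNEnsembleBackend._open_lmdb below):
#
#   env = lmdb.open('/tmp/nn-train.mdb', map_size=1024 * 1024 * 1024)
#   with env.begin(write=True, buffers=True) as txn:
#       seq = LMDBSequence(txn, batch_size=32)
#       seq.add_sample(np.zeros((5, 2), np.float32), np.zeros(5, np.float32))
#   with env.begin(buffers=True) as txn:
#       seq = LMDBSequence(txn, batch_size=32)
#       inputs, targets = seq[0]  # inputs.shape == (1, 5, 2)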


class MeanLayer(Layer):
    """Custom Keras layer that calculates mean values along the 2nd axis."""
    def call(self, inputs):
        return K.mean(inputs, axis=2)
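
# Shape note: for a batch of shape (n_docs, n_subjects, n_sources),
# K.mean(inputs, axis=2) averages over the source dimension and returns a
# (n_docs, n_subjects) tensor, i.e. one mean score per subject.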


class NNEnsembleBackend(
        backend.AnnifLearningBackend,
        ensemble.BaseEnsembleBackend):
    """Neural network ensemble backend that combines results from multiple
    projects"""

    name = "nn_ensemble"

    MODEL_FILE = "nn-model.h5"
    LMDB_FILE = 'nn-train.mdb'
    LMDB_MAP_SIZE = 1024 * 1024 * 1024

    DEFAULT_PARAMETERS = {
        'nodes': 100,
        'dropout_rate': 0.2,
        'optimizer': 'adam',
        'epochs': 10,
        'learn-epochs': 1,
    }

    # defaults for uninitialized instances
    _model = None

    def default_params(self):
        params = backend.AnnifBackend.DEFAULT_PARAMETERS.copy()
        params.update(self.DEFAULT_PARAMETERS)
        return params

    def initialize(self):
        super().initialize()
        if self._model is not None:
            return  # already initialized
        model_filename = os.path.join(self.datadir, self.MODEL_FILE)
        if not os.path.exists(model_filename):
            raise NotInitializedException(
                'model file {} not found'.format(model_filename),
                backend_id=self.backend_id)
        self.debug('loading Keras model from {}'.format(model_filename))
        self._model = load_model(model_filename,
                                 custom_objects={'MeanLayer': MeanLayer})

    def _merge_hits_from_sources(self, hits_from_sources, params):
        score_vector = np.array([np.sqrt(hits.as_vector(subjects))
                                 * weight * len(hits_from_sources)
                                 for hits, weight, subjects
                                 in hits_from_sources],
                                dtype=np.float32)
        results = self._model.predict(
            np.expand_dims(score_vector.transpose(), 0))
        return VectorSuggestionResult(results[0])
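
    # Note on the expression above: the stacked score array has shape
    # (n_sources, n_subjects); each source vector is sqrt-scaled and
    # multiplied by its weight and by the number of sources, mirroring the
    # scaling applied to training data in _corpus_to_vectors() below.
    # transpose() plus expand_dims() then yield the (1, n_subjects,
    # n_sources) batch that the Keras model expects.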

    def _create_model(self, sources):
        self.info("creating NN ensemble model")

        inputs = Input(shape=(len(self.project.subjects), len(sources)))

        flat_input = Flatten()(inputs)
        drop_input = Dropout(
            rate=float(
                self.params['dropout_rate']))(flat_input)
        hidden = Dense(int(self.params['nodes']),
                       activation="relu")(drop_input)
        drop_hidden = Dropout(rate=float(self.params['dropout_rate']))(hidden)
        delta = Dense(len(self.project.subjects),
                      kernel_initializer='zeros',
                      bias_initializer='zeros')(drop_hidden)

        mean = MeanLayer()(inputs)

        predictions = Add()([mean, delta])

        self._model = Model(inputs=inputs, outputs=predictions)
        self._model.compile(optimizer=self.params['optimizer'],
                            loss='binary_crossentropy',
                            metrics=['top_k_categorical_accuracy'])
        if 'lr' in self.params:
            self._model.optimizer.learning_rate.assign(
                float(self.params['lr']))

        summary = []
        self._model.summary(print_fn=summary.append)
        self.debug("Created model: \n" + "\n".join(summary))

    def _train(self, corpus, params):
        sources = annif.util.parse_sources(self.params['sources'])
        self._create_model(sources)
        self._fit_model(corpus, epochs=int(params['epochs']))

    def _corpus_to_vectors(self, corpus, seq):
        # pass corpus through all source projects
        sources = [(self.project.registry.get_project(project_id), weight)
                   for project_id, weight
                   in annif.util.parse_sources(self.params['sources'])]

        for doc in corpus.documents:
            doc_scores = []
            for source_project, weight in sources:
                hits = source_project.suggest(doc.text)
                vector = hits.as_vector(source_project.subjects)
                doc_scores.append(np.sqrt(vector) * weight * len(sources))
            score_vector = np.array(doc_scores,
                                    dtype=np.float32).transpose()
            subjects = annif.corpus.SubjectSet((doc.uris, doc.labels))
            true_vector = subjects.as_vector(self.project.subjects)
            seq.add_sample(score_vector, true_vector)

    def _open_lmdb(self, cached):
        lmdb_path = os.path.join(self.datadir, self.LMDB_FILE)
        if not cached and os.path.exists(lmdb_path):
            shutil.rmtree(lmdb_path)
        return lmdb.open(lmdb_path, map_size=self.LMDB_MAP_SIZE, writemap=True)

    def _fit_model(self, corpus, epochs):
        env = self._open_lmdb(corpus == 'cached')
        if corpus != 'cached':
            if corpus.is_empty():
                raise NotSupportedException(
                    'Cannot train nn_ensemble project with no documents')
            with env.begin(write=True, buffers=True) as txn:
                seq = LMDBSequence(txn, batch_size=32)
                self._corpus_to_vectors(corpus, seq)
        else:
            self.info("Reusing cached training data from previous run.")
        # fit the model using a read-only view of the LMDB
        with env.begin(buffers=True) as txn:
            seq = LMDBSequence(txn, batch_size=32)
            self._model.fit(seq, verbose=True, epochs=epochs)

        annif.util.atomic_save(
            self._model,
            self.datadir,
            self.MODEL_FILE)

    def _learn(self, corpus, params):
        self.initialize()
        self._fit_model(corpus, int(params['learn-epochs']))
220