Passed
Pull Request — master (#569)
by Osma
03:52
created

NNEnsembleBackend.default_params() (rating: A)

Complexity: Conditions 1
Size: Total Lines 4, Code Lines 4
Duplication: Lines 4, Ratio 100 %
Importance: Changes 0
Metric  Value
cc      1
eloc    4
nop     1
dl      4
loc     4
rs      10
c       0
b       0
f       0
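
For context, the method graded above simply layers the nn_ensemble-specific defaults on top of the generic backend defaults (see default_params() in the listing below). A minimal sketch of that merge follows; the 'limit' entry is a hypothetical stand-in for whatever AnnifBackend.DEFAULT_PARAMETERS actually contains:

    # hypothetical illustration of the dict merge performed by default_params()
    generic_defaults = {'limit': 100}  # assumed generic AnnifBackend defaults
    nn_defaults = {
        'nodes': 100,
        'dropout_rate': 0.2,
        'optimizer': 'adam',
        'epochs': 10,
        'learn-epochs': 1,
        'lmdb_map_size': 1024 * 1024 * 1024
    }
    params = generic_defaults.copy()
    params.update(nn_defaults)  # backend-specific values win on overlap
    # params now holds both the generic and the nn_ensemble-specific defaults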
"""Neural network based ensemble backend that combines results from multiple
projects."""


from io import BytesIO
import shutil
import os.path
import numpy as np
from scipy.sparse import csr_matrix, csc_matrix
import joblib
import lmdb
from tensorflow.keras.layers import Input, Dense, Add, Flatten, Dropout, Layer
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.utils import Sequence
import tensorflow.keras.backend as K
import annif.corpus
import annif.eval  # used by NNEnsembleOptimizer._objective
import annif.parallel
import annif.util
from annif.exception import NotInitializedException, NotSupportedException
from annif.suggestion import VectorSuggestionResult
from . import backend
from . import ensemble
from . import hyperopt


def idx_to_key(idx):
    """convert an integer index to a binary key for use in LMDB"""
    return b'%08d' % idx


def key_to_idx(key):
    """convert a binary LMDB key to an integer index"""
    return int(key)

View Code Duplication: this code seems to be duplicated in your project (0 ignored issues).
class LMDBSequence(Sequence):
    """A sequence of samples stored in a LMDB database."""

    def __init__(self, txn, batch_size):
        self._txn = txn
        cursor = txn.cursor()
        if cursor.last():
            # Counter holds the number of samples in the database
            self._counter = key_to_idx(cursor.key()) + 1
        else:  # empty database
            self._counter = 0
        self._batch_size = batch_size

    def add_sample(self, inputs, targets):
        # use zero-padded 8-digit key
        key = idx_to_key(self._counter)
        self._counter += 1
        # convert the sample into a sparse matrix and serialize it as bytes
        sample = (csc_matrix(inputs), csr_matrix(targets))
        buf = BytesIO()
        joblib.dump(sample, buf)
        buf.seek(0)
        self._txn.put(key, buf.read())

    def __getitem__(self, idx):
        """get a particular batch of samples"""
        cursor = self._txn.cursor()
        first_key = idx * self._batch_size
        cursor.set_key(idx_to_key(first_key))
        input_arrays = []
        target_arrays = []
        for key, value in cursor.iternext():
            if key_to_idx(key) >= (first_key + self._batch_size):
                break
            input_csr, target_csr = joblib.load(BytesIO(value))
            input_arrays.append(input_csr.toarray())
            target_arrays.append(target_csr.toarray().flatten())
        return np.array(input_arrays), np.array(target_arrays)

    def __len__(self):
        """return the number of available batches"""
        return int(np.ceil(self._counter / self._batch_size))

View Code Duplication: this code seems to be duplicated in your project (0 ignored issues).
class NNEnsembleOptimizer(hyperopt.HyperparameterOptimizer):
    """Hyperparameter optimizer for the NN ensemble backend"""

    def _prepare(self, n_jobs=1):
        sources = dict(
            annif.util.parse_sources(self._backend.params['sources']))

        # initialize the source projects before forking, to save memory
        for project_id in sources.keys():
            project = self._backend.project.registry.get_project(project_id)
            project.initialize(parallel=True)

        psmap = annif.parallel.ProjectSuggestMap(
            self._backend.project.registry,
            list(sources.keys()),
            backend_params=None,
            limit=None,
            threshold=0.0)

        jobs, pool_class = annif.parallel.get_pool(n_jobs)

        self._score_vectors = []
        self._gold_subjects = []

        with pool_class(jobs) as pool:
            for hits, uris, labels in pool.imap_unordered(
                    psmap.suggest, self._corpus.documents):
                doc_scores = []
                for project_id, p_hits in hits.items():
                    vector = p_hits.as_vector(self._backend.project.subjects)
                    doc_scores.append(np.sqrt(vector)
                                      * sources[project_id]
                                      * len(sources))
                score_vector = np.array(doc_scores,
                                        dtype=np.float32).transpose()
                subjects = annif.corpus.SubjectSet((uris, labels))
                self._score_vectors.append(score_vector)
                self._gold_subjects.append(subjects)

    def _objective(self, trial):
        sources = annif.util.parse_sources(self._backend.params['sources'])
        params = {
            'nodes': trial.suggest_int('nodes', 50, 200),
            'dropout_rate': trial.suggest_float('dropout_rate', 0.0, 0.5),
            'epochs': trial.suggest_int('epochs', 5, 20),
            'optimizer': 'adam'
        }
        model = self._backend._create_model(sources, params)

        env = self._backend._open_lmdb(True,
                                       self._backend.params['lmdb_map_size'])
        with env.begin(buffers=True) as txn:
            seq = LMDBSequence(txn, batch_size=32)
            model.fit(seq, verbose=0, epochs=params['epochs'])

        batch = annif.eval.EvaluationBatch(self._backend.project.subjects)
        for goldsubj, score_vector in zip(self._gold_subjects,
                                          self._score_vectors):
            results = model.predict(
                np.expand_dims(score_vector, 0))
            output = VectorSuggestionResult(results[0])
            batch.evaluate(output, goldsubj)
        eval_results = batch.results(metrics=[self._metric])
        return eval_results[self._metric]

    def _postprocess(self, study):
        bp = study.best_params
        lines = [
            f"nodes={bp['nodes']}",
            f"dropout_rate={bp['dropout_rate']}",
            f"epochs={bp['epochs']}"
        ]
        return hyperopt.HPRecommendation(lines=lines, score=study.best_value)


class MeanLayer(Layer):
    """Custom Keras layer that calculates mean values along the 2nd axis."""
    def call(self, inputs):
        return K.mean(inputs, axis=2)

View Code Duplication: this code seems to be duplicated in your project (0 ignored issues).
class NNEnsembleBackend(
        backend.AnnifLearningBackend,
        ensemble.BaseEnsembleBackend,
        hyperopt.AnnifHyperoptBackend):
    """Neural network ensemble backend that combines results from multiple
    projects"""

    name = "nn_ensemble"

    MODEL_FILE = "nn-model.h5"
    LMDB_FILE = 'nn-train.mdb'

    DEFAULT_PARAMETERS = {
        'nodes': 100,
        'dropout_rate': 0.2,
        'optimizer': 'adam',
        'epochs': 10,
        'learn-epochs': 1,
        'lmdb_map_size': 1024 * 1024 * 1024
    }

    # defaults for uninitialized instances
    _model = None

    def get_hp_optimizer(self, corpus, metric):
        return NNEnsembleOptimizer(self, corpus, metric)

    def default_params(self):
        params = backend.AnnifBackend.DEFAULT_PARAMETERS.copy()
        params.update(self.DEFAULT_PARAMETERS)
        return params

    def initialize(self, parallel=False):
        super().initialize(parallel)
        if self._model is not None:
            return  # already initialized
        if parallel:
            # Don't load TF model just before parallel execution,
            # since it won't work after forking worker processes
            return
        model_filename = os.path.join(self.datadir, self.MODEL_FILE)
        if not os.path.exists(model_filename):
            raise NotInitializedException(
                'model file {} not found'.format(model_filename),
                backend_id=self.backend_id)
        self.debug('loading Keras model from {}'.format(model_filename))
        self._model = load_model(model_filename,
                                 custom_objects={'MeanLayer': MeanLayer})

    def _merge_hits_from_sources(self, hits_from_sources, params):
        score_vector = np.array([np.sqrt(hits.as_vector(subjects))
                                 * weight * len(hits_from_sources)
                                 for hits, weight, subjects
                                 in hits_from_sources],
                                dtype=np.float32)
        results = self._model.predict(
            np.expand_dims(score_vector.transpose(), 0))
        return VectorSuggestionResult(results[0])

    def _create_model(self, sources, params):
        inputs = Input(shape=(len(self.project.subjects), len(sources)))

        flat_input = Flatten()(inputs)
        drop_input = Dropout(
            rate=float(params['dropout_rate']))(flat_input)
        hidden = Dense(int(params['nodes']),
                       activation="relu")(drop_input)
        drop_hidden = Dropout(rate=float(params['dropout_rate']))(hidden)
        delta = Dense(len(self.project.subjects),
                      kernel_initializer='zeros',
                      bias_initializer='zeros')(drop_hidden)

        mean = MeanLayer()(inputs)

        predictions = Add()([mean, delta])

        model = Model(inputs=inputs, outputs=predictions)
        model.compile(optimizer=params['optimizer'],
                      loss='binary_crossentropy',
                      metrics=['top_k_categorical_accuracy'])
        if 'lr' in params:
            model.optimizer.learning_rate.assign(
                float(params['lr']))

        summary = []
        model.summary(print_fn=summary.append)
        self.debug("Created model: \n" + "\n".join(summary))
        return model

    def _train(self, corpus, params, jobs=0):
        sources = annif.util.parse_sources(params['sources'])
        self.info("creating NN ensemble model")
        self._model = self._create_model(sources, params)
        self._fit_model(
            corpus,
            epochs=int(params['epochs']),
            lmdb_map_size=int(params['lmdb_map_size']),
            n_jobs=jobs)

    def _corpus_to_vectors(self, corpus, seq, n_jobs):
        # pass corpus through all source projects
        sources = dict(
            annif.util.parse_sources(self.params['sources']))

        # initialize the source projects before forking, to save memory
        self.info(
            f"Initializing source projects: {', '.join(sources.keys())}")
        for project_id in sources.keys():
            project = self.project.registry.get_project(project_id)
            project.initialize(parallel=True)

        psmap = annif.parallel.ProjectSuggestMap(
            self.project.registry,
            list(sources.keys()),
            backend_params=None,
            limit=None,
            threshold=0.0)

        jobs, pool_class = annif.parallel.get_pool(n_jobs)

        self.info("Processing training documents...")
        with pool_class(jobs) as pool:
            for hits, uris, labels in pool.imap_unordered(
                    psmap.suggest, corpus.documents):
                doc_scores = []
                for project_id, p_hits in hits.items():
                    vector = p_hits.as_vector(self.project.subjects)
                    doc_scores.append(np.sqrt(vector)
                                      * sources[project_id]
                                      * len(sources))
                score_vector = np.array(doc_scores,
                                        dtype=np.float32).transpose()
                subjects = annif.corpus.SubjectSet((uris, labels))
                true_vector = subjects.as_vector(self.project.subjects)
                seq.add_sample(score_vector, true_vector)

    def _open_lmdb(self, cached, lmdb_map_size):
        lmdb_path = os.path.join(self.datadir, self.LMDB_FILE)
        if not cached and os.path.exists(lmdb_path):
            shutil.rmtree(lmdb_path)
        return lmdb.open(lmdb_path, map_size=lmdb_map_size, writemap=True)

    def _fit_model(self, corpus, epochs, lmdb_map_size, n_jobs=1):
        env = self._open_lmdb(corpus == 'cached', lmdb_map_size)
        if corpus != 'cached':
            if corpus.is_empty():
                raise NotSupportedException(
                    'Cannot train nn_ensemble project with no documents')
            with env.begin(write=True, buffers=True) as txn:
                seq = LMDBSequence(txn, batch_size=32)
                self._corpus_to_vectors(corpus, seq, n_jobs)
        else:
            self.info("Reusing cached training data from previous run.")
        # fit the model using a read-only view of the LMDB
        self.info("Training neural network model...")
        with env.begin(buffers=True) as txn:
            seq = LMDBSequence(txn, batch_size=32)
            self._model.fit(seq, verbose=1, epochs=epochs)

        annif.util.atomic_save(
            self._model,
            self.datadir,
            self.MODEL_FILE)

    def _learn(self, corpus, params):
        self.initialize()
        self._fit_model(
            corpus,
            int(params['learn-epochs']),
            int(params['lmdb_map_size']))
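
As a usage note, the LMDB-backed training pipeline above splits ingestion (a write transaction) from training (a read-only transaction). Below is a minimal, self-contained sketch of exercising LMDBSequence on its own, assuming the class from the listing is in scope; the path, map size and array shapes are illustrative only, and only the LMDBSequence calls come from the code above.

    import numpy as np
    import lmdb

    # hypothetical standalone usage of LMDBSequence; values are illustrative
    env = lmdb.open('/tmp/nn-train.mdb', map_size=1024 * 1024 * 1024,
                    writemap=True)

    # ingestion: write one sample inside a write transaction
    with env.begin(write=True, buffers=True) as txn:
        seq = LMDBSequence(txn, batch_size=32)
        inputs = np.random.rand(1000, 3).astype(np.float32)   # subjects x sources
        targets = np.zeros(1000, dtype=np.float32)             # gold subject vector
        seq.add_sample(inputs, targets)

    # training view: read batches back from a read-only transaction
    with env.begin(buffers=True) as txn:
        seq = LMDBSequence(txn, batch_size=32)
        batch_inputs, batch_targets = seq[0]
        print(batch_inputs.shape, batch_targets.shape)  # (1, 1000, 3) (1, 1000)

This mirrors how _fit_model first fills the database via _corpus_to_vectors and then fits the Keras model from a read-only view of the same LMDB.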