| Metric | Value |
|---|---|
| Total Complexity | 43 |
| Total Lines | 332 |
| Duplicated Lines | 86.14 % |
| Changes | 0 |
Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.
Complex classes like annif.backend.nn_ensemble often do a lot of different things. To break such a class down, we need to identify a cohesive component within it. A common way to find such a component is to look for fields and methods that share the same prefix or suffix.
Once you have determined which fields belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate and is often faster.
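As a concrete illustration, the LMDB-related members of NNEnsembleBackend in the listing below (LMDB_FILE, the lmdb_map_size parameter, and _open_lmdb) share a common prefix and could be pulled into a class of their own. The sketch below is hypothetical: the LmdbTrainingData name and its interface are illustrative and not part of Annif.

```python
# Hypothetical Extract Class sketch: the LMDB-handling members of
# NNEnsembleBackend (LMDB_FILE, lmdb_map_size, _open_lmdb) moved into a
# small class of their own. LmdbTrainingData is an illustrative name only.
import os.path
import shutil

import lmdb


class LmdbTrainingData:
    """Owns the on-disk LMDB database used to cache training samples."""

    LMDB_FILE = 'nn-train.mdb'

    def __init__(self, datadir, map_size):
        self._path = os.path.join(datadir, self.LMDB_FILE)
        self._map_size = map_size

    def open(self, cached):
        # discard any stale database unless the caller wants the cached data
        if not cached and os.path.exists(self._path):
            shutil.rmtree(self._path)
        return lmdb.open(self._path, map_size=self._map_size, writemap=True)


# The backend would then delegate instead of owning the details itself, e.g.:
#     self._lmdb = LmdbTrainingData(self.datadir, int(params['lmdb_map_size']))
#     env = self._lmdb.open(corpus == 'cached')
```

The full analyzed source of annif.backend.nn_ensemble follows.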
| 1 | """Neural network based ensemble backend that combines results from multiple |
||
| 2 | projects.""" |
||
| 3 | |||
| 4 | |||
| 5 | from io import BytesIO |
||
| 6 | import shutil |
||
| 7 | import os.path |
||
| 8 | import numpy as np |
||
| 9 | from scipy.sparse import csr_matrix, csc_matrix |
||
| 10 | import joblib |
||
| 11 | import lmdb |
||
| 12 | from tensorflow.keras.layers import Input, Dense, Add, Flatten, Dropout, Layer |
||
| 13 | from tensorflow.keras.models import Model, load_model |
||
| 14 | from tensorflow.keras.utils import Sequence |
||
| 15 | import tensorflow.keras.backend as K |
||
| 16 | import annif.corpus |
||
| 17 | import annif.parallel |
||
| 18 | import annif.util |
||
| 19 | from annif.exception import NotInitializedException, NotSupportedException |
||
| 20 | from annif.suggestion import VectorSuggestionResult |
||
| 21 | from . import backend |
||
| 22 | from . import ensemble |
||
| 23 | from . import hyperopt |
||
| 24 | |||
| 25 | |||
| 26 | def idx_to_key(idx): |
||
| 27 | """convert an integer index to a binary key for use in LMDB""" |
||
| 28 | return b'%08d' % idx |
||
| 29 | |||
| 30 | |||
| 31 | def key_to_idx(key): |
||
| 32 | """convert a binary LMDB key to an integer index""" |
||
| 33 | return int(key) |
||
| 34 | |||
| 35 | |||
| 36 | View Code Duplication | class LMDBSequence(Sequence): |
|
|
|
|||
| 37 | """A sequence of samples stored in a LMDB database.""" |
||
| 38 | |||
| 39 | def __init__(self, txn, batch_size): |
||
| 40 | self._txn = txn |
||
| 41 | cursor = txn.cursor() |
||
| 42 | if cursor.last(): |
||
| 43 | # Counter holds the number of samples in the database |
||
| 44 | self._counter = key_to_idx(cursor.key()) + 1 |
||
| 45 | else: # empty database |
||
| 46 | self._counter = 0 |
||
| 47 | self._batch_size = batch_size |
||
| 48 | |||
| 49 | def add_sample(self, inputs, targets): |
||
| 50 | # use zero-padded 8-digit key |
||
| 51 | key = idx_to_key(self._counter) |
||
| 52 | self._counter += 1 |
||
| 53 | # convert the sample into a sparse matrix and serialize it as bytes |
||
| 54 | sample = (csc_matrix(inputs), csr_matrix(targets)) |
||
| 55 | buf = BytesIO() |
||
| 56 | joblib.dump(sample, buf) |
||
| 57 | buf.seek(0) |
||
| 58 | self._txn.put(key, buf.read()) |
||
| 59 | |||
| 60 | def __getitem__(self, idx): |
||
| 61 | """get a particular batch of samples""" |
||
| 62 | cursor = self._txn.cursor() |
||
| 63 | first_key = idx * self._batch_size |
||
| 64 | cursor.set_key(idx_to_key(first_key)) |
||
| 65 | input_arrays = [] |
||
| 66 | target_arrays = [] |
||
| 67 | for key, value in cursor.iternext(): |
||
| 68 | if key_to_idx(key) >= (first_key + self._batch_size): |
||
| 69 | break |
||
| 70 | input_csr, target_csr = joblib.load(BytesIO(value)) |
||
| 71 | input_arrays.append(input_csr.toarray()) |
||
| 72 | target_arrays.append(target_csr.toarray().flatten()) |
||
| 73 | return np.array(input_arrays), np.array(target_arrays) |
||
| 74 | |||
| 75 | def __len__(self): |
||
| 76 | """return the number of available batches""" |
||
| 77 | return int(np.ceil(self._counter / self._batch_size)) |
||
| 78 | |||
| 79 | |||
| 80 | View Code Duplication | class NNEnsembleOptimizer(hyperopt.HyperparameterOptimizer): |
|
| 81 | """Hyperparameter optimizer for the NN ensemble backend""" |
||
| 82 | |||
| 83 | def _prepare(self, n_jobs=1): |
||
| 84 | sources = dict( |
||
| 85 | annif.util.parse_sources(self._backend.params['sources'])) |
||
| 86 | |||
| 87 | # initialize the source projects before forking, to save memory |
||
| 88 | for project_id in sources.keys(): |
||
| 89 | project = self._backend.project.registry.get_project(project_id) |
||
| 90 | project.initialize(parallel=True) |
||
| 91 | |||
| 92 | psmap = annif.parallel.ProjectSuggestMap( |
||
| 93 | self._backend.project.registry, |
||
| 94 | list(sources.keys()), |
||
| 95 | backend_params=None, |
||
| 96 | limit=None, |
||
| 97 | threshold=0.0) |
||
| 98 | |||
| 99 | jobs, pool_class = annif.parallel.get_pool(n_jobs) |
||
| 100 | |||
| 101 | self._score_vectors = [] |
||
| 102 | self._gold_subjects = [] |
||
| 103 | |||
| 104 | with pool_class(jobs) as pool: |
||
| 105 | for hits, uris, labels in pool.imap_unordered( |
||
| 106 | psmap.suggest, self._corpus.documents): |
||
| 107 | doc_scores = [] |
||
| 108 | for project_id, p_hits in hits.items(): |
||
| 109 | vector = p_hits.as_vector(self._backend.project.subjects) |
||
| 110 | doc_scores.append(np.sqrt(vector) |
||
| 111 | * sources[project_id] |
||
| 112 | * len(sources)) |
||
| 113 | score_vector = np.array(doc_scores, |
||
| 114 | dtype=np.float32).transpose() |
||
| 115 | subjects = annif.corpus.SubjectSet((uris, labels)) |
||
| 116 | self._score_vectors.append(score_vector) |
||
| 117 | self._gold_subjects.append(subjects) |
||
| 118 | |||
| 119 | def _objective(self, trial): |
||
| 120 | sources = annif.util.parse_sources(self._backend.params['sources']) |
||
| 121 | params = { |
||
| 122 | 'nodes': trial.suggest_int('nodes', 50, 200), |
||
| 123 | 'dropout_rate': trial.suggest_float('dropout_rate', 0.0, 0.5), |
||
| 124 | 'epochs': trial.suggest_int('epochs', 5, 20), |
||
| 125 | 'optimizer': 'adam' |
||
| 126 | } |
||
| 127 | model = self._backend._create_model(sources, params) |
||
| 128 | |||
| 129 | env = self._backend._open_lmdb(True, |
||
| 130 | self._backend.params['lmdb_map_size']) |
||
| 131 | with env.begin(buffers=True) as txn: |
||
| 132 | seq = LMDBSequence(txn, batch_size=32) |
||
| 133 | model.fit(seq, verbose=0, epochs=params['epochs']) |
||
| 134 | |||
| 135 | batch = annif.eval.EvaluationBatch(self._backend.project.subjects) |
||
| 136 | for goldsubj, score_vector in zip(self._gold_subjects, |
||
| 137 | self._score_vectors): |
||
| 138 | |||
| 139 | results = model.predict( |
||
| 140 | np.expand_dims(score_vector, 0)) |
||
| 141 | output = VectorSuggestionResult(results[0]) |
||
| 142 | batch.evaluate(output, goldsubj) |
||
| 143 | eval_results = batch.results(metrics=[self._metric]) |
||
| 144 | return eval_results[self._metric] |
||
| 145 | |||
| 146 | def _postprocess(self, study): |
||
| 147 | bp = study.best_params |
||
| 148 | lines = [ |
||
| 149 | f"nodes={bp['nodes']}", |
||
| 150 | f"dropout_rate={bp['dropout_rate']}", |
||
| 151 | f"epochs={bp['epochs']}" |
||
| 152 | ] |
||
| 153 | return hyperopt.HPRecommendation(lines=lines, score=study.best_value) |
||
| 154 | |||
| 155 | |||
| 156 | class MeanLayer(Layer): |
||
| 157 | """Custom Keras layer that calculates mean values along the 2nd axis.""" |
||
| 158 | def call(self, inputs): |
||
| 159 | return K.mean(inputs, axis=2) |
||
| 160 | |||
| 161 | |||
| 162 | View Code Duplication | class NNEnsembleBackend( |
|
| 163 | backend.AnnifLearningBackend, |
||
| 164 | ensemble.BaseEnsembleBackend, |
||
| 165 | hyperopt.AnnifHyperoptBackend): |
||
| 166 | """Neural network ensemble backend that combines results from multiple |
||
| 167 | projects""" |
||
| 168 | |||
| 169 | name = "nn_ensemble" |
||
| 170 | |||
| 171 | MODEL_FILE = "nn-model.h5" |
||
| 172 | LMDB_FILE = 'nn-train.mdb' |
||
| 173 | |||
| 174 | DEFAULT_PARAMETERS = { |
||
| 175 | 'nodes': 100, |
||
| 176 | 'dropout_rate': 0.2, |
||
| 177 | 'optimizer': 'adam', |
||
| 178 | 'epochs': 10, |
||
| 179 | 'learn-epochs': 1, |
||
| 180 | 'lmdb_map_size': 1024 * 1024 * 1024 |
||
| 181 | } |
||
| 182 | |||
| 183 | # defaults for uninitialized instances |
||
| 184 | _model = None |
||
| 185 | |||
| 186 | def get_hp_optimizer(self, corpus, metric): |
||
| 187 | return NNEnsembleOptimizer(self, corpus, metric) |
||
| 188 | |||
| 189 | def default_params(self): |
||
| 190 | params = backend.AnnifBackend.DEFAULT_PARAMETERS.copy() |
||
| 191 | params.update(self.DEFAULT_PARAMETERS) |
||
| 192 | return params |
||
| 193 | |||
| 194 | def initialize(self, parallel=False): |
||
| 195 | super().initialize(parallel) |
||
| 196 | if self._model is not None: |
||
| 197 | return # already initialized |
||
| 198 | if parallel: |
||
| 199 | # Don't load TF model just before parallel execution, |
||
| 200 | # since it won't work after forking worker processes |
||
| 201 | return |
||
| 202 | model_filename = os.path.join(self.datadir, self.MODEL_FILE) |
||
| 203 | if not os.path.exists(model_filename): |
||
| 204 | raise NotInitializedException( |
||
| 205 | 'model file {} not found'.format(model_filename), |
||
| 206 | backend_id=self.backend_id) |
||
| 207 | self.debug('loading Keras model from {}'.format(model_filename)) |
||
| 208 | self._model = load_model(model_filename, |
||
| 209 | custom_objects={'MeanLayer': MeanLayer}) |
||
| 210 | |||
| 211 | def _merge_hits_from_sources(self, hits_from_sources, params): |
||
| 212 | score_vector = np.array([np.sqrt(hits.as_vector(subjects)) |
||
| 213 | * weight * len(hits_from_sources) |
||
| 214 | for hits, weight, subjects |
||
| 215 | in hits_from_sources], |
||
| 216 | dtype=np.float32) |
||
| 217 | results = self._model.predict( |
||
| 218 | np.expand_dims(score_vector.transpose(), 0)) |
||
| 219 | return VectorSuggestionResult(results[0]) |
||
| 220 | |||
| 221 | def _create_model(self, sources, params): |
||
| 222 | inputs = Input(shape=(len(self.project.subjects), len(sources))) |
||
| 223 | |||
| 224 | flat_input = Flatten()(inputs) |
||
| 225 | drop_input = Dropout( |
||
| 226 | rate=float(params['dropout_rate']))(flat_input) |
||
| 227 | hidden = Dense(int(params['nodes']), |
||
| 228 | activation="relu")(drop_input) |
||
| 229 | drop_hidden = Dropout(rate=float(params['dropout_rate']))(hidden) |
||
| 230 | delta = Dense(len(self.project.subjects), |
||
| 231 | kernel_initializer='zeros', |
||
| 232 | bias_initializer='zeros')(drop_hidden) |
||
| 233 | |||
| 234 | mean = MeanLayer()(inputs) |
||
| 235 | |||
| 236 | predictions = Add()([mean, delta]) |
||
| 237 | |||
| 238 | model = Model(inputs=inputs, outputs=predictions) |
||
| 239 | model.compile(optimizer=params['optimizer'], |
||
| 240 | loss='binary_crossentropy', |
||
| 241 | metrics=['top_k_categorical_accuracy']) |
||
| 242 | if 'lr' in params: |
||
| 243 | model.optimizer.learning_rate.assign( |
||
| 244 | float(params['lr'])) |
||
| 245 | |||
| 246 | summary = [] |
||
| 247 | model.summary(print_fn=summary.append) |
||
| 248 | self.debug("Created model: \n" + "\n".join(summary)) |
||
| 249 | return model |
||
| 250 | |||
| 251 | def _train(self, corpus, params, jobs=0): |
||
| 252 | sources = annif.util.parse_sources(params['sources']) |
||
| 253 | self.info("creating NN ensemble model") |
||
| 254 | self._model = self._create_model(sources, params) |
||
| 255 | self._fit_model( |
||
| 256 | corpus, |
||
| 257 | epochs=int(params['epochs']), |
||
| 258 | lmdb_map_size=int(params['lmdb_map_size']), |
||
| 259 | n_jobs=jobs) |
||
| 260 | |||
| 261 | def _corpus_to_vectors(self, corpus, seq, n_jobs): |
||
| 262 | # pass corpus through all source projects |
||
| 263 | sources = dict( |
||
| 264 | annif.util.parse_sources(self.params['sources'])) |
||
| 265 | |||
| 266 | # initialize the source projects before forking, to save memory |
||
| 267 | self.info( |
||
| 268 | f"Initializing source projects: {', '.join(sources.keys())}") |
||
| 269 | for project_id in sources.keys(): |
||
| 270 | project = self.project.registry.get_project(project_id) |
||
| 271 | project.initialize(parallel=True) |
||
| 272 | |||
| 273 | psmap = annif.parallel.ProjectSuggestMap( |
||
| 274 | self.project.registry, |
||
| 275 | list(sources.keys()), |
||
| 276 | backend_params=None, |
||
| 277 | limit=None, |
||
| 278 | threshold=0.0) |
||
| 279 | |||
| 280 | jobs, pool_class = annif.parallel.get_pool(n_jobs) |
||
| 281 | |||
| 282 | self.info("Processing training documents...") |
||
| 283 | with pool_class(jobs) as pool: |
||
| 284 | for hits, uris, labels in pool.imap_unordered( |
||
| 285 | psmap.suggest, corpus.documents): |
||
| 286 | doc_scores = [] |
||
| 287 | for project_id, p_hits in hits.items(): |
||
| 288 | vector = p_hits.as_vector(self.project.subjects) |
||
| 289 | doc_scores.append(np.sqrt(vector) |
||
| 290 | * sources[project_id] |
||
| 291 | * len(sources)) |
||
| 292 | score_vector = np.array(doc_scores, |
||
| 293 | dtype=np.float32).transpose() |
||
| 294 | subjects = annif.corpus.SubjectSet((uris, labels)) |
||
| 295 | true_vector = subjects.as_vector(self.project.subjects) |
||
| 296 | seq.add_sample(score_vector, true_vector) |
||
| 297 | |||
| 298 | def _open_lmdb(self, cached, lmdb_map_size): |
||
| 299 | lmdb_path = os.path.join(self.datadir, self.LMDB_FILE) |
||
| 300 | if not cached and os.path.exists(lmdb_path): |
||
| 301 | shutil.rmtree(lmdb_path) |
||
| 302 | return lmdb.open(lmdb_path, map_size=lmdb_map_size, writemap=True) |
||
| 303 | |||
| 304 | def _fit_model(self, corpus, epochs, lmdb_map_size, n_jobs=1): |
||
| 305 | env = self._open_lmdb(corpus == 'cached', lmdb_map_size) |
||
| 306 | if corpus != 'cached': |
||
| 307 | if corpus.is_empty(): |
||
| 308 | raise NotSupportedException( |
||
| 309 | 'Cannot train nn_ensemble project with no documents') |
||
| 310 | with env.begin(write=True, buffers=True) as txn: |
||
| 311 | seq = LMDBSequence(txn, batch_size=32) |
||
| 312 | self._corpus_to_vectors(corpus, seq, n_jobs) |
||
| 313 | else: |
||
| 314 | self.info("Reusing cached training data from previous run.") |
||
| 315 | # fit the model using a read-only view of the LMDB |
||
| 316 | self.info("Training neural network model...") |
||
| 317 | with env.begin(buffers=True) as txn: |
||
| 318 | seq = LMDBSequence(txn, batch_size=32) |
||
| 319 | self._model.fit(seq, verbose=1, epochs=epochs) |
||
| 320 | |||
| 321 | annif.util.atomic_save( |
||
| 322 | self._model, |
||
| 323 | self.datadir, |
||
| 324 | self.MODEL_FILE) |
||
| 325 | |||
| 326 | def _learn(self, corpus, params): |
||
| 327 | self.initialize() |
||
| 328 | self._fit_model( |
||
| 329 | corpus, |
||
| 330 | int(params['learn-epochs']), |
||
| 331 | int(params['lmdb_map_size'])) |
||
| 332 |
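Applying the same reasoning to the duplication flagged in this report: NNEnsembleOptimizer._prepare and NNEnsembleBackend._corpus_to_vectors repeat the same per-document scoring loop. A minimal sketch of a shared module-level helper is shown below; the helper name doc_to_score_vector is hypothetical and not part of Annif.

```python
# Hypothetical refactoring sketch: the document-scoring loop duplicated in
# NNEnsembleOptimizer._prepare and NNEnsembleBackend._corpus_to_vectors moved
# into one module-level helper. The function name is illustrative only.
import numpy as np

import annif.corpus


def doc_to_score_vector(hits, uris, labels, sources, subject_index):
    """Build the (subjects x sources) score matrix and the gold SubjectSet
    for one document from the raw suggestions of each source project."""
    doc_scores = []
    for project_id, p_hits in hits.items():
        vector = p_hits.as_vector(subject_index)
        doc_scores.append(np.sqrt(vector)
                          * sources[project_id]
                          * len(sources))
    score_vector = np.array(doc_scores, dtype=np.float32).transpose()
    gold_subjects = annif.corpus.SubjectSet((uris, labels))
    return score_vector, gold_subjects
```

Both call sites could then reduce the body of their pool.imap_unordered loops to a single call of this helper, which would remove one of the duplicated regions highlighted above.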