| Metric | Value |
| --- | --- |
| Total Complexity | 43 |
| Total Lines | 332 |
| Duplicated Lines | 86.14 % |
| Changes | 0 |
Duplicate code is one of the most pungent code smells. A commonly used rule of thumb is to restructure code once it is duplicated in three or more places (the "rule of three").

The usual remedy is to extract the duplicated logic into a shared function or class and have every call site reuse it; an illustrative sketch based on the file below follows.
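For example, NNEnsembleOptimizer._prepare() and NNEnsembleBackend._corpus_to_vectors() in the listing below repeat almost the same scoring loop. A minimal sketch of pulling that shared logic into one module-level helper might look like the following; score_vectors() is a hypothetical name, not Annif's actual API:

```python
import numpy as np

import annif.corpus
import annif.parallel
import annif.util


def score_vectors(project, sources_param, corpus, n_jobs):
    """Yield (score_vector, SubjectSet) pairs for every corpus document,
    scored and weighted by all configured source projects.

    Hypothetical helper that both duplicated loops could call."""
    sources = dict(annif.util.parse_sources(sources_param))

    # initialize the source projects before forking, to save memory
    for project_id in sources.keys():
        source_project = project.registry.get_project(project_id)
        source_project.initialize(parallel=True)

    psmap = annif.parallel.ProjectSuggestMap(
        project.registry,
        list(sources.keys()),
        backend_params=None,
        limit=None,
        threshold=0.0)

    jobs, pool_class = annif.parallel.get_pool(n_jobs)

    with pool_class(jobs) as pool:
        for hits, uris, labels in pool.imap_unordered(
                psmap.suggest, corpus.documents):
            # same square-root weighting as in the original loops
            doc_scores = [np.sqrt(p_hits.as_vector(project.subjects))
                          * sources[project_id] * len(sources)
                          for project_id, p_hits in hits.items()]
            score_vector = np.array(doc_scores, dtype=np.float32).transpose()
            yield score_vector, annif.corpus.SubjectSet((uris, labels))
```

Each caller would then keep only its own bookkeeping: the optimizer appends the pairs to lists, while the backend converts the gold subjects to a vector and adds samples to the LMDB sequence.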
Complex classes like annif.backend.nn_ensemble often do a lot of different things. To break such a class down, we need to identify a cohesive component within the class. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.

Once you have determined the fields and methods that belong together, you can apply the Extract Class refactoring (illustrated in the sketch below). If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
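As a sketch of Extract Class applied to this file: the LMDB-related members of NNEnsembleBackend (LMDB_FILE, _open_lmdb() and the lmdb_map_size parameter all share an "lmdb" prefix) could be moved into a class of their own. LMDBStore and its methods are illustrative names, not part of Annif:

```python
import os.path
import shutil

import lmdb


class LMDBStore:
    """Hypothetical extracted class that owns all LMDB storage concerns."""

    LMDB_FILE = 'nn-train.mdb'

    def __init__(self, datadir, map_size):
        self._path = os.path.join(datadir, self.LMDB_FILE)
        self._map_size = map_size

    def open(self, cached):
        # remove a stale database unless cached training data should be reused
        if not cached and os.path.exists(self._path):
            shutil.rmtree(self._path)
        return lmdb.open(self._path, map_size=self._map_size, writemap=True)
```

The backend would then hold a single lmdb_store field instead of several lmdb-prefixed members, and _fit_model() could call something like self._lmdb_store.open(corpus == 'cached').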
| 1 | """Neural network based ensemble backend that combines results from multiple | ||
| 2 | projects.""" | ||
| 3 | |||
| 4 | |||
| 5 | from io import BytesIO | ||
| 6 | import shutil | ||
| 7 | import os.path | ||
| 8 | import numpy as np | ||
| 9 | from scipy.sparse import csr_matrix, csc_matrix | ||
| 10 | import joblib | ||
| 11 | import lmdb | ||
| 12 | from tensorflow.keras.layers import Input, Dense, Add, Flatten, Dropout, Layer | ||
| 13 | from tensorflow.keras.models import Model, load_model | ||
| 14 | from tensorflow.keras.utils import Sequence | ||
| 15 | import tensorflow.keras.backend as K | ||
| 16 | import annif.corpus | ||
| 17 | import annif.parallel | ||
| 18 | import annif.util | ||
| 19 | from annif.exception import NotInitializedException, NotSupportedException | ||
| 20 | from annif.suggestion import VectorSuggestionResult | ||
| 21 | from . import backend | ||
| 22 | from . import ensemble | ||
| 23 | from . import hyperopt | ||
| 24 | |||


def idx_to_key(idx):
    """convert an integer index to a binary key for use in LMDB"""
    return b'%08d' % idx


def key_to_idx(key):
    """convert a binary LMDB key to an integer index"""
    return int(key)


class LMDBSequence(Sequence):
    """A sequence of samples stored in a LMDB database."""

    def __init__(self, txn, batch_size):
        self._txn = txn
        cursor = txn.cursor()
        if cursor.last():
            # Counter holds the number of samples in the database
            self._counter = key_to_idx(cursor.key()) + 1
        else:  # empty database
            self._counter = 0
        self._batch_size = batch_size

    def add_sample(self, inputs, targets):
        # use zero-padded 8-digit key
        key = idx_to_key(self._counter)
        self._counter += 1
        # convert the sample into a sparse matrix and serialize it as bytes
        sample = (csc_matrix(inputs), csr_matrix(targets))
        buf = BytesIO()
        joblib.dump(sample, buf)
        buf.seek(0)
        self._txn.put(key, buf.read())

    def __getitem__(self, idx):
        """get a particular batch of samples"""
        cursor = self._txn.cursor()
        first_key = idx * self._batch_size
        cursor.set_key(idx_to_key(first_key))
        input_arrays = []
        target_arrays = []
        for key, value in cursor.iternext():
            if key_to_idx(key) >= (first_key + self._batch_size):
                break
            input_csr, target_csr = joblib.load(BytesIO(value))
            input_arrays.append(input_csr.toarray())
            target_arrays.append(target_csr.toarray().flatten())
        return np.array(input_arrays), np.array(target_arrays)

    def __len__(self):
        """return the number of available batches"""
        return int(np.ceil(self._counter / self._batch_size))


class NNEnsembleOptimizer(hyperopt.HyperparameterOptimizer):
    """Hyperparameter optimizer for the NN ensemble backend"""

    def _prepare(self, n_jobs=1):
        sources = dict(
            annif.util.parse_sources(self._backend.params['sources']))

        # initialize the source projects before forking, to save memory
        for project_id in sources.keys():
            project = self._backend.project.registry.get_project(project_id)
            project.initialize(parallel=True)

        psmap = annif.parallel.ProjectSuggestMap(
            self._backend.project.registry,
            list(sources.keys()),
            backend_params=None,
            limit=None,
            threshold=0.0)

        jobs, pool_class = annif.parallel.get_pool(n_jobs)

        self._score_vectors = []
        self._gold_subjects = []

        with pool_class(jobs) as pool:
            for hits, uris, labels in pool.imap_unordered(
                    psmap.suggest, self._corpus.documents):
                doc_scores = []
                for project_id, p_hits in hits.items():
                    vector = p_hits.as_vector(self._backend.project.subjects)
                    doc_scores.append(np.sqrt(vector)
                                      * sources[project_id]
                                      * len(sources))
                score_vector = np.array(doc_scores,
                                        dtype=np.float32).transpose()
                subjects = annif.corpus.SubjectSet((uris, labels))
                self._score_vectors.append(score_vector)
                self._gold_subjects.append(subjects)

    def _objective(self, trial):
        sources = annif.util.parse_sources(self._backend.params['sources'])
        params = {
            'nodes': trial.suggest_int('nodes', 50, 200),
            'dropout_rate': trial.suggest_float('dropout_rate', 0.0, 0.5),
            'epochs': trial.suggest_int('epochs', 5, 20),
            'optimizer': 'adam'
        }
        model = self._backend._create_model(sources, params)

        env = self._backend._open_lmdb(True,
                                       self._backend.params['lmdb_map_size'])
        with env.begin(buffers=True) as txn:
            seq = LMDBSequence(txn, batch_size=32)
            model.fit(seq, verbose=0, epochs=params['epochs'])

        batch = annif.eval.EvaluationBatch(self._backend.project.subjects)
        for goldsubj, score_vector in zip(self._gold_subjects,
                                          self._score_vectors):
            results = model.predict(
                np.expand_dims(score_vector, 0))
            output = VectorSuggestionResult(results[0])
            batch.evaluate(output, goldsubj)
        eval_results = batch.results(metrics=[self._metric])
        return eval_results[self._metric]

    def _postprocess(self, study):
        bp = study.best_params
        lines = [
            f"nodes={bp['nodes']}",
            f"dropout_rate={bp['dropout_rate']}",
            f"epochs={bp['epochs']}"
        ]
        return hyperopt.HPRecommendation(lines=lines, score=study.best_value)


class MeanLayer(Layer):
    """Custom Keras layer that calculates mean values along the 2nd axis."""
    def call(self, inputs):
        return K.mean(inputs, axis=2)


class NNEnsembleBackend(
        backend.AnnifLearningBackend,
        ensemble.BaseEnsembleBackend,
        hyperopt.AnnifHyperoptBackend):
    """Neural network ensemble backend that combines results from multiple
    projects"""

    name = "nn_ensemble"

    MODEL_FILE = "nn-model.h5"
    LMDB_FILE = 'nn-train.mdb'

    DEFAULT_PARAMETERS = {
        'nodes': 100,
        'dropout_rate': 0.2,
        'optimizer': 'adam',
        'epochs': 10,
        'learn-epochs': 1,
        'lmdb_map_size': 1024 * 1024 * 1024
    }

    # defaults for uninitialized instances
    _model = None

    def get_hp_optimizer(self, corpus, metric):
        return NNEnsembleOptimizer(self, corpus, metric)

    def default_params(self):
        params = backend.AnnifBackend.DEFAULT_PARAMETERS.copy()
        params.update(self.DEFAULT_PARAMETERS)
        return params

    def initialize(self, parallel=False):
        super().initialize(parallel)
        if self._model is not None:
            return  # already initialized
        if parallel:
            # Don't load TF model just before parallel execution,
            # since it won't work after forking worker processes
            return
        model_filename = os.path.join(self.datadir, self.MODEL_FILE)
        if not os.path.exists(model_filename):
            raise NotInitializedException(
                'model file {} not found'.format(model_filename),
                backend_id=self.backend_id)
        self.debug('loading Keras model from {}'.format(model_filename))
        self._model = load_model(model_filename,
                                 custom_objects={'MeanLayer': MeanLayer})

    def _merge_hits_from_sources(self, hits_from_sources, params):
        score_vector = np.array([np.sqrt(hits.as_vector(subjects))
                                 * weight * len(hits_from_sources)
                                 for hits, weight, subjects
                                 in hits_from_sources],
                                dtype=np.float32)
        results = self._model.predict(
            np.expand_dims(score_vector.transpose(), 0))
        return VectorSuggestionResult(results[0])

    def _create_model(self, sources, params):
        inputs = Input(shape=(len(self.project.subjects), len(sources)))

        flat_input = Flatten()(inputs)
        drop_input = Dropout(
            rate=float(params['dropout_rate']))(flat_input)
        hidden = Dense(int(params['nodes']),
                       activation="relu")(drop_input)
        drop_hidden = Dropout(rate=float(params['dropout_rate']))(hidden)
        delta = Dense(len(self.project.subjects),
                      kernel_initializer='zeros',
                      bias_initializer='zeros')(drop_hidden)

        mean = MeanLayer()(inputs)

        predictions = Add()([mean, delta])

        model = Model(inputs=inputs, outputs=predictions)
        model.compile(optimizer=params['optimizer'],
                      loss='binary_crossentropy',
                      metrics=['top_k_categorical_accuracy'])
        if 'lr' in params:
            model.optimizer.learning_rate.assign(
                float(params['lr']))

        summary = []
        model.summary(print_fn=summary.append)
        self.debug("Created model: \n" + "\n".join(summary))
        return model

    def _train(self, corpus, params, jobs=0):
        sources = annif.util.parse_sources(params['sources'])
        self.info("creating NN ensemble model")
        self._model = self._create_model(sources, params)
        self._fit_model(
            corpus,
            epochs=int(params['epochs']),
            lmdb_map_size=int(params['lmdb_map_size']),
            n_jobs=jobs)

    def _corpus_to_vectors(self, corpus, seq, n_jobs):
        # pass corpus through all source projects
        sources = dict(
            annif.util.parse_sources(self.params['sources']))

        # initialize the source projects before forking, to save memory
        self.info(
            f"Initializing source projects: {', '.join(sources.keys())}")
        for project_id in sources.keys():
            project = self.project.registry.get_project(project_id)
            project.initialize(parallel=True)

        psmap = annif.parallel.ProjectSuggestMap(
            self.project.registry,
            list(sources.keys()),
            backend_params=None,
            limit=None,
            threshold=0.0)

        jobs, pool_class = annif.parallel.get_pool(n_jobs)

        self.info("Processing training documents...")
        with pool_class(jobs) as pool:
            for hits, uris, labels in pool.imap_unordered(
                    psmap.suggest, corpus.documents):
                doc_scores = []
                for project_id, p_hits in hits.items():
                    vector = p_hits.as_vector(self.project.subjects)
                    doc_scores.append(np.sqrt(vector)
                                      * sources[project_id]
                                      * len(sources))
                score_vector = np.array(doc_scores,
                                        dtype=np.float32).transpose()
                subjects = annif.corpus.SubjectSet((uris, labels))
                true_vector = subjects.as_vector(self.project.subjects)
                seq.add_sample(score_vector, true_vector)

    def _open_lmdb(self, cached, lmdb_map_size):
        lmdb_path = os.path.join(self.datadir, self.LMDB_FILE)
        if not cached and os.path.exists(lmdb_path):
            shutil.rmtree(lmdb_path)
        return lmdb.open(lmdb_path, map_size=lmdb_map_size, writemap=True)

    def _fit_model(self, corpus, epochs, lmdb_map_size, n_jobs=1):
        env = self._open_lmdb(corpus == 'cached', lmdb_map_size)
        if corpus != 'cached':
            if corpus.is_empty():
                raise NotSupportedException(
                    'Cannot train nn_ensemble project with no documents')
            with env.begin(write=True, buffers=True) as txn:
                seq = LMDBSequence(txn, batch_size=32)
                self._corpus_to_vectors(corpus, seq, n_jobs)
        else:
            self.info("Reusing cached training data from previous run.")
        # fit the model using a read-only view of the LMDB
        self.info("Training neural network model...")
        with env.begin(buffers=True) as txn:
            seq = LMDBSequence(txn, batch_size=32)
            self._model.fit(seq, verbose=1, epochs=epochs)

        annif.util.atomic_save(
            self._model,
            self.datadir,
            self.MODEL_FILE)

    def _learn(self, corpus, params):
        self.initialize()
        self._fit_model(
            corpus,
            int(params['learn-epochs']),
            int(params['lmdb_map_size']))
```