Total Complexity: 43
Total Lines: 332
Duplicated Lines: 86.14 %
Changes: 0
Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.
The usual fix is to extract the duplicated logic into a single function, method, or base class that each call site can reuse; one possible extraction for this module is sketched after the file listing below.
Complex classes like annif.backend.nn_ensemble often do a lot of different things. To break such a class down, we need to identify a cohesive component within it. A common approach to finding such a component is to look for fields and methods that share the same prefixes or suffixes.
Once you have determined which fields and methods belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often the faster option. A rough sketch of Extract Class applied to this module follows below.
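As an illustration only (a hypothetical sketch, not code that exists in the module): in the listing below, the LMDB-related members of NNEnsembleBackend (LMDB_FILE, the lmdb_map_size parameter, _open_lmdb, and the LMDBSequence setup in _fit_model) form one such cohesive component and could be pulled into a small helper class. The name LMDBTrainingStore and its open method are invented for this sketch.

import os.path
import shutil

import lmdb


class LMDBTrainingStore:
    """Hypothetical helper that owns the LMDB database used to cache
    training samples (fields and methods extracted from NNEnsembleBackend)."""

    LMDB_FILE = 'nn-train.mdb'

    def __init__(self, datadir, map_size):
        self._path = os.path.join(datadir, self.LMDB_FILE)
        self._map_size = map_size

    def open(self, cached):
        # discard any previous database unless cached training data was requested
        if not cached and os.path.exists(self._path):
            shutil.rmtree(self._path)
        return lmdb.open(self._path, map_size=self._map_size, writemap=True)

The backend would then hold a single store instance and call store.open(corpus == 'cached') from _fit_model, keeping the storage details out of the training logic.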
1 | """Neural network based ensemble backend that combines results from multiple |
||
2 | projects.""" |
||
3 | |||
4 | |||
5 | from io import BytesIO |
||
6 | import shutil |
||
7 | import os.path |
||
8 | import numpy as np |
||
9 | from scipy.sparse import csr_matrix, csc_matrix |
||
10 | import joblib |
||
11 | import lmdb |
||
12 | from tensorflow.keras.layers import Input, Dense, Add, Flatten, Dropout, Layer |
||
13 | from tensorflow.keras.models import Model, load_model |
||
14 | from tensorflow.keras.utils import Sequence |
||
15 | import tensorflow.keras.backend as K |
||
16 | import annif.corpus |
||
17 | import annif.parallel |
||
18 | import annif.util |
||
19 | from annif.exception import NotInitializedException, NotSupportedException |
||
20 | from annif.suggestion import VectorSuggestionResult |
||
21 | from . import backend |
||
22 | from . import ensemble |
||
23 | from . import hyperopt |
||
24 | |||
25 | |||
def idx_to_key(idx):
    """convert an integer index to a binary key for use in LMDB"""
    return b'%08d' % idx


def key_to_idx(key):
    """convert a binary LMDB key to an integer index"""
    return int(key)


class LMDBSequence(Sequence):
    """A sequence of samples stored in a LMDB database."""

    def __init__(self, txn, batch_size):
        self._txn = txn
        cursor = txn.cursor()
        if cursor.last():
            # Counter holds the number of samples in the database
            self._counter = key_to_idx(cursor.key()) + 1
        else:  # empty database
            self._counter = 0
        self._batch_size = batch_size

    def add_sample(self, inputs, targets):
        # use zero-padded 8-digit key
        key = idx_to_key(self._counter)
        self._counter += 1
        # convert the sample into a sparse matrix and serialize it as bytes
        sample = (csc_matrix(inputs), csr_matrix(targets))
        buf = BytesIO()
        joblib.dump(sample, buf)
        buf.seek(0)
        self._txn.put(key, buf.read())

    def __getitem__(self, idx):
        """get a particular batch of samples"""
        cursor = self._txn.cursor()
        first_key = idx * self._batch_size
        cursor.set_key(idx_to_key(first_key))
        input_arrays = []
        target_arrays = []
        for key, value in cursor.iternext():
            if key_to_idx(key) >= (first_key + self._batch_size):
                break
            input_csr, target_csr = joblib.load(BytesIO(value))
            input_arrays.append(input_csr.toarray())
            target_arrays.append(target_csr.toarray().flatten())
        return np.array(input_arrays), np.array(target_arrays)

    def __len__(self):
        """return the number of available batches"""
        return int(np.ceil(self._counter / self._batch_size))


class NNEnsembleOptimizer(hyperopt.HyperparameterOptimizer):
    """Hyperparameter optimizer for the NN ensemble backend"""

    def _prepare(self, n_jobs=1):
        sources = dict(
            annif.util.parse_sources(self._backend.params['sources']))

        # initialize the source projects before forking, to save memory
        for project_id in sources.keys():
            project = self._backend.project.registry.get_project(project_id)
            project.initialize(parallel=True)

        psmap = annif.parallel.ProjectSuggestMap(
            self._backend.project.registry,
            list(sources.keys()),
            backend_params=None,
            limit=None,
            threshold=0.0)

        jobs, pool_class = annif.parallel.get_pool(n_jobs)

        self._score_vectors = []
        self._gold_subjects = []

        with pool_class(jobs) as pool:
            for hits, uris, labels in pool.imap_unordered(
                    psmap.suggest, self._corpus.documents):
                doc_scores = []
                for project_id, p_hits in hits.items():
                    vector = p_hits.as_vector(self._backend.project.subjects)
                    doc_scores.append(np.sqrt(vector)
                                      * sources[project_id]
                                      * len(sources))
                score_vector = np.array(doc_scores,
                                        dtype=np.float32).transpose()
                subjects = annif.corpus.SubjectSet((uris, labels))
                self._score_vectors.append(score_vector)
                self._gold_subjects.append(subjects)

    def _objective(self, trial):
        sources = annif.util.parse_sources(self._backend.params['sources'])
        params = {
            'nodes': trial.suggest_int('nodes', 50, 200),
            'dropout_rate': trial.suggest_float('dropout_rate', 0.0, 0.5),
            'epochs': trial.suggest_int('epochs', 5, 20),
            'optimizer': 'adam'
        }
        model = self._backend._create_model(sources, params)

        env = self._backend._open_lmdb(True,
                                       self._backend.params['lmdb_map_size'])
        with env.begin(buffers=True) as txn:
            seq = LMDBSequence(txn, batch_size=32)
            model.fit(seq, verbose=0, epochs=params['epochs'])

        batch = annif.eval.EvaluationBatch(self._backend.project.subjects)
        for goldsubj, score_vector in zip(self._gold_subjects,
                                          self._score_vectors):
            results = model.predict(
                np.expand_dims(score_vector, 0))
            output = VectorSuggestionResult(results[0])
            batch.evaluate(output, goldsubj)
        eval_results = batch.results(metrics=[self._metric])
        return eval_results[self._metric]

    def _postprocess(self, study):
        bp = study.best_params
        lines = [
            f"nodes={bp['nodes']}",
            f"dropout_rate={bp['dropout_rate']}",
            f"epochs={bp['epochs']}"
        ]
        return hyperopt.HPRecommendation(lines=lines, score=study.best_value)


class MeanLayer(Layer):
    """Custom Keras layer that calculates mean values along the 2nd axis."""
    def call(self, inputs):
        return K.mean(inputs, axis=2)


class NNEnsembleBackend(
        backend.AnnifLearningBackend,
        ensemble.BaseEnsembleBackend,
        hyperopt.AnnifHyperoptBackend):
    """Neural network ensemble backend that combines results from multiple
    projects"""

    name = "nn_ensemble"

    MODEL_FILE = "nn-model.h5"
    LMDB_FILE = 'nn-train.mdb'

    DEFAULT_PARAMETERS = {
        'nodes': 100,
        'dropout_rate': 0.2,
        'optimizer': 'adam',
        'epochs': 10,
        'learn-epochs': 1,
        'lmdb_map_size': 1024 * 1024 * 1024
    }

    # defaults for uninitialized instances
    _model = None

    def get_hp_optimizer(self, corpus, metric):
        return NNEnsembleOptimizer(self, corpus, metric)

    def default_params(self):
        params = backend.AnnifBackend.DEFAULT_PARAMETERS.copy()
        params.update(self.DEFAULT_PARAMETERS)
        return params

    def initialize(self, parallel=False):
        super().initialize(parallel)
        if self._model is not None:
            return  # already initialized
        if parallel:
            # Don't load TF model just before parallel execution,
            # since it won't work after forking worker processes
            return
        model_filename = os.path.join(self.datadir, self.MODEL_FILE)
        if not os.path.exists(model_filename):
            raise NotInitializedException(
                'model file {} not found'.format(model_filename),
                backend_id=self.backend_id)
        self.debug('loading Keras model from {}'.format(model_filename))
        self._model = load_model(model_filename,
                                 custom_objects={'MeanLayer': MeanLayer})

    def _merge_hits_from_sources(self, hits_from_sources, params):
        score_vector = np.array([np.sqrt(hits.as_vector(subjects))
                                 * weight * len(hits_from_sources)
                                 for hits, weight, subjects
                                 in hits_from_sources],
                                dtype=np.float32)
        results = self._model.predict(
            np.expand_dims(score_vector.transpose(), 0))
        return VectorSuggestionResult(results[0])

    def _create_model(self, sources, params):
        inputs = Input(shape=(len(self.project.subjects), len(sources)))

        flat_input = Flatten()(inputs)
        drop_input = Dropout(
            rate=float(params['dropout_rate']))(flat_input)
        hidden = Dense(int(params['nodes']),
                       activation="relu")(drop_input)
        drop_hidden = Dropout(rate=float(params['dropout_rate']))(hidden)
        delta = Dense(len(self.project.subjects),
                      kernel_initializer='zeros',
                      bias_initializer='zeros')(drop_hidden)

        mean = MeanLayer()(inputs)

        predictions = Add()([mean, delta])

        model = Model(inputs=inputs, outputs=predictions)
        model.compile(optimizer=params['optimizer'],
                      loss='binary_crossentropy',
                      metrics=['top_k_categorical_accuracy'])
        if 'lr' in params:
            model.optimizer.learning_rate.assign(
                float(params['lr']))

        summary = []
        model.summary(print_fn=summary.append)
        self.debug("Created model: \n" + "\n".join(summary))
        return model

    def _train(self, corpus, params, jobs=0):
        sources = annif.util.parse_sources(params['sources'])
        self.info("creating NN ensemble model")
        self._model = self._create_model(sources, params)
        self._fit_model(
            corpus,
            epochs=int(params['epochs']),
            lmdb_map_size=int(params['lmdb_map_size']),
            n_jobs=jobs)

    def _corpus_to_vectors(self, corpus, seq, n_jobs):
        # pass corpus through all source projects
        sources = dict(
            annif.util.parse_sources(self.params['sources']))

        # initialize the source projects before forking, to save memory
        self.info(
            f"Initializing source projects: {', '.join(sources.keys())}")
        for project_id in sources.keys():
            project = self.project.registry.get_project(project_id)
            project.initialize(parallel=True)

        psmap = annif.parallel.ProjectSuggestMap(
            self.project.registry,
            list(sources.keys()),
            backend_params=None,
            limit=None,
            threshold=0.0)

        jobs, pool_class = annif.parallel.get_pool(n_jobs)

        self.info("Processing training documents...")
        with pool_class(jobs) as pool:
            for hits, uris, labels in pool.imap_unordered(
                    psmap.suggest, corpus.documents):
                doc_scores = []
                for project_id, p_hits in hits.items():
                    vector = p_hits.as_vector(self.project.subjects)
                    doc_scores.append(np.sqrt(vector)
                                      * sources[project_id]
                                      * len(sources))
                score_vector = np.array(doc_scores,
                                        dtype=np.float32).transpose()
                subjects = annif.corpus.SubjectSet((uris, labels))
                true_vector = subjects.as_vector(self.project.subjects)
                seq.add_sample(score_vector, true_vector)

    def _open_lmdb(self, cached, lmdb_map_size):
        lmdb_path = os.path.join(self.datadir, self.LMDB_FILE)
        if not cached and os.path.exists(lmdb_path):
            shutil.rmtree(lmdb_path)
        return lmdb.open(lmdb_path, map_size=lmdb_map_size, writemap=True)

    def _fit_model(self, corpus, epochs, lmdb_map_size, n_jobs=1):
        env = self._open_lmdb(corpus == 'cached', lmdb_map_size)
        if corpus != 'cached':
            if corpus.is_empty():
                raise NotSupportedException(
                    'Cannot train nn_ensemble project with no documents')
            with env.begin(write=True, buffers=True) as txn:
                seq = LMDBSequence(txn, batch_size=32)
                self._corpus_to_vectors(corpus, seq, n_jobs)
        else:
            self.info("Reusing cached training data from previous run.")
        # fit the model using a read-only view of the LMDB
        self.info("Training neural network model...")
        with env.begin(buffers=True) as txn:
            seq = LMDBSequence(txn, batch_size=32)
            self._model.fit(seq, verbose=1, epochs=epochs)

        annif.util.atomic_save(
            self._model,
            self.datadir,
            self.MODEL_FILE)

    def _learn(self, corpus, params):
        self.initialize()
        self._fit_model(
            corpus,
            int(params['learn-epochs']),
            int(params['lmdb_map_size']))
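The 86.14 % duplication reported above comes largely from NNEnsembleOptimizer._prepare and NNEnsembleBackend._corpus_to_vectors (the blocks flagged for duplication), which both initialize the source projects, build a ProjectSuggestMap, and turn each document into a score vector. Below is a rough sketch of one possible extraction, assuming a module-level generator; the name _suggest_score_vectors and its exact parameters are invented for this sketch and are not part of Annif.

import numpy as np

import annif.corpus
import annif.parallel


def _suggest_score_vectors(registry, subject_index, sources, documents, n_jobs):
    """Run documents through the source projects and yield
    (score_vector, SubjectSet) pairs, i.e. the work that both
    duplicated methods currently repeat."""
    # initialize the source projects before forking, to save memory
    for project_id in sources:
        registry.get_project(project_id).initialize(parallel=True)

    psmap = annif.parallel.ProjectSuggestMap(
        registry, list(sources.keys()),
        backend_params=None, limit=None, threshold=0.0)
    jobs, pool_class = annif.parallel.get_pool(n_jobs)

    with pool_class(jobs) as pool:
        for hits, uris, labels in pool.imap_unordered(psmap.suggest, documents):
            # weight each source's suggestion vector as in the original code
            doc_scores = [np.sqrt(p_hits.as_vector(subject_index))
                          * sources[project_id] * len(sources)
                          for project_id, p_hits in hits.items()]
            score_vector = np.array(doc_scores, dtype=np.float32).transpose()
            yield score_vector, annif.corpus.SubjectSet((uris, labels))

With a helper like this, _prepare would simply append the yielded pairs to self._score_vectors and self._gold_subjects, while _corpus_to_vectors would convert each SubjectSet to a vector and pass it to seq.add_sample, leaving the document-processing loop in one place.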