| Total Complexity | 62 | 
| Total Lines | 271 | 
| Duplicated Lines | 93.36 % | 
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like those in annif.corpus.subject often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | """Classes for supporting subject corpora expressed as directories or files"""  | 
            ||
| 2 | |||
| 3 | import csv  | 
            ||
| 4 | import numpy as np  | 
            ||
| 5 | import annif.util  | 
            ||
| 6 | import os.path  | 
            ||
| 7 | from annif import logger  | 
            ||
| 8 | from .types import Subject, SubjectCorpus  | 
            ||
| 9 | from .skos import serialize_subjects_to_skos  | 
            ||
| 10 | |||
| 11 | |||
class SubjectFileTSV(SubjectCorpus):
    """Subject vocabulary in a single language, read from a TSV file.

    Every line of the file holds a URI, optionally followed by a
    tab-separated label and a tab-separated notation."""

    def __init__(self, path, language):
        """Remember the location of the TSV file and the language that its
        labels are written in."""

        self.path = path
        self.language = language

    def _parse_line(self, line):
        # split into at most three fields and pad the missing ones with None
        fields = line.strip().split('\t', 2)
        fields.extend([None] * (3 - len(fields)))
        raw_uri, label, notation = fields

        # a missing or empty label means the subject carries no labels at all
        if label:
            labels = {self.language: label}
        else:
            labels = None

        yield Subject(uri=annif.util.cleanup_uri(raw_uri),
                      labels=labels,
                      notation=notation)

    @property
    def languages(self):
        # the file is monolingual, so there is exactly one language
        return [self.language]

    @property
    def subjects(self):
        # 'utf-8-sig' transparently drops a byte order mark, if present
        with open(self.path, encoding='utf-8-sig') as tsvfile:
            for raw_line in tsvfile:
                for subject in self._parse_line(raw_line):
                    yield subject

    def save_skos(self, path):
        """Write the subjects of this vocabulary to the given path using
        serialize_subjects_to_skos."""
        all_subjects = self.subjects
        serialize_subjects_to_skos(all_subjects, path)
| 46 | |||
| 47 | |||
class SubjectFileCSV(SubjectCorpus):
    """A multilingual subject vocabulary stored in a CSV file.

    The file is read with a header row. The ``uri`` column is required, the
    ``notation`` column is optional, and every column whose name starts with
    ``label_`` holds the labels for the language named by the rest of the
    column name."""

    # column name prefix that marks a per-language label column
    _LABEL_PREFIX = 'label_'

    def __init__(self, path):
        """initialize the SubjectFileCSV given a path to a CSV file"""
        self.path = path

    @classmethod
    def _label_language(cls, fname):
        """return the language part of a label column name, or None if the
        given column name does not denote a label column"""

        # csv.DictReader stores surplus values of an over-long row under the
        # key None, so the column name is not necessarily a string
        if fname is None or not fname.startswith(cls._LABEL_PREFIX):
            return None
        # strip only the leading prefix; str.replace would also remove later
        # occurrences of 'label_' inside the language part
        return fname[len(cls._LABEL_PREFIX):]

    def _parse_row(self, row):
        """yield the Subject described by one row (a dict keyed by column
        name); empty cells are turned into None"""

        labels = {}
        for fname, value in row.items():
            lang = self._label_language(fname)
            if lang is not None:
                labels[lang] = value or None

        # if there are no labels in any language, set labels to None
        # indicating a deprecated subject
        if set(labels.values()) == {None}:
            labels = None

        yield Subject(uri=annif.util.cleanup_uri(row['uri']),
                      labels=labels,
                      notation=row.get('notation', None) or None)

    @property
    def languages(self):
        """the languages of the vocabulary, inferred from the CSV column
        names; an empty list if the file has no header row"""

        with open(self.path, encoding='utf-8-sig') as csvfile:
            fieldnames = next(csv.reader(csvfile), None)

        # a completely empty file has no header row and thus no languages
        if fieldnames is None:
            return []

        candidates = (self._label_language(fname) for fname in fieldnames)
        return [lang for lang in candidates if lang is not None]

    @property
    def subjects(self):
        """generate the subjects of the vocabulary, one per CSV data row"""

        with open(self.path, encoding='utf-8-sig') as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                yield from self._parse_row(row)

    def save_skos(self, path):
        """Save the contents of the subject vocabulary into a SKOS/Turtle
        file with the given path name."""
        serialize_subjects_to_skos(self.subjects, path)

    @staticmethod
    def is_csv_file(path):
        """return True if the path looks like a CSV file, judging by its
        file name extension (compared case-insensitively)"""

        return os.path.splitext(path)[1].lower() == '.csv'
| 99 | |||
| 100 | |||
class SubjectIndex:
    """An index that remembers the associations between integer subject IDs
    and their URIs and labels.

    The ID of a subject is its position in the order in which the subjects
    were appended, starting from zero."""

    def __init__(self):
        self._subjects = []
        # maps URI -> subject ID
        self._uri_idx = {}
        # maps (label, language) -> subject ID
        self._label_idx = {}
        # list of languages, or None as long as they are not known
        self._languages = None

    def load_subjects(self, corpus):
        """Initialize the subject index from a subject corpus"""

        self._languages = corpus.languages
        for subject in corpus.subjects:
            self.append(subject)

    def __len__(self):
        return len(self._subjects)

    @property
    def languages(self):
        """the languages of the labels in this index, or None if they have
        not been determined yet"""
        return self._languages

    def __getitem__(self, subject_id):
        return self._subjects[subject_id]

    def append(self, subject):
        """Add a subject to the end of the index and register its URI and
        labels for lookups. If the same URI or (label, language) pair is
        appended again, the lookup refers to the latest subject."""

        # without an explicit language list, adopt the languages of the
        # first subject that has labels
        if self._languages is None and subject.labels is not None:
            self._languages = list(subject.labels.keys())

        subject_id = len(self._subjects)
        self._uri_idx[subject.uri] = subject_id
        if subject.labels:
            for lang, label in subject.labels.items():
                # a subject may lack a label in some language (None); such
                # a missing label must not become searchable
                if label is not None:
                    self._label_idx[(label, lang)] = subject_id
        self._subjects.append(subject)

    def contains_uri(self, uri):
        """return True if a subject with the given URI is in the index"""
        return uri in self._uri_idx

    def by_uri(self, uri, warnings=True):
        """return the subject ID of a subject by its URI, or None if not found.
        If warnings=True, log a warning message if the URI cannot be found."""
        try:
            return self._uri_idx[uri]
        except KeyError:
            if warnings:
                logger.warning('Unknown subject URI <%s>', uri)
            return None

    def by_label(self, label, language):
        """return the subject ID of a subject by its label in a given
        language, or None (after logging a warning) if not found"""
        try:
            return self._label_idx[(label, language)]
        except KeyError:
            logger.warning('Unknown subject label "%s"@%s', label, language)
            return None

    def deprecated_ids(self):
        """return indices of deprecated subjects, i.e. those whose labels
        are None"""

        return [subject_id for subject_id, subject in enumerate(self._subjects)
                if subject.labels is None]

    @property
    def active(self):
        """return a list of (subject_id, subject) tuples of all subjects that
        are not deprecated"""

        return [(subj_id, subject)
                for subj_id, subject
                in enumerate(self._subjects)
                if subject.labels is not None]

    def save(self, path):
        """Save this subject index into a file with the given path name.

        The file is written as CSV with the columns uri and notation,
        followed by one label_<language> column per language of the
        index."""

        # the languages are still unknown (None) if the index is empty or
        # holds only subjects without labels; write no label columns then
        languages = self._languages or []
        fieldnames = ['uri', 'notation'] + \
            [f'label_{lang}' for lang in languages]

        with open(path, 'w', encoding='utf-8', newline='') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            for subject in self:
                row = {'uri': subject.uri,
                       'notation': subject.notation or ''}
                if subject.labels:
                    for lang, label in subject.labels.items():
                        row[f'label_{lang}'] = label
                writer.writerow(row)

    @classmethod
    def load(cls, path):
        """Load a subject index from a CSV file and return it."""

        corpus = SubjectFileCSV(path)
        subject_index = cls()
        subject_index.load_subjects(corpus)
        return subject_index
| 202 | |||
| 203 | |||
class SubjectSet:
    """Represents a set of subjects for a document.

    The subjects are stored as a duplicate-free list of subject IDs; the
    order of that list is not significant."""

    def __init__(self, subject_ids=None):
        """Create a SubjectSet and optionally initialize it from an iterable
        of subject IDs. None values in the iterable are ignored."""

        if subject_ids:
            # use set comprehension to eliminate possible duplicates
            self._subject_ids = list({subject_id
                                      for subject_id in subject_ids
                                      if subject_id is not None})
        else:
            self._subject_ids = []

    def __len__(self):
        return len(self._subject_ids)

    def __getitem__(self, idx):
        return self._subject_ids[idx]

    def __bool__(self):
        return bool(self._subject_ids)

    def __eq__(self, other):
        """Two SubjectSets are equal if they hold the same subject IDs;
        anything that is not a SubjectSet is never equal."""

        if isinstance(other, SubjectSet):
            # the internal list order stems from set iteration, which can
            # differ between sets with the same members, so compare as sets
            return set(self._subject_ids) == set(other._subject_ids)

        return False

    @classmethod
    def from_string(cls, subj_data, subject_index, language):
        """Create a SubjectSet from text with one subject per line.

        A line that contains a URI in angle brackets is looked up with
        subject_index.by_uri; otherwise its label is looked up with
        subject_index.by_label in the given language. Lines with neither
        are skipped, and failed lookups (None) are dropped."""

        subject_ids = set()
        for line in subj_data.splitlines():
            uri, label = cls._parse_line(line)
            if uri is not None:
                subject_ids.add(subject_index.by_uri(uri))
            elif label is not None:
                subject_ids.add(subject_index.by_label(label, language))
            # else: blank line, nothing to look up
        return cls(subject_ids)

    @staticmethod
    def _parse_line(line):
        """Split a tab-separated line into a (uri, label) tuple. Either
        element is None if the line does not contain it."""

        uri = label = None
        vals = line.split("\t")
        for val in vals:
            val = val.strip()
            if val == '':
                continue
            if val.startswith('<') and val.endswith('>'):  # URI
                uri = val[1:-1]
                continue
            # the first non-empty value that is not a URI is the label;
            # everything after it is ignored
            label = val
            break
        return uri, label

    def as_vector(self, size=None, destination=None):
        """Return the hits as a one-dimensional NumPy array in sklearn
        multilabel indicator format. Use destination array if given (not
        None), otherwise create and return a new one of the given size."""

        if destination is None:
            destination = np.zeros(size, dtype=bool)

        # set the positions of all subject IDs in the set to True
        destination[list(self._subject_ids)] = True

        return destination
| 271 |