| Total Complexity | 62 |
| Total Lines | 271 |
| Duplicated Lines | 93.36 % |
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like annif.corpus.subject often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | """Classes for supporting subject corpora expressed as directories or files""" |
||
| 2 | |||
| 3 | import csv |
||
| 4 | import numpy as np |
||
| 5 | import annif.util |
||
| 6 | import os.path |
||
| 7 | from annif import logger |
||
| 8 | from .types import Subject, SubjectCorpus |
||
| 9 | from .skos import serialize_subjects_to_skos |
||
| 10 | |||
| 11 | |||
class SubjectFileTSV(SubjectCorpus):
    """A monolingual subject vocabulary stored in a TSV file.

    Each non-blank line holds up to three tab-separated fields: the subject
    URI, an optional label and an optional notation.
    """

    def __init__(self, path, language):
        """initialize the SubjectFileTSV given a path to a TSV file and the
        language of the vocabulary"""

        self.path = path
        self.language = language

    def _parse_line(self, line):
        """Yield the Subject described by a single line of the TSV file.

        The first field is passed through annif.util.cleanup_uri; a missing
        or empty label results in labels=None and a missing third field in
        notation=None. Blank (whitespace-only) lines yield nothing, so that
        e.g. a trailing empty line does not create a subject with an empty
        URI.
        """
        stripped = line.strip()
        if not stripped:
            return

        # at most three fields; any further tabs stay inside the notation
        vals = stripped.split('\t', 2)
        clean_uri = annif.util.cleanup_uri(vals[0])
        label = vals[1] if len(vals) >= 2 else None
        labels = {self.language: label} if label else None
        notation = vals[2] if len(vals) >= 3 else None
        yield Subject(uri=clean_uri,
                      labels=labels,
                      notation=notation)

    @property
    def languages(self):
        """The languages of the vocabulary: a one-element list holding the
        language given to the constructor."""
        return [self.language]

    @property
    def subjects(self):
        """Generate the subjects of the vocabulary, in file order.

        The file is re-opened and re-read on every access; 'utf-8-sig' makes
        a leading byte order mark, if present, disappear from the first URI.
        """
        with open(self.path, encoding='utf-8-sig') as subjfile:
            for line in subjfile:
                yield from self._parse_line(line)

    def save_skos(self, path):
        """Save the contents of the subject vocabulary into a SKOS/Turtle
        file with the given path name."""
        serialize_subjects_to_skos(self.subjects, path)
||
| 46 | |||
| 47 | |||
class SubjectFileCSV(SubjectCorpus):
    """A multilingual subject vocabulary stored in a CSV file.

    The header row names the columns. The code reads a 'uri' column, an
    optional 'notation' column and any number of 'label_<language>' columns.
    """

    def __init__(self, path):
        """initialize the SubjectFileCSV given a path to a CSV file"""
        self.path = path

    @staticmethod
    def _label_language(fname):
        """Return the language part of a 'label_<language>' column name, or
        None if fname is not the name of a label column."""
        prefix = 'label_'
        # csv.DictReader stores surplus fields of an over-long row under the
        # key None, so a column "name" is not necessarily a string
        if isinstance(fname, str) and fname.startswith(prefix):
            # strip only the leading prefix, mirroring how the column names
            # are built (f'label_{lang}') when an index is saved
            return fname[len(prefix):]
        return None

    def _parse_row(self, row):
        """Yield the Subject described by one row (a dict keyed by column
        name) of the CSV file. Empty or missing cells become None."""
        labels = {}
        for fname, value in row.items():
            lang = self._label_language(fname)
            if lang is not None:
                labels[lang] = value or None

        # if there are no labels in any language, set labels to None
        # indicating a deprecated subject
        if set(labels.values()) == {None}:
            labels = None

        yield Subject(uri=annif.util.cleanup_uri(row['uri']),
                      labels=labels,
                      notation=row.get('notation', None) or None)

    @property
    def languages(self):
        """The languages of the vocabulary, inferred from the
        'label_<language>' column names of the header row. An empty file
        gives an empty list."""
        # newline='' lets the csv module do its own newline handling
        with open(self.path, encoding='utf-8-sig', newline='') as csvfile:
            reader = csv.reader(csvfile)
            # an empty file has no header row at all
            fieldnames = next(reader, [])

        candidates = (self._label_language(fname) for fname in fieldnames)
        return [lang for lang in candidates if lang is not None]

    @property
    def subjects(self):
        """Generate the subjects of the vocabulary, in file order. The file
        is re-opened and re-read on every access."""
        with open(self.path, encoding='utf-8-sig', newline='') as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                yield from self._parse_row(row)

    def save_skos(self, path):
        """Save the contents of the subject vocabulary into a SKOS/Turtle
        file with the given path name."""
        serialize_subjects_to_skos(self.subjects, path)

    @staticmethod
    def is_csv_file(path):
        """return True if the path looks like a CSV file"""

        # only the file extension is inspected (case-insensitively)
        return os.path.splitext(path)[1].lower() == '.csv'
||
| 99 | |||
| 100 | |||
class SubjectIndex:
    """An index that remembers the associations between integers subject IDs
    and their URIs and labels.

    A subject ID is the position of the subject in insertion order. Subjects
    are expected to expose the attributes uri, labels (a dict mapping
    language to label, or None) and notation.
    """

    def __init__(self):
        """Create an empty subject index."""
        self._subjects = []
        self._uri_idx = {}
        self._label_idx = {}
        self._languages = None

    def load_subjects(self, corpus):
        """Initialize the subject index from a subject corpus"""

        self._languages = corpus.languages
        for subject in corpus.subjects:
            self.append(subject)

    def __len__(self):
        return len(self._subjects)

    @property
    def languages(self):
        """The languages of the index, or None if they are not known yet."""
        return self._languages

    def __getitem__(self, subject_id):
        return self._subjects[subject_id]

    def append(self, subject):
        """Add a subject to the index, giving it the next free subject ID.

        If the languages of the index are not known yet, they are taken
        from the first appended subject that has labels. A URI or
        (label, language) pair that is already indexed is re-pointed to the
        new subject.
        """
        if self._languages is None and subject.labels is not None:
            self._languages = list(subject.labels.keys())

        subject_id = len(self._subjects)
        self._uri_idx[subject.uri] = subject_id
        if subject.labels:
            for lang, label in subject.labels.items():
                # a missing label (None) must not become a lookup key,
                # otherwise by_label(None, lang) would match this subject
                if label is not None:
                    self._label_idx[(label, lang)] = subject_id
        self._subjects.append(subject)

    def contains_uri(self, uri):
        """return True if a subject with the given URI is in the index"""
        return uri in self._uri_idx

    def by_uri(self, uri, warnings=True):
        """return the subject ID of a subject by its URI, or None if not found.
        If warnings=True, log a warning message if the URI cannot be found."""
        try:
            return self._uri_idx[uri]
        except KeyError:
            if warnings:
                logger.warning('Unknown subject URI <%s>', uri)
            return None

    def by_label(self, label, language):
        """return the subject ID of a subject by its label in a given
        language, or None (after logging a warning) if not found"""
        try:
            return self._label_idx[(label, language)]
        except KeyError:
            logger.warning('Unknown subject label "%s"@%s', label, language)
            return None

    def deprecated_ids(self):
        """return indices of deprecated subjects"""

        # a subject without any labels (labels is None) counts as deprecated
        return [subject_id for subject_id, subject in enumerate(self._subjects)
                if subject.labels is None]

    @property
    def active(self):
        """return a list of (subject_id, subject) tuples of all subjects that
        are not deprecated"""

        return [(subj_id, subject)
                for subj_id, subject
                in enumerate(self._subjects)
                if subject.labels is not None]

    def save(self, path):
        """Save this subject index into a file with the given path name.

        The file is a CSV file with the columns 'uri' and 'notation'
        followed by one 'label_<language>' column per language of the index.
        When the languages are not known (e.g. the index is empty), only
        the two fixed columns are written.
        """

        # self._languages stays None until subjects with labels are added
        languages = self._languages or []
        fieldnames = ['uri', 'notation'] + \
            [f'label_{lang}' for lang in languages]

        with open(path, 'w', encoding='utf-8', newline='') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            for subject in self:
                row = {'uri': subject.uri,
                       'notation': subject.notation or ''}
                if subject.labels:
                    for lang, label in subject.labels.items():
                        row[f'label_{lang}'] = label
                writer.writerow(row)

    @classmethod
    def load(cls, path):
        """Load a subject index from a CSV file and return it."""

        corpus = SubjectFileCSV(path)
        subject_index = cls()
        subject_index.load_subjects(corpus)
        return subject_index
||
| 202 | |||
| 203 | |||
class SubjectSet:
    """Represents a set of subjects for a document.

    The subjects are stored as subject IDs without duplicates; None values
    (e.g. the result of a failed lookup) are left out.
    """

    def __init__(self, subject_ids=None):
        """Create a SubjectSet and optionally initialize it from an iterable
        of subject IDs"""

        if subject_ids:
            # dict.fromkeys eliminates possible duplicates while keeping the
            # order in which the IDs were first seen
            self._subject_ids = [subject_id
                                 for subject_id in dict.fromkeys(subject_ids)
                                 if subject_id is not None]
        else:
            self._subject_ids = []

    def __len__(self):
        return len(self._subject_ids)

    def __getitem__(self, idx):
        return self._subject_ids[idx]

    def __bool__(self):
        return bool(self._subject_ids)

    def __eq__(self, other):
        """Two SubjectSets are equal when they hold the same subject IDs,
        regardless of order. Any other object compares unequal."""
        if isinstance(other, SubjectSet):
            # compare as sets: the internal list order is incidental
            return set(self._subject_ids) == set(other._subject_ids)

        return False

    @classmethod
    def from_string(cls, subj_data, subject_index, language):
        """Create a SubjectSet from text with one subject per line.

        A line that contains a URI in angle brackets is looked up with
        subject_index.by_uri; otherwise its label is looked up with
        subject_index.by_label in the given language. Lines without any
        content are skipped and failed lookups (None) are left out.
        """
        subject_ids = []
        for line in subj_data.splitlines():
            uri, label = cls._parse_line(line)
            if uri is not None:
                subject_ids.append(subject_index.by_uri(uri))
            elif label is not None:
                subject_ids.append(subject_index.by_label(label, language))
            # else: blank line, nothing to look up
        return cls(subject_ids)

    @staticmethod
    def _parse_line(line):
        """Split a tab-separated line into a (uri, label) tuple.

        Empty fields are ignored. Fields of the form <...> set the URI
        (without the brackets); the first other non-empty field is the
        label and ends the scan. Either element is None when absent.
        """
        uri = label = None
        vals = line.split("\t")
        for val in vals:
            val = val.strip()
            if val == '':
                continue
            if val.startswith('<') and val.endswith('>'):  # URI
                uri = val[1:-1]
                continue
            label = val
            break
        return uri, label

    def as_vector(self, size=None, destination=None):
        """Return the hits as a one-dimensional NumPy array in sklearn
        multilabel indicator format. Use destination array if given (not
        None), otherwise create and return a new one of the given size."""

        if destination is None:
            # size must be given when there is no destination array
            destination = np.zeros(size, dtype=bool)

        # mark the positions of all subject IDs of this set
        destination[self._subject_ids] = True

        return destination
||
| 271 |