| Total Complexity | 48 |
| Total Lines | 207 |
| Duplicated Lines | 93.24 % |
| Changes | 0 | ||
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like annif.corpus.subject often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | """Classes for supporting subject corpora expressed as directories or files""" |
||
| 2 | |||
| 3 | import annif.util |
||
| 4 | import numpy as np |
||
| 5 | from annif import logger |
||
| 6 | from .types import Subject |
||
| 7 | from .skos import serialize_subjects_to_skos |
||
| 8 | |||
| 9 | |||
| 10 | View Code Duplication | class SubjectFileTSV: |
|
|
|
|||
| 11 | """A subject vocabulary stored in a TSV file.""" |
||
| 12 | |||
| 13 | def __init__(self, path): |
||
| 14 | self.path = path |
||
| 15 | |||
| 16 | def _parse_line(self, line): |
||
| 17 | vals = line.strip().split('\t', 2) |
||
| 18 | clean_uri = annif.util.cleanup_uri(vals[0]) |
||
| 19 | label = vals[1] if len(vals) >= 2 else None |
||
| 20 | notation = vals[2] if len(vals) >= 3 else None |
||
| 21 | yield Subject(uri=clean_uri, label=label, notation=notation, text=None) |
||
| 22 | |||
| 23 | @property |
||
| 24 | def languages(self): |
||
| 25 | # we don't have information about the language(s) of labels |
||
| 26 | return None |
||
| 27 | |||
| 28 | def subjects(self, language): |
||
| 29 | with open(self.path, encoding='utf-8-sig') as subjfile: |
||
| 30 | for line in subjfile: |
||
| 31 | yield from self._parse_line(line) |
||
| 32 | |||
| 33 | def save_skos(self, path, language): |
||
| 34 | """Save the contents of the subject vocabulary into a SKOS/Turtle |
||
| 35 | file with the given path name.""" |
||
| 36 | serialize_subjects_to_skos(self.subjects(language), language, path) |
||
| 37 | |||
| 38 | |||
| 39 | View Code Duplication | class SubjectIndex: |
|
| 40 | """An index that remembers the associations between integers subject IDs |
||
| 41 | and their URIs and labels.""" |
||
| 42 | |||
| 43 | def __init__(self): |
||
| 44 | self._uris = [] |
||
| 45 | self._labels = [] |
||
| 46 | self._notations = [] |
||
| 47 | self._uri_idx = {} |
||
| 48 | self._label_idx = {} |
||
| 49 | |||
| 50 | def load_subjects(self, corpus, language): |
||
| 51 | """Initialize the subject index from a subject corpus using labels |
||
| 52 | in the given language.""" |
||
| 53 | |||
| 54 | for subject_id, subject in enumerate(corpus.subjects(language)): |
||
| 55 | self._append(subject_id, subject.uri, subject.label, |
||
| 56 | subject.notation) |
||
| 57 | |||
| 58 | def __len__(self): |
||
| 59 | return len(self._uris) |
||
| 60 | |||
| 61 | def __getitem__(self, subject_id): |
||
| 62 | return (self._uris[subject_id], self._labels[subject_id], |
||
| 63 | self._notations[subject_id]) |
||
| 64 | |||
| 65 | def _append(self, subject_id, uri, label, notation): |
||
| 66 | self._uris.append(uri) |
||
| 67 | self._labels.append(label) |
||
| 68 | self._notations.append(notation) |
||
| 69 | self._uri_idx[uri] = subject_id |
||
| 70 | self._label_idx[label] = subject_id |
||
| 71 | |||
| 72 | def append(self, uri, label, notation): |
||
| 73 | subject_id = len(self._uris) |
||
| 74 | self._append(subject_id, uri, label, notation) |
||
| 75 | |||
| 76 | def contains_uri(self, uri): |
||
| 77 | return uri in self._uri_idx |
||
| 78 | |||
| 79 | def by_uri(self, uri, warnings=True): |
||
| 80 | """return the subject index of a subject by its URI, or None if not found. |
||
| 81 | If warnings=True, log a warning message if the URI cannot be found.""" |
||
| 82 | try: |
||
| 83 | return self._uri_idx[uri] |
||
| 84 | except KeyError: |
||
| 85 | if warnings: |
||
| 86 | logger.warning('Unknown subject URI <%s>', uri) |
||
| 87 | return None |
||
| 88 | |||
| 89 | def by_label(self, label): |
||
| 90 | """return the subject index of a subject by its label""" |
||
| 91 | try: |
||
| 92 | return self._label_idx[label] |
||
| 93 | except KeyError: |
||
| 94 | logger.warning('Unknown subject label "%s"', label) |
||
| 95 | return None |
||
| 96 | |||
| 97 | def uris_to_labels(self, uris): |
||
| 98 | """return a list of labels corresponding to the given URIs; unknown |
||
| 99 | URIs are ignored""" |
||
| 100 | |||
| 101 | return [self[subject_id][1] |
||
| 102 | for subject_id in (self.by_uri(uri) for uri in uris) |
||
| 103 | if subject_id is not None] |
||
| 104 | |||
| 105 | def labels_to_uris(self, labels): |
||
| 106 | """return a list of URIs corresponding to the given labels; unknown |
||
| 107 | labels are ignored""" |
||
| 108 | |||
| 109 | return [self[subject_id][0] |
||
| 110 | for subject_id in (self.by_label(label) for label in labels) |
||
| 111 | if subject_id is not None] |
||
| 112 | |||
| 113 | def deprecated_ids(self): |
||
| 114 | """return indices of deprecated subjects""" |
||
| 115 | |||
| 116 | return [subject_id for subject_id, label in enumerate(self._labels) |
||
| 117 | if label is None] |
||
| 118 | |||
| 119 | @property |
||
| 120 | def active(self): |
||
| 121 | """return a list of (subject_id, uri, label, notation) tuples of all |
||
| 122 | subjects that are not deprecated""" |
||
| 123 | |||
| 124 | return [(subj_id, uri, label, notation) |
||
| 125 | for subj_id, (uri, label, notation) |
||
| 126 | in enumerate(zip(self._uris, self._labels, self._notations)) |
||
| 127 | if label is not None] |
||
| 128 | |||
| 129 | def save(self, path): |
||
| 130 | """Save this subject index into a file.""" |
||
| 131 | |||
| 132 | with open(path, 'w', encoding='utf-8') as subjfile: |
||
| 133 | for uri, label, notation in self: |
||
| 134 | line = "<{}>".format(uri) |
||
| 135 | if label is not None: |
||
| 136 | line += ('\t' + label) |
||
| 137 | if notation is not None: |
||
| 138 | line += ('\t' + notation) |
||
| 139 | print(line, file=subjfile) |
||
| 140 | |||
| 141 | @classmethod |
||
| 142 | def load(cls, path): |
||
| 143 | """Load a subject index from a TSV file and return it.""" |
||
| 144 | |||
| 145 | corpus = SubjectFileTSV(path) |
||
| 146 | subject_index = cls() |
||
| 147 | subject_index.load_subjects(corpus, None) |
||
| 148 | return subject_index |
||
| 149 | |||
| 150 | |||
| 151 | View Code Duplication | class SubjectSet: |
|
| 152 | """Represents a set of subjects for a document.""" |
||
| 153 | |||
| 154 | def __init__(self, subj_data=None): |
||
| 155 | """Create a SubjectSet and optionally initialize it from a tuple |
||
| 156 | (URIs, labels)""" |
||
| 157 | |||
| 158 | uris, labels = subj_data or ([], []) |
||
| 159 | self.subject_uris = set(uris) |
||
| 160 | self.subject_labels = set(labels) |
||
| 161 | |||
| 162 | @classmethod |
||
| 163 | def from_string(cls, subj_data): |
||
| 164 | sset = cls() |
||
| 165 | for line in subj_data.splitlines(): |
||
| 166 | sset._parse_line(line) |
||
| 167 | return sset |
||
| 168 | |||
| 169 | def _parse_line(self, line): |
||
| 170 | vals = line.split("\t") |
||
| 171 | for val in vals: |
||
| 172 | val = val.strip() |
||
| 173 | if val == '': |
||
| 174 | continue |
||
| 175 | if val.startswith('<') and val.endswith('>'): # URI |
||
| 176 | self.subject_uris.add(val[1:-1]) |
||
| 177 | continue |
||
| 178 | self.subject_labels.add(val) |
||
| 179 | return |
||
| 180 | |||
| 181 | def has_uris(self): |
||
| 182 | """returns True if the URIs for all subjects are known""" |
||
| 183 | return len(self.subject_uris) >= len(self.subject_labels) |
||
| 184 | |||
| 185 | def as_vector(self, subject_index, destination=None, warnings=True): |
||
| 186 | """Return the hits as a one-dimensional NumPy array in sklearn |
||
| 187 | multilabel indicator format, using a subject index as the source |
||
| 188 | of subjects. Use destination array if given (not None), otherwise |
||
| 189 | create and return a new one. If warnings=True, log warnings for |
||
| 190 | unknown URIs.""" |
||
| 191 | |||
| 192 | if destination is None: |
||
| 193 | destination = np.zeros(len(subject_index), dtype=bool) |
||
| 194 | |||
| 195 | if self.has_uris(): |
||
| 196 | for uri in self.subject_uris: |
||
| 197 | subject_id = subject_index.by_uri( |
||
| 198 | uri, warnings=warnings) |
||
| 199 | if subject_id is not None: |
||
| 200 | destination[subject_id] = True |
||
| 201 | else: |
||
| 202 | for label in self.subject_labels: |
||
| 203 | subject_id = subject_index.by_label(label) |
||
| 204 | if subject_id is not None: |
||
| 205 | destination[subject_id] = True |
||
| 206 | return destination |
||
| 207 |