Total Complexity | 62 |
Total Lines | 271 |
Duplicated Lines | 93.36 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A common rule of thumb is to restructure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like annif.corpus.subject often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | """Classes for supporting subject corpora expressed as directories or files""" |
||
2 | |||
3 | import csv |
||
4 | import numpy as np |
||
5 | import annif.util |
||
6 | import os.path |
||
7 | from annif import logger |
||
8 | from .types import Subject, SubjectCorpus |
||
9 | from .skos import serialize_subjects_to_skos |
||
10 | |||
11 | |||
class SubjectFileTSV(SubjectCorpus):
    """Monolingual subject vocabulary backed by a tab-separated file.

    Every line holds a URI, optionally followed by a label and a notation,
    separated by tab characters.
    """

    def __init__(self, path, language):
        """Remember where the TSV file lives and which language its labels
        are written in."""

        self.path = path
        self.language = language

    def _parse_line(self, line):
        """Generate the single Subject described by one line of the file."""

        # at most three fields: URI, label, notation
        fields = line.strip().split('\t', 2)
        uri = annif.util.cleanup_uri(fields[0])

        label = fields[1] if len(fields) > 1 else None
        notation = fields[2] if len(fields) > 2 else None

        # a missing or empty label leaves the subject without any labels
        if label:
            labels = {self.language: label}
        else:
            labels = None

        yield Subject(uri=uri, labels=labels, notation=notation)

    @property
    def languages(self):
        """The one language of this vocabulary, wrapped in a list."""

        return [self.language]

    @property
    def subjects(self):
        """Lazily yield the subjects of the file, one line at a time."""

        # utf-8-sig transparently drops a leading byte order mark
        with open(self.path, encoding='utf-8-sig') as tsvfile:
            for raw_line in tsvfile:
                for subject in self._parse_line(raw_line):
                    yield subject

    def save_skos(self, path):
        """Serialize the subjects of this vocabulary as SKOS/Turtle into
        the file with the given path name."""

        serialize_subjects_to_skos(self.subjects, path)
||
46 | |||
47 | |||
class SubjectFileCSV(SubjectCorpus):
    """A multilingual subject vocabulary stored in a CSV file.

    The header row names the columns: ``uri``, optionally ``notation``, and
    one ``label_<language>`` column for every language of the vocabulary.
    """

    def __init__(self, path):
        """initialize the SubjectFileCSV given a path to a CSV file"""

        self.path = path

    def _parse_row(self, row):
        """Generate the single Subject described by one CSV row, given as a
        dict that maps column names to cell values."""

        # empty cells are normalized to None
        labels = {
            fname.replace('label_', ''): value or None
            for fname, value in row.items()
            if fname.startswith('label_')
        }

        # if there are no labels in any language, set labels to None
        # indicating a deprecated subject
        if set(labels.values()) == {None}:
            labels = None

        yield Subject(uri=annif.util.cleanup_uri(row['uri']),
                      labels=labels,
                      notation=row.get('notation', None) or None)

    @property
    def languages(self):
        """The languages of the vocabulary, inferred from the ``label_*``
        column names of the header row. An empty list is returned when the
        file has no header row at all."""

        # newline='' lets the csv module do its own newline handling, as
        # its documentation requires
        with open(self.path, encoding='utf-8-sig', newline='') as csvfile:
            fieldnames = next(csv.reader(csvfile), None)

        # a completely empty file has no header row and thus no languages
        if fieldnames is None:
            return []

        return [fname.replace('label_', '')
                for fname in fieldnames
                if fname.startswith('label_')]

    @property
    def subjects(self):
        """Lazily yield the subjects of the file, one per data row."""

        # newline='' keeps line breaks embedded in quoted fields intact
        with open(self.path, encoding='utf-8-sig', newline='') as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                yield from self._parse_row(row)

    def save_skos(self, path):
        """Save the contents of the subject vocabulary into a SKOS/Turtle
        file with the given path name."""

        serialize_subjects_to_skos(self.subjects, path)

    @staticmethod
    def is_csv_file(path):
        """return True if the path looks like a CSV file, judged by its
        file extension only (case-insensitive)"""

        return os.path.splitext(path)[1].lower() == '.csv'
||
99 | |||
100 | |||
class SubjectIndex:
    """An index that remembers the associations between integer subject IDs
    and their URIs and labels.

    The ID of a subject is the position at which it was appended.
    """

    def __init__(self):
        """Create an empty index."""

        self._subjects = []     # subject objects; list position == ID
        self._uri_idx = {}      # URI -> subject ID
        self._label_idx = {}    # (label, language) -> subject ID
        self._languages = None  # unknown until a corpus or labels are seen

    def load_subjects(self, corpus):
        """Initialize the subject index from a subject corpus"""

        self._languages = corpus.languages
        for subject in corpus.subjects:
            self.append(subject)

    def __len__(self):
        return len(self._subjects)

    @property
    def languages(self):
        """The languages of the indexed labels, or None if not yet known."""

        return self._languages

    def __getitem__(self, subject_id):
        return self._subjects[subject_id]

    def append(self, subject):
        """Add a subject to the end of the index and register its URI and
        labels for lookups. The subject gets the next free subject ID."""

        # without a corpus, take the languages from the first subject
        # that has labels
        if self._languages is None and subject.labels is not None:
            self._languages = list(subject.labels.keys())

        subject_id = len(self._subjects)
        self._uri_idx[subject.uri] = subject_id
        if subject.labels:
            for lang, label in subject.labels.items():
                self._label_idx[(label, lang)] = subject_id
        self._subjects.append(subject)

    def contains_uri(self, uri):
        """return True if a subject with the given URI is in the index"""

        return uri in self._uri_idx

    def by_uri(self, uri, warnings=True):
        """return the subject ID of a subject by its URI, or None if not found.
        If warnings=True, log a warning message if the URI cannot be found."""

        try:
            return self._uri_idx[uri]
        except KeyError:
            if warnings:
                logger.warning('Unknown subject URI <%s>', uri)
            return None

    def by_label(self, label, language):
        """return the subject ID of a subject by its label in a given
        language, or None (after logging a warning) if not found"""

        try:
            return self._label_idx[(label, language)]
        except KeyError:
            logger.warning('Unknown subject label "%s"@%s', label, language)
            return None

    def deprecated_ids(self):
        """return indices of deprecated subjects, i.e. those whose labels
        are None"""

        return [subject_id for subject_id, subject in enumerate(self._subjects)
                if subject.labels is None]

    @property
    def active(self):
        """return a list of (subject_id, subject) tuples of all subjects that
        are not deprecated"""

        return [(subj_id, subject)
                for subj_id, subject
                in enumerate(self._subjects)
                if subject.labels is not None]

    def save(self, path):
        """Save this subject index into a file with the given path name.

        The file is CSV with the columns uri, notation and one label_<lang>
        column per known language. An index whose languages are not known
        (nothing with labels has been added yet) is written without any
        label columns.
        """

        # _languages stays None until a corpus or a labelled subject has
        # been seen; treat that as "no label columns" instead of crashing
        languages = self._languages or []
        fieldnames = ['uri', 'notation'] + \
            [f'label_{lang}' for lang in languages]

        with open(path, 'w', encoding='utf-8', newline='') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
            writer.writeheader()
            for subject in self:
                row = {'uri': subject.uri,
                       'notation': subject.notation or ''}
                # subjects without labels leave their label cells empty
                if subject.labels:
                    for lang, label in subject.labels.items():
                        row[f'label_{lang}'] = label
                writer.writerow(row)

    @classmethod
    def load(cls, path):
        """Load a subject index from a CSV file and return it."""

        corpus = SubjectFileCSV(path)
        subject_index = cls()
        subject_index.load_subjects(corpus)
        return subject_index
||
202 | |||
203 | |||
class SubjectSet:
    """Represents a set of subjects for a document.

    The subjects are stored as unique subject IDs; None values are dropped.
    """

    def __init__(self, subject_ids=None):
        """Create a SubjectSet and optionally initialize it from an iterable
        of subject IDs"""

        if subject_ids:
            # use set comprehension to eliminate possible duplicates
            self._subject_ids = list({subject_id
                                      for subject_id in subject_ids
                                      if subject_id is not None})
        else:
            self._subject_ids = []

    def __len__(self):
        return len(self._subject_ids)

    def __getitem__(self, idx):
        return self._subject_ids[idx]

    def __bool__(self):
        return bool(self._subject_ids)

    def __eq__(self, other):
        """Two SubjectSets are equal when they hold the same subject IDs,
        regardless of order. Anything else never equals a SubjectSet."""

        if isinstance(other, SubjectSet):
            # the internal list order comes from set iteration and may
            # differ between equal sets, so compare order-insensitively
            return set(self._subject_ids) == set(other._subject_ids)

        return False

    @classmethod
    def from_string(cls, subj_data, subject_index, language):
        """Create a SubjectSet from text with one subject per line.

        A line may give the subject as a URI in angle brackets, as a label,
        or as both separated by a tab; the URI is preferred when present.
        Subjects are resolved to IDs with subject_index, labels being looked
        up in the given language. Blank lines are ignored.
        """

        subject_ids = set()
        for line in subj_data.splitlines():
            uri, label = cls._parse_line(line)
            if uri is not None:
                subject_ids.add(subject_index.by_uri(uri))
            elif label is not None:
                # a line with neither URI nor label is skipped rather than
                # looked up as the label None
                subject_ids.add(subject_index.by_label(label, language))
        return cls(subject_ids)

    @staticmethod
    def _parse_line(line):
        """Split a line into a (uri, label) tuple; either may be None."""

        uri = label = None
        vals = line.split("\t")
        for val in vals:
            val = val.strip()
            if val == '':
                continue
            if val.startswith('<') and val.endswith('>'):  # URI
                uri = val[1:-1]
                continue
            # the first non-empty, non-URI field is the label; later
            # fields are ignored
            label = val
            break
        return uri, label

    def as_vector(self, size=None, destination=None):
        """Return the subjects as a one-dimensional NumPy array in sklearn
        multilabel indicator format. Use destination array if given (not
        None), otherwise create and return a new one of the given size."""

        if destination is None:
            destination = np.zeros(size, dtype=bool)

        # set the positions of the subject IDs; other positions are left
        # as they were
        destination[self._subject_ids] = True

        return destination
||
271 |