| Total Complexity | 56 |
| Total Lines | 213 |
| Duplicated Lines | 16.9 % |
| Changes | 24 | ||
| Bugs | 2 | Features | 3 |
Duplicate code is one of the most pungent code smells. A commonly used rule is to restructure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like ProviderView often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
| 1 | # -*- coding: utf8 -*- |
||
@view_defaults(renderer='skosjson', accept='application/json')
class ProviderView(RestView):
    '''
    A set of views that expose information from a certain provider.

    Every view is registered for ``GET`` requests and rendered with the
    ``skosjson`` renderer. Views signal errors by returning a
    :class:`HTTPNotFound` or :class:`HTTPBadRequest` response object.
    '''

    @view_config(route_name='skosprovider.uri', request_method='GET')
    @view_config(route_name='skosprovider.uri.deprecated', request_method='GET')
    def get_uri(self):
        '''
        Look up what a URI identifies: a concept scheme or a concept.

        The URI is read from the ``uri`` query parameter, falling back to
        the ``uri`` entry of the route's matchdict.

        :returns: A dict describing the concept scheme or concept,
            :class:`HTTPBadRequest` when no URI was passed or
            :class:`HTTPNotFound` when the registry does not know the URI.
        '''
        uri = self.request.params.get(
            'uri', self.request.matchdict.get('uri', None)
        )
        if not uri:
            return HTTPBadRequest()
        # First try the URI as a concept scheme (provider) URI.
        provider = self.skos_registry.get_provider(uri)
        if provider:
            return {
                'type': 'concept_scheme',
                'uri': provider.concept_scheme.uri,
                'id': provider.get_vocabulary_id()
            }
        # Otherwise it may identify a concept or collection.
        c = self.skos_registry.get_by_uri(uri)
        if not c:
            return HTTPNotFound()
        scheme_uri = c.concept_scheme.uri
        return {
            'type': c.type,
            'uri': c.uri,
            'id': c.id,
            'concept_scheme': {
                'uri': scheme_uri,
                'id': self.skos_registry.get_provider(
                    scheme_uri
                ).get_vocabulary_id()
            }
        }

    def _scheme_summary(self, provider, language):
        '''
        Build the id/uri/label/subject dict shared by the scheme views.

        :param provider: The provider whose concept scheme is described.
        :param language: Language passed to the concept scheme's label
            lookup.
        :rtype: :class:`dict`
        '''
        # Look the label up once instead of once for the test and once
        # for the value.
        label = provider.concept_scheme.label(language)
        subject = provider.metadata['subject']
        return {
            'id': provider.get_vocabulary_id(),
            'uri': provider.concept_scheme.uri,
            'label': label.label if label else None,
            'subject': subject if subject else []
        }

    @view_config(route_name='skosprovider.conceptschemes', request_method='GET')
    def get_conceptschemes(self):
        '''
        List a short summary of every provider known to the registry.

        :returns: A list of dicts with the keys ``id``, ``uri``, ``label``
            and ``subject``.
        '''
        language = self.request.params.get('language', self.request.locale_name)
        return [
            self._scheme_summary(p, language)
            for p in self.skos_registry.get_providers()
        ]

    @view_config(route_name='skosprovider.conceptscheme', request_method='GET')
    def get_conceptscheme(self):
        '''
        Describe the concept scheme identified by ``scheme_id``.

        :returns: The scheme summary extended with ``labels``, ``notes``,
            ``sources`` and ``languages``, or :class:`HTTPNotFound` when
            the registry has no provider for ``scheme_id``.
        '''
        scheme_id = self.request.matchdict['scheme_id']
        provider = self.skos_registry.get_provider(scheme_id)
        if not provider:
            return HTTPNotFound()
        language = self.request.params.get('language', self.request.locale_name)
        scheme = self._scheme_summary(provider, language)
        scheme.update({
            'labels': provider.concept_scheme.labels,
            'notes': provider.concept_scheme.notes,
            'sources': provider.concept_scheme.sources,
            'languages': provider.concept_scheme.languages
        })
        return scheme

    @view_config(route_name='skosprovider.conceptscheme.tc', request_method='GET')
    def get_conceptscheme_top_concepts(self):
        '''
        Return the result of ``get_top_concepts`` for the requested scheme.

        :returns: Whatever the provider returns, or :class:`HTTPNotFound`
            when the registry has no provider for ``scheme_id``.
        '''
        scheme_id = self.request.matchdict['scheme_id']
        provider = self.skos_registry.get_provider(scheme_id)
        if not provider:
            return HTTPNotFound()
        language = self.request.params.get('language', self.request.locale_name)
        return provider.get_top_concepts(language=language)

    @view_config(route_name='skosprovider.conceptscheme.display_top', request_method='GET')
    def get_conceptscheme_display_top(self):
        '''
        Return the result of ``get_top_display`` for the requested scheme.

        :returns: Whatever the provider returns, or :class:`HTTPNotFound`
            when the registry has no provider for ``scheme_id``.
        '''
        scheme_id = self.request.matchdict['scheme_id']
        provider = self.skos_registry.get_provider(scheme_id)
        if not provider:
            return HTTPNotFound()
        language = self.request.params.get('language', self.request.locale_name)
        return provider.get_top_display(language=language)

    def _build_providers(self, request):
        '''
        Translate the ``providers.*`` query parameters into a filter dict.

        ``providers.ids`` is split on commas and stored under ``ids``;
        ``providers.subject`` is stored unchanged under ``subject``. Absent
        or empty parameters produce no key.

        :param pyramid.request.Request request:
        :rtype: :class:`dict`
        '''
        # determine targets
        providers = {}
        ids = request.params.get('providers.ids', None)
        if ids:
            providers['ids'] = ids.split(',')
        # Read from the request that was passed in, not self.request, so
        # both filters always come from the same request object.
        subject = request.params.get('providers.subject', None)
        if subject:
            providers['subject'] = subject
        return providers

    @view_config(route_name='skosprovider.cs', request_method='GET')
    def get_concepts(self):
        '''
        Search for concepts across the providers in the registry.

        The query is produced by :class:`QueryBuilder` from the request;
        the providers searched can be narrowed with the ``providers.ids``
        and ``providers.subject`` parameters.

        :returns: The sorted and paged list of matching concepts.
        '''
        qb = QueryBuilder(self.request)
        query = qb()
        if qb.no_result:
            concepts = []
        else:
            concepts = self.skos_registry.find(
                query,
                providers=self._build_providers(self.request),
                language=qb.language
            )
            # Flatten it all: each registry result carries its own
            # 'concepts' list.
            concepts = list(itertools.chain.from_iterable(
                [c['concepts'] for c in concepts]
            ))

        if qb.postprocess:
            concepts = self._postprocess_wildcards(concepts, qb.label)

        return self._page_results(self._sort_concepts(concepts))

    @view_config(route_name='skosprovider.conceptscheme.cs', request_method='GET')
    def get_conceptscheme_concepts(self):
        '''
        Search for concepts within the scheme identified by ``scheme_id``.

        :returns: The sorted and paged list of matching concepts, or
            :class:`HTTPNotFound` when the registry has no provider for
            ``scheme_id``.
        '''
        scheme_id = self.request.matchdict['scheme_id']
        provider = self.skos_registry.get_provider(scheme_id)
        if not provider:
            return HTTPNotFound()
        qb = QueryBuilder(self.request)
        query = qb()
        if qb.no_result:
            concepts = []
        else:
            concepts = provider.find(query, language=qb.language)

        if qb.postprocess:
            concepts = self._postprocess_wildcards(concepts, qb.label)

        return self._page_results(self._sort_concepts(concepts))

    def _postprocess_wildcards(self, concepts, label):
        '''
        Filter ``concepts`` on a label pattern with ``*`` wildcards.

        A leading ``*`` requires the concept label to end with the rest of
        the pattern, a trailing ``*`` requires it to start with it, and
        both together require it to contain it. Matching is case
        sensitive. A pattern without a leading or trailing ``*`` leaves
        the list untouched.

        :param list concepts: Dicts that each have a ``label`` key.
        :param str label: The pattern to match against.
        :rtype: :class:`list`
        '''
        # We need to refine results further
        if label.startswith('*') and label.endswith('*'):
            concepts = [c for c in concepts if label[1:-1] in c['label']]
        elif label.endswith('*'):
            concepts = [c for c in concepts if c['label'].startswith(label[0:-1])]
        elif label.startswith('*'):
            concepts = [c for c in concepts if c['label'].endswith(label[1:])]
        return concepts

    def _sort_concepts(self, concepts):
        '''
        Sort ``concepts`` in place according to the ``sort`` parameter.

        The parameter names a key of the concept dicts, optionally prefixed
        with ``-`` (descending) or ``+`` (ascending). Sorting on ``label``
        is case insensitive. Nothing is sorted when the parameter is
        missing, the list is empty or the first concept lacks the key.

        :param list concepts: The concepts to sort.
        :returns: The same list object that was passed in.
        '''
        sort = self.request.params.get('sort', None)
        # Result sorting
        if sort:
            sort_desc = (sort[0:1] == '-')
            sort = sort[1:] if sort[0:1] in ['-', '+'] else sort
            sort = sort.strip()  # dojo store does not encode '+'
            if (len(concepts) > 0) and (sort in concepts[0]):
                if sort == 'label':
                    concepts.sort(
                        key=lambda concept: concept[sort].lower(),
                        reverse=sort_desc
                    )
                else:
                    concepts.sort(
                        key=lambda concept: concept[sort],
                        reverse=sort_desc
                    )
        return concepts

    def _page_results(self, concepts):
        '''
        Slice ``concepts`` according to the request's ``Range`` header.

        Without a usable ``Range`` header the full list is returned. In
        both cases a ``Content-Range`` header of the form
        ``items <start>-<finish>/<total>`` is set on the response.

        :param list concepts: The complete result list.
        :returns: The requested slice of ``concepts``.
        '''
        # Result paging
        paging_data = False
        if 'Range' in self.request.headers:
            # NOTE(review): parse_range_header is assumed to return a falsy
            # value for a header it cannot parse, and otherwise a dict with
            # 'start' and 'finish' keys - confirm against its definition.
            paging_data = parse_range_header(self.request.headers['Range'])
        count = len(concepts)
        if not paging_data:
            paging_data = {
                'start': 0,
                'finish': count - 1 if count > 0 else 0,
                'number': count
            }
        # 'finish' is inclusive, hence the + 1.
        cslice = concepts[paging_data['start']:paging_data['finish'] + 1]
        self.request.response.headers[ascii_native_('Content-Range')] = \
            ascii_native_('items %d-%d/%d' % (
                paging_data['start'], paging_data['finish'], count
            ))
        return cslice

    @view_config(route_name='skosprovider.c', request_method='GET')
    def get_concept(self):
        '''
        Return the concept ``c_id`` from the scheme ``scheme_id``.

        :returns: The concept, or :class:`HTTPNotFound` when either the
            scheme or the concept is unknown.
        '''
        scheme_id = self.request.matchdict['scheme_id']
        concept_id = self.request.matchdict['c_id']
        provider = self.skos_registry.get_provider(scheme_id)
        # An unknown scheme must give a 404, like the other scheme views,
        # rather than an AttributeError on the missing provider.
        if not provider:
            return HTTPNotFound()
        concept = provider.get_by_id(concept_id)
        if not concept:
            return HTTPNotFound()
        return concept

    @view_config(route_name='skosprovider.c.display_children', request_method='GET')
    def get_concept_display_children(self):
        '''
        Return the result of ``get_children_display`` for a concept.

        :returns: Whatever the provider returns, or :class:`HTTPNotFound`
            when the scheme is unknown or the provider returns ``False``.
        '''
        scheme_id = self.request.matchdict['scheme_id']
        concept_id = self.request.matchdict['c_id']
        provider = self.skos_registry.get_provider(scheme_id)
        if not provider:
            return HTTPNotFound()
        language = self.request.params.get('language', self.request.locale_name)
        children = provider.get_children_display(concept_id, language=language)
        # Only False means not found; an empty result is a valid answer.
        if children is False:
            return HTTPNotFound()
        return children

    @view_config(route_name='skosprovider.c.expand', request_method='GET')
    def get_expand(self):
        '''
        Return the result of ``expand`` for a concept.

        :returns: Whatever the provider returns, or :class:`HTTPNotFound`
            when the scheme is unknown or the expansion is falsy.
        '''
        scheme_id = self.request.matchdict['scheme_id']
        concept_id = self.request.matchdict['c_id']
        provider = self.skos_registry.get_provider(scheme_id)
        if not provider:
            return HTTPNotFound()
        expanded = provider.expand(concept_id)
        if not expanded:
            return HTTPNotFound()
        return expanded
||
| 248 |