Total Complexity | 57 |
Total Lines | 196 |
Duplicated Lines | 18.88 % |
Changes | 1 | ||
Bugs | 0 | Features | 0 |
Duplicate code is one of the most pungent code smells. A commonly used rule is to restructure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like EdgesRelatives often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a subclass, Extract Subclass is also a candidate, and is often faster.
1 | """Creates and manages edges from one GO term to another GO term.""" |
||
128 | class EdgesRelatives(EdgesBase): |
||
129 | """Inits GO-to-GO edges using all relatives above and/or below source GOs.""" |
||
130 | |||
131 | # pylint: disable=too-many-arguments |
||
132 | # def __init__(self, go2obj, relationships, go_sources, traverse_parent, traverse_child): |
||
def __init__(self, gosubdag, traverse_parent, traverse_child):
    """Build the plain and the relationship edge collections.

    Args:
        gosubdag: Object passed on to the EdgesBase initializer. Its
            `go2obj` keys and its `go_sources` seed the traversals.
        traverse_parent: When truthy, run the upward passes
            (`_init_p2cs` and `_init_rel2src2dsts`) over every GO ID
            in `gosubdag.go2obj`.
        traverse_child: When truthy, run `_init_c2ps` over
            `gosubdag.go_sources` and `_init_rel2dst2srcs` over every
            GO ID in `gosubdag.go2obj`.

    Raises:
        AssertionError: If both flags are falsy ("NO EDGES IN GRAPH"),
            or if the keys of `self.go2obj` no longer match the keys of
            `gosubdag.go2obj` after the edges have been built.
    """
    super(EdgesRelatives, self).__init__(gosubdag)
    # Snapshot of the GO IDs contained in the subset
    goids_subset = set(gosubdag.go2obj)
    assert traverse_child or traverse_parent, "NO EDGES IN GRAPH"
    # parent GO ID -> child GO IDs, found by walking up from every GO ID
    parent2children = self._init_p2cs(goids_subset, traverse_parent)
    # child GO ID -> parent GO IDs, found by walking down from the sources
    child2parents = self._init_c2ps(gosubdag.go_sources, traverse_child)
    # relationship type -> GO ID -> GO IDs, one mapping per direction flag
    rel_up = self._init_rel2src2dsts(goids_subset, traverse_parent)
    rel_down = self._init_rel2dst2srcs(goids_subset, traverse_child)
    # Flatten the mappings into edge lists
    self.edges = self._init_edges(parent2children, child2parents)
    self.edges_rel = self._init_edges_relationships(rel_up, rel_down)
    # The traversals must not have changed which GO IDs are held
    assert goids_subset == set(self.go2obj)
||
151 | |||
152 | @staticmethod |
||
153 | # Too slow to check goids_present as we go. Only minor init modes need checking. |
||
154 | # def _init_edges(goids_present, p2cs, c2ps): |
||
155 | def _init_edges(p2cs, c2ps): |
||
156 | """Get the directed edges from GO term to GO term.""" |
||
157 | edge_from_to = [] |
||
158 | for parent, children in p2cs.items(): |
||
159 | for child in children: |
||
160 | # if child in goids_present and parent in goids_present: |
||
161 | edge_from_to.append((child, parent)) |
||
162 | for parent, children in c2ps.items(): |
||
163 | for child in children: |
||
164 | # if child in goids_present and parent in goids_present: |
||
165 | edge_from_to.append((child, parent)) |
||
166 | return edge_from_to |
||
167 | |||
168 | @staticmethod |
||
169 | def _init_edges_relationships(rel2src2dsts, rel2dst2srcs): |
||
170 | """Get the directed edges from GO term to GO term using relationships.""" |
||
171 | edge_rel2fromto = {} |
||
172 | relationships = set(rel2src2dsts).union(rel2dst2srcs) |
||
173 | for reltype in relationships: |
||
174 | edge_from_to = [] |
||
175 | if reltype in rel2src2dsts: |
||
176 | for parent, children in rel2src2dsts[reltype].items(): |
||
177 | for child in children: |
||
178 | edge_from_to.append((child, parent)) |
||
179 | if reltype in rel2dst2srcs: |
||
180 | for parent, children in rel2dst2srcs[reltype].items(): |
||
181 | for child in children: |
||
182 | edge_from_to.append((child, parent)) |
||
183 | edge_rel2fromto[reltype] = edge_from_to |
||
184 | return edge_rel2fromto |
||
185 | |||
186 | # ------------------------------------------------------------------- |
||
187 | def _init_rel2src2dsts(self, go_sources, traverse_parent): |
||
188 | """Traverse up parents.""" |
||
189 | if not traverse_parent or not self.relationships: |
||
190 | return {} |
||
191 | rel2src2dsts = {r:defaultdict(set) for r in self.relationships} |
||
192 | goids_seen = set() |
||
193 | go2obj = self.go2obj |
||
194 | for goid_src in go_sources: |
||
195 | goobj_src = go2obj[goid_src] |
||
196 | if goobj_src.relationship and goid_src not in goids_seen: |
||
197 | self._traverse_relationship_objs(rel2src2dsts, goobj_src, goids_seen) |
||
198 | return rel2src2dsts |
||
199 | |||
200 | View Code Duplication | def _traverse_relationship_objs(self, rel2src2dsts, goobj_child, goids_seen): |
|
|
|||
201 | """Traverse from source GO up relationships.""" |
||
202 | child_id = goobj_child.id |
||
203 | goids_seen.add(child_id) |
||
204 | ##A self.go2obj[child_id] = goobj_child |
||
205 | # Update goids_seen and go2obj with child alt_ids |
||
206 | for goid_altid in goobj_child.alt_ids: |
||
207 | goids_seen.add(goid_altid) |
||
208 | ##A self.go2obj[goid_altid] = goobj_child |
||
209 | # Loop through relationships of child object |
||
210 | for reltype, recs in goobj_child.relationship.items(): |
||
211 | if reltype in self.relationships: |
||
212 | for relationship_obj in recs: |
||
213 | relationship_id = relationship_obj.id |
||
214 | rel2src2dsts[reltype][relationship_id].add(child_id) |
||
215 | # If relationship has not been seen, traverse |
||
216 | if relationship_id not in goids_seen: |
||
217 | self._traverse_relationship_objs(rel2src2dsts, relationship_obj, goids_seen) |
||
218 | |||
219 | # ------------------------------------------------------------------- |
||
220 | def _init_rel2dst2srcs(self, go_sources, traverse_child): |
||
221 | """Traverse through reverse relationships.""" |
||
222 | if not traverse_child or not self.relationships: |
||
223 | return {} |
||
224 | rel2dst2srcs = {r:defaultdict(set) for r in self.relationships} |
||
225 | goids_seen = set() |
||
226 | go2obj = self.go2obj |
||
227 | for goid_src in go_sources: |
||
228 | goobj_src = go2obj[goid_src] |
||
229 | if goid_src not in goids_seen: |
||
230 | self._traverse_relationship_rev_objs(rel2dst2srcs, goobj_src, goids_seen) |
||
231 | return rel2dst2srcs |
||
232 | |||
233 | View Code Duplication | def _traverse_relationship_rev_objs(self, rel2dst2srcs, goobj_parent, goids_seen): |
|
234 | """Traverse from source GO down children.""" |
||
235 | parent_id = goobj_parent.id |
||
236 | goids_seen.add(parent_id) |
||
237 | ##A self.go2obj[parent_id] = goobj_parent |
||
238 | # Update goids_seen and go2obj with parent alt_ids |
||
239 | for goid_altid in goobj_parent.alt_ids: |
||
240 | goids_seen.add(goid_altid) |
||
241 | ##A self.go2obj[goid_altid] = goobj_parent |
||
242 | # Loop through children |
||
243 | for reltype, recs in goobj_parent.relationship.items(): |
||
244 | if reltype in self.relationships: |
||
245 | for relrev_obj in recs: |
||
246 | relrev_id = relrev_obj.id |
||
247 | rel2dst2srcs[relrev_id].add(parent_id) |
||
248 | # If child has not been seen, traverse |
||
249 | if relrev_id not in goids_seen: |
||
250 | ##F self._traverse_relrev_objs(rel2dst2srcs, relrev_obj, go2obj, goids_seen) |
||
251 | self._traverse_relationship_rev_objs(rel2dst2srcs, relrev_obj, goids_seen) |
||
252 | |||
253 | # ------------------------------------------------------------------- |
||
254 | def _init_p2cs(self, go_sources, traverse_parent): |
||
255 | """Traverse up parents.""" |
||
256 | if not traverse_parent: |
||
257 | return {} |
||
258 | p2cs = defaultdict(set) |
||
259 | goids_seen = set() |
||
260 | go2obj = self.go2obj |
||
261 | for goid_src in go_sources: |
||
262 | goobj_src = go2obj[goid_src] |
||
263 | if goid_src not in goids_seen: |
||
264 | ##F self._traverse_parent_objs(p2cs, goobj_src, go2obj, goids_seen) |
||
265 | self._traverse_parent_objs(p2cs, goobj_src, goids_seen) |
||
266 | return p2cs |
||
267 | |||
268 | ##F def _traverse_parent_objs(self, p2cs, goobj_child, go2obj, goids_seen): |
||
269 | def _traverse_parent_objs(self, p2cs, goobj_child, goids_seen): |
||
270 | """Traverse from source GO up parents.""" |
||
271 | # Update public(go2obj p2cs), private(goids_seen) |
||
272 | child_id = goobj_child.id |
||
273 | # mark child as seen |
||
274 | goids_seen.add(child_id) |
||
275 | ##A self.go2obj[child_id] = goobj_child |
||
276 | # Update goids_seen and go2obj with child alt_ids |
||
277 | for goid_altid in goobj_child.alt_ids: |
||
278 | goids_seen.add(goid_altid) |
||
279 | ##A self.go2obj[goid_altid] = goobj_child |
||
280 | # Loop through parents of child object |
||
281 | for parent_obj in goobj_child.parents: |
||
282 | parent_id = parent_obj.id |
||
283 | p2cs[parent_id].add(child_id) |
||
284 | # If parent has not been seen, traverse |
||
285 | if parent_id not in goids_seen: |
||
286 | ##F self._traverse_parent_objs(p2cs, parent_obj, go2obj, goids_seen) |
||
287 | self._traverse_parent_objs(p2cs, parent_obj, goids_seen) |
||
288 | |||
289 | # ------------------------------------------------------------------- |
||
290 | def _init_c2ps(self, go_sources, traverse_child): |
||
291 | """Traverse up children.""" |
||
292 | if not traverse_child: |
||
293 | return {} |
||
294 | c2ps = defaultdict(set) |
||
295 | goids_seen = set() |
||
296 | go2obj = self.go2obj |
||
297 | for goid_src in go_sources: |
||
298 | goobj_src = go2obj[goid_src] |
||
299 | if goid_src not in goids_seen: |
||
300 | ##F self._traverse_child_objs(c2ps, goobj_src, go2obj, goids_seen) |
||
301 | self._traverse_child_objs(c2ps, goobj_src, goids_seen) |
||
302 | return c2ps |
||
303 | |||
304 | ##F def _traverse_child_objs(self, c2ps, goobj_parent, go2obj, goids_seen): |
||
305 | def _traverse_child_objs(self, c2ps, goobj_parent, goids_seen): |
||
306 | """Traverse from source GO down children.""" |
||
307 | # Update public(godag.go2obj godag.c2ps), private(_seen_pids) |
||
308 | parent_id = goobj_parent.id |
||
309 | # mark parent as seen |
||
310 | goids_seen.add(parent_id) |
||
311 | ##A self.go2obj[parent_id] = goobj_parent |
||
312 | # Update goids_seen and go2obj with parent alt_ids |
||
313 | for goid_altid in goobj_parent.alt_ids: |
||
314 | goids_seen.add(goid_altid) |
||
315 | ##A self.go2obj[goid_altid] = goobj_parent |
||
316 | # Loop through children |
||
317 | for child_obj in goobj_parent.children: |
||
318 | child_id = child_obj.id |
||
319 | c2ps[child_id].add(parent_id) |
||
320 | # If child has not been seen, traverse |
||
321 | if child_id not in goids_seen: |
||
322 | ##F self._traverse_child_objs(c2ps, child_obj, go2obj, goids_seen) |
||
323 | self._traverse_child_objs(c2ps, child_obj, goids_seen) |
||
324 | |||
361 |