| Total Complexity | 57 |
| Total Lines | 196 |
| Duplicated Lines | 18.88 % |
| Changes | 1 | ||
| Bugs | 0 | Features | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like EdgesRelatives often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
| 1 | """Creates and manages edges from one GO term to another GO term.""" |
||
class EdgesRelatives(EdgesBase):
    """Inits GO-to-GO edges using all relatives above and/or below source GOs.

    Attributes set by ``__init__``:
        edges: list of ``(from_id, to_id)`` tuples built from the
            ``parents``/``children`` links of the GO term objects.
        edges_rel: dict mapping a relationship type to a list of
            ``(from_id, to_id)`` tuples built from ``relationship`` links.

    ``self.go2obj`` and ``self.relationships`` are read here but not assigned
    in this class; presumably they are set by ``EdgesBase.__init__`` -- confirm.
    """

    # pylint: disable=too-many-arguments
    def __init__(self, gosubdag, traverse_parent, traverse_child):
        """Collect edges by walking up and/or down from the GO terms.

        Args:
            gosubdag: object providing ``go2obj`` (GO ID -> GO term object)
                and ``go_sources`` (iterable of source GO IDs).
            traverse_parent: if true, follow ``parents`` / ``relationship``
                links upward.
            traverse_child: if true, follow ``children`` links downward and
                run the reverse-relationship traversal.

        Raises:
            AssertionError: if both traversal flags are false, or if the set
                of keys in ``self.go2obj`` differs from ``gosubdag.go2obj``
                after initialization.
        """
        super(EdgesRelatives, self).__init__(gosubdag)
        # go2obj contains the GO IDs in the subset
        _gos = set(gosubdag.go2obj)
        assert traverse_child or traverse_parent, "NO EDGES IN GRAPH"
        # parent ID -> child IDs, found by walking up from every GO ID in go2obj
        p2cs = self._init_p2cs(_gos, traverse_parent)
        # child ID -> parent IDs, found by walking down from the source GO IDs
        c2ps = self._init_c2ps(gosubdag.go_sources, traverse_child)
        # relationship type -> GO ID -> GO IDs
        rel2src2dsts = self._init_rel2src2dsts(_gos, traverse_parent)
        rel2dst2srcs = self._init_rel2dst2srcs(_gos, traverse_child)
        self.edges = self._init_edges(p2cs, c2ps)
        self.edges_rel = self._init_edges_relationships(rel2src2dsts, rel2dst2srcs)
        # The traversals must not have changed which GO IDs are in go2obj
        assert _gos == set(self.go2obj)

    @staticmethod
    def _init_edges(p2cs, c2ps):
        """Get the directed edges from GO term to GO term.

        Both arguments map a GO ID to a collection of GO IDs.  Every
        ``key -> value`` entry yields the tuple ``(value, key)``; entries
        from ``p2cs`` come first, then entries from ``c2ps``.

        The GO IDs are deliberately not checked against the set of GO IDs
        present: that was too slow to do while building the edges, and only
        minor init modes need the check.

        NOTE(review): ``c2ps`` is filled as child -> parents by
        ``_traverse_child_objs``, so its tuples come out as (parent, child)
        while the ``p2cs`` tuples are (child, parent).  Behavior is kept
        as-is; confirm the intended direction against the consumers of
        ``self.edges``.
        """
        edge_from_to = [
            (goid_val, goid_key)
            for goid_key, goid_vals in p2cs.items()
            for goid_val in goid_vals
        ]
        edge_from_to.extend(
            (goid_val, goid_key)
            for goid_key, goid_vals in c2ps.items()
            for goid_val in goid_vals
        )
        return edge_from_to

    @staticmethod
    def _init_edges_relationships(rel2src2dsts, rel2dst2srcs):
        """Get the directed edges from GO term to GO term using relationships.

        Both arguments map ``reltype -> GO ID -> collection of GO IDs``.
        Returns a dict ``reltype -> list of (value_id, key_id)`` covering
        every relationship type present in either argument.
        """
        edge_rel2fromto = {}
        for reltype in set(rel2src2dsts).union(rel2dst2srcs):
            edge_from_to = []
            # Forward entries first, then reverse entries, for this reltype
            for rel2key2vals in (rel2src2dsts, rel2dst2srcs):
                if reltype in rel2key2vals:
                    edge_from_to.extend(
                        (goid_val, goid_key)
                        for goid_key, goid_vals in rel2key2vals[reltype].items()
                        for goid_val in goid_vals
                    )
            edge_rel2fromto[reltype] = edge_from_to
        return edge_rel2fromto

    @staticmethod
    def _mark_seen(goobj, goids_seen):
        """Add the term's ID and all of its alt_ids to goids_seen; return the ID."""
        goid = goobj.id
        goids_seen.add(goid)
        goids_seen.update(goobj.alt_ids)
        return goid

    # -------------------------------------------------------------------
    def _init_rel2src2dsts(self, go_sources, traverse_parent):
        """Traverse up relationships.

        Returns ``{}`` when ``traverse_parent`` is false or no relationships
        are selected; otherwise ``reltype -> related GO ID -> set of GO IDs
        that hold that relationship to it``.
        """
        if not traverse_parent or not self.relationships:
            return {}
        rel2src2dsts = {r: defaultdict(set) for r in self.relationships}
        goids_seen = set()
        go2obj = self.go2obj
        for goid_src in go_sources:
            goobj_src = go2obj[goid_src]
            # Terms without any relationship contribute nothing; skip them
            if goobj_src.relationship and goid_src not in goids_seen:
                self._traverse_relationship_objs(rel2src2dsts, goobj_src, goids_seen)
        return rel2src2dsts

    def _traverse_relationship_objs(self, rel2src2dsts, goobj_child, goids_seen):
        """Traverse from source GO up relationships.

        Updates ``rel2src2dsts`` and ``goids_seen`` in place and recurses
        into every related term that has not been seen yet.
        """
        child_id = self._mark_seen(goobj_child, goids_seen)
        # Loop through relationships of child object
        for reltype, recs in goobj_child.relationship.items():
            if reltype in self.relationships:
                for relationship_obj in recs:
                    relationship_id = relationship_obj.id
                    rel2src2dsts[reltype][relationship_id].add(child_id)
                    # If relationship has not been seen, traverse
                    if relationship_id not in goids_seen:
                        self._traverse_relationship_objs(
                            rel2src2dsts, relationship_obj, goids_seen)

    # -------------------------------------------------------------------
    def _init_rel2dst2srcs(self, go_sources, traverse_child):
        """Traverse through reverse relationships.

        Returns ``{}`` when ``traverse_child`` is false or no relationships
        are selected; otherwise ``reltype -> GO ID -> set of GO IDs``.
        """
        if not traverse_child or not self.relationships:
            return {}
        rel2dst2srcs = {r: defaultdict(set) for r in self.relationships}
        goids_seen = set()
        go2obj = self.go2obj
        for goid_src in go_sources:
            goobj_src = go2obj[goid_src]
            if goid_src not in goids_seen:
                self._traverse_relationship_rev_objs(rel2dst2srcs, goobj_src, goids_seen)
        return rel2dst2srcs

    def _traverse_relationship_rev_objs(self, rel2dst2srcs, goobj_parent, goids_seen):
        """Traverse relationships for the reverse-relationship mapping.

        Updates ``rel2dst2srcs`` (``reltype -> GO ID -> set of GO IDs``) and
        ``goids_seen`` in place and recurses into unseen related terms.

        NOTE(review): this iterates ``goobj_parent.relationship``, the same
        attribute the forward traversal uses.  If the GO term objects expose
        a separate reverse-relationship mapping, that is probably what was
        intended here -- confirm before changing.
        """
        parent_id = self._mark_seen(goobj_parent, goids_seen)
        for reltype, recs in goobj_parent.relationship.items():
            if reltype in self.relationships:
                for relrev_obj in recs:
                    relrev_id = relrev_obj.id
                    # Index by relationship type first: the outer dict is keyed
                    # by reltype, and only the inner defaultdict is keyed by GO ID.
                    rel2dst2srcs[reltype][relrev_id].add(parent_id)
                    # If the related term has not been seen, traverse
                    if relrev_id not in goids_seen:
                        self._traverse_relationship_rev_objs(
                            rel2dst2srcs, relrev_obj, goids_seen)

    # -------------------------------------------------------------------
    def _init_p2cs(self, go_sources, traverse_parent):
        """Traverse up parents.

        Returns ``{}`` when ``traverse_parent`` is false; otherwise a
        ``defaultdict(set)`` mapping parent GO ID -> child GO IDs.
        """
        if not traverse_parent:
            return {}
        p2cs = defaultdict(set)
        goids_seen = set()
        go2obj = self.go2obj
        for goid_src in go_sources:
            goobj_src = go2obj[goid_src]
            if goid_src not in goids_seen:
                self._traverse_parent_objs(p2cs, goobj_src, goids_seen)
        return p2cs

    def _traverse_parent_objs(self, p2cs, goobj_child, goids_seen):
        """Traverse from source GO up parents.

        Updates ``p2cs`` and ``goids_seen`` in place and recurses into every
        parent that has not been seen yet.
        """
        child_id = self._mark_seen(goobj_child, goids_seen)
        # Loop through parents of child object
        for parent_obj in goobj_child.parents:
            parent_id = parent_obj.id
            p2cs[parent_id].add(child_id)
            # If parent has not been seen, traverse
            if parent_id not in goids_seen:
                self._traverse_parent_objs(p2cs, parent_obj, goids_seen)

    # -------------------------------------------------------------------
    def _init_c2ps(self, go_sources, traverse_child):
        """Traverse down children.

        Returns ``{}`` when ``traverse_child`` is false; otherwise a
        ``defaultdict(set)`` mapping child GO ID -> parent GO IDs.
        """
        if not traverse_child:
            return {}
        c2ps = defaultdict(set)
        goids_seen = set()
        go2obj = self.go2obj
        for goid_src in go_sources:
            goobj_src = go2obj[goid_src]
            if goid_src not in goids_seen:
                self._traverse_child_objs(c2ps, goobj_src, goids_seen)
        return c2ps

    def _traverse_child_objs(self, c2ps, goobj_parent, goids_seen):
        """Traverse from source GO down children.

        Updates ``c2ps`` and ``goids_seen`` in place and recurses into every
        child that has not been seen yet.
        """
        parent_id = self._mark_seen(goobj_parent, goids_seen)
        # Loop through children
        for child_obj in goobj_parent.children:
            child_id = child_obj.id
            c2ps[child_id].add(parent_id)
            # If child has not been seen, traverse
            if child_id not in goids_seen:
                self._traverse_child_objs(c2ps, child_obj, goids_seen)
||
| 324 | |||
| 361 |