EdgesRelatives._traverse_parent_objs()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 19

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 4
dl 0
loc 19
rs 9.45
c 1
b 0
f 0
1
"""Creates and manages edges from one GO term to another GO term."""
2
3
__copyright__ = "Copyright (C) 2016-2018, DV Klopfenstein, H Tang, All rights reserved."
4
__author__ = "DV Klopfenstein"
5
6
from collections import defaultdict
7
8
9
def get_edgesobj(gosubdag, **kws):
    """Return the specified edges object, with any 'rm_gos' GO IDs removed.

    The edges object is built by _get_edgesobj from the same keyword args.
    If the 'rm_gos' keyword is given and is not None, every edge touching
    one of those GO IDs is dropped before the object is returned.
    """
    # Keyword args (kws):
    #     1. dst_srcs_list  Edges are pruned so that only GO terms between
    #                       the sets of dst & srcs are retained.
    #     2. traverse_parent & traverse_child
    #                       Use all parent terms and/or all child terms,
    #                       without pruning any paths.
    #     3. rm_gos         GO IDs whose edges are removed from the result.
    # Call function, get_edgesobj, with:
    #     get_edgesobj(gosubdag, dst_srcs_list=...)
    # Or any of:
    #     get_edgesobj(gosubdag)
    #     get_edgesobj(gosubdag, traverse_parent=...,)
    #     get_edgesobj(gosubdag, traverse_child=...,)
    #     get_edgesobj(gosubdag, traverse_parent=..., traverse_child=...,)
    edges = _get_edgesobj(gosubdag, **kws)
    goids_to_drop = kws.get('rm_gos')
    if goids_to_drop is None:
        return edges
    edges.rm_gos(goids_to_drop)
    return edges
29
30
def _get_edgesobj(gosubdag, **kws):
    """Build the edges object selected by the keyword args.

    Returns an EdgesPath when 'dst_srcs_list' is given (and not None);
    otherwise returns an EdgesRelatives using 'traverse_parent'
    (default True) and 'traverse_child' (default False).
    """
    # Keyword args (kws):
    #     1. dst_srcs_list  Edges are pruned so that only GO terms between
    #                       the sets of dst & srcs are retained.
    #     2. traverse_parent & traverse_child
    #                       Use all parent terms and/or all child terms,
    #                       without pruning any paths.
    path_spec = kws.get('dst_srcs_list')
    if path_spec is None:
        go_up = kws.get('traverse_parent', True)
        go_down = kws.get('traverse_child', False)
        return EdgesRelatives(gosubdag, go_up, go_down)
    return EdgesPath(gosubdag, path_spec)
51
52
# -- Base Class ----------------------------------------------------------------
53
class EdgesBase(object):
    """Shared state and helpers for the GO-to-GO edge classes."""

    def __init__(self, gosubdag):
        self.gosubdag = gosubdag
        self.go2obj = gosubdag.go2obj
        self.relationships = gosubdag.relationships
        # Both containers start empty; a derived edge class fills them in
        self.edges = []      # list of (goid_child, goid_parent)
        self.edges_rel = {}  # relationship name -> list of edges

    def rm_gos(self, rm_goids):
        """Drop every edge (is_a and relationship) touching a GO ID in rm_goids."""
        self.edges = self._rm_gos_edges(rm_goids, self.edges)
        self.edges_rel = self._rm_gos_edges_rel(rm_goids, self.edges_rel)

    def _rm_gos_edges_rel(self, rm_goids, edges_rel):
        """Return edges_rel without edges touching rm_goids.

        Relationship names left with no edges are omitted from the result.
        """
        pruned = ((relname, self._rm_gos_edges(rm_goids, rel_edges))
                  for relname, rel_edges in edges_rel.items())
        return {relname: kept for relname, kept in pruned if kept}

    @staticmethod
    def _rm_gos_edges(rm_goids, edges_all):
        """Return the edges, sorted by second element, that avoid rm_goids."""
        by_parent = sorted(edges_all, key=lambda edge: edge[1])
        return [(child, parent) for child, parent in by_parent
                if not (child in rm_goids or parent in rm_goids)]

    def get_all_edge_nodes(self):
        """Return the set of all GO IDs that appear in any edge."""
        nodes = {goid for edge in self.edges for goid in edge}
        for rel_edges in self.edges_rel.values():
            nodes.update(goid for edge in rel_edges for goid in edge)
        return nodes

    def chk_edges(self):
        """Assert that every edge node is a key of go2obj."""
        present = set(self.go2obj)
        self.chk_edges_nodes(self.edges, present, "is_a")
        for relname, rel_edges in self.edges_rel.items():
            self.chk_edges_nodes(rel_edges, present, relname)

    @staticmethod
    def chk_edges_nodes(edges, nodes, name):
        """Assert that every GO ID used in edges is found in nodes.

        name labels the edge type in the AssertionError message.
        """
        used = {goid for edge in edges for goid in edge}
        absent = used.difference(nodes)
        msg = "MISSING: {GOs}\n{NM} EDGES MISSING {N} NODES (OF {T})"
        assert not absent, msg.format(GOs=absent, NM=name, N=len(absent), T=len(used))

    def get_c2ps(self):
        """Return a child-to-parents dict built from self.edges."""
        child2parents = defaultdict(set)
        for edge in self.edges:
            child, parent = edge
            child2parents[child].add(parent)
        return child2parents

    def _getobjs_higher(self, goobj):
        """Return goobj's parents plus targets of its tracked relationships."""
        higher = set(goobj.parents)
        for relname, relobjs in goobj.relationship.items():
            # Only relationship types listed in self.relationships count
            if relname not in self.relationships:
                continue
            higher.update(relobjs)
        return higher
124
125
126
127
# -- Initialization by considering all child and/or parent relatives -----------
128
class EdgesRelatives(EdgesBase):
    """Inits GO-to-GO edges using all relatives above and/or below source GOs.

    After construction:
      * self.edges      list of edge tuples from the parent/child traversals
      * self.edges_rel  dict: relationship type -> list of edge tuples
    """

    # pylint: disable=too-many-arguments
    def __init__(self, gosubdag, traverse_parent, traverse_child):
        """Collect edges by traversing parents and/or children.

        Args:
            gosubdag: object providing go2obj, relationships and go_sources.
            traverse_parent: if true, walk up parents (and relationships).
            traverse_child: if true, walk down children from the go_sources.

        Raises:
            AssertionError: if both traverse flags are false, or if the
                set of GO IDs in go2obj changed during initialization.
        """
        super(EdgesRelatives, self).__init__(gosubdag)
        # go2obj contains the GO IDs in the subset
        _gos = set(gosubdag.go2obj)
        assert traverse_child or traverse_parent, "NO EDGES IN GRAPH"
        # parent->children GO IDs: walk up from every GO ID in the subset
        p2cs = self._init_p2cs(_gos, traverse_parent)
        # child->parents GO IDs: walk down from the source GO IDs
        c2ps = self._init_c2ps(gosubdag.go_sources, traverse_child)
        # relationship type -> GO ID -> related GO IDs
        rel2src2dsts = self._init_rel2src2dsts(_gos, traverse_parent)
        rel2dst2srcs = self._init_rel2dst2srcs(_gos, traverse_child)
        # Set the containers declared by the base class
        self.edges = self._init_edges(p2cs, c2ps)
        self.edges_rel = self._init_edges_relationships(rel2src2dsts, rel2dst2srcs)
        # The traversals must not have changed which GO IDs are in go2obj
        assert _gos == set(self.go2obj)

    @staticmethod
    def _init_edges(p2cs, c2ps):
        """Return a list of edge tuples built from p2cs and c2ps.

        For each key and each member of its value set, the tuple
        (member, key) is appended. Edges are not checked against the GO IDs
        present in the subset (too slow to check as we go).
        """
        edge_from_to = []
        # p2cs is parent -> children, so these edges are (child, parent)
        for parent, children in p2cs.items():
            for child in children:
                edge_from_to.append((child, parent))
        # NOTE(review): c2ps is child -> parents (see _traverse_child_objs),
        # so (member, key) is (parent, child) here, the reverse of the
        # p2cs edges above. Behavior kept as-is; confirm the intended direction.
        for key, members in c2ps.items():
            for member in members:
                edge_from_to.append((member, key))
        return edge_from_to

    @staticmethod
    def _init_edges_relationships(rel2src2dsts, rel2dst2srcs):
        """Return {relationship type: edge list} from both relationship dicts.

        Both arguments map relationship type -> GO ID -> set of GO IDs.
        For each GO ID key and each member of its set, the tuple
        (member, key) is appended to that relationship type's list.
        """
        edge_rel2fromto = {}
        relationships = set(rel2src2dsts).union(rel2dst2srcs)
        for reltype in relationships:
            edge_from_to = []
            for rel2go2gos in (rel2src2dsts, rel2dst2srcs):
                if reltype in rel2go2gos:
                    for parent, children in rel2go2gos[reltype].items():
                        for child in children:
                            edge_from_to.append((child, parent))
            edge_rel2fromto[reltype] = edge_from_to
        return edge_rel2fromto

    # -------------------------------------------------------------------
    def _init_rel2src2dsts(self, go_sources, traverse_parent):
        """Traverse up relationships; return {reltype: {related GO: GOs}}.

        Returns an empty dict if traverse_parent is false or if no
        relationship types are tracked in self.relationships.
        """
        if not traverse_parent or not self.relationships:
            return {}
        rel2src2dsts = {r: defaultdict(set) for r in self.relationships}
        goids_seen = set()
        go2obj = self.go2obj
        for goid_src in go_sources:
            goobj_src = go2obj[goid_src]
            if goobj_src.relationship and goid_src not in goids_seen:
                self._traverse_relationship_objs(rel2src2dsts, goobj_src, goids_seen)
        return rel2src2dsts

    def _traverse_relationship_objs(self, rel2src2dsts, goobj_child, goids_seen):
        """Traverse from source GO up relationships, filling rel2src2dsts."""
        child_id = goobj_child.id
        # Mark the main ID and all alt IDs as seen so each term is visited once
        goids_seen.add(child_id)
        for goid_altid in goobj_child.alt_ids:
            goids_seen.add(goid_altid)
        # Loop through relationships of child object
        for reltype, recs in goobj_child.relationship.items():
            if reltype in self.relationships:
                for relationship_obj in recs:
                    relationship_id = relationship_obj.id
                    rel2src2dsts[reltype][relationship_id].add(child_id)
                    # If relationship has not been seen, traverse
                    if relationship_id not in goids_seen:
                        self._traverse_relationship_objs(
                            rel2src2dsts, relationship_obj, goids_seen)

    # -------------------------------------------------------------------
    def _init_rel2dst2srcs(self, go_sources, traverse_child):
        """Traverse relationships for the child direction.

        Returns {reltype: {related GO: GOs}}, or an empty dict if
        traverse_child is false or no relationship types are tracked.
        """
        if not traverse_child or not self.relationships:
            return {}
        rel2dst2srcs = {r: defaultdict(set) for r in self.relationships}
        goids_seen = set()
        go2obj = self.go2obj
        for goid_src in go_sources:
            goobj_src = go2obj[goid_src]
            if goid_src not in goids_seen:
                self._traverse_relationship_rev_objs(rel2dst2srcs, goobj_src, goids_seen)
        return rel2dst2srcs

    def _traverse_relationship_rev_objs(self, rel2dst2srcs, goobj_parent, goids_seen):
        """Traverse relationships from a source GO, filling rel2dst2srcs."""
        parent_id = goobj_parent.id
        # Mark the main ID and all alt IDs as seen so each term is visited once
        goids_seen.add(parent_id)
        for goid_altid in goobj_parent.alt_ids:
            goids_seen.add(goid_altid)
        # NOTE(review): this walks the same 'relationship' mapping as
        # _traverse_relationship_objs; a reverse mapping may have been
        # intended for the child direction -- confirm against the GO term class.
        for reltype, recs in goobj_parent.relationship.items():
            if reltype in self.relationships:
                for relrev_obj in recs:
                    relrev_id = relrev_obj.id
                    # rel2dst2srcs is keyed by relationship type first, then
                    # by GO ID. Indexing it by the GO ID alone raised KeyError.
                    rel2dst2srcs[reltype][relrev_id].add(parent_id)
                    # If related term has not been seen, traverse
                    if relrev_id not in goids_seen:
                        self._traverse_relationship_rev_objs(
                            rel2dst2srcs, relrev_obj, goids_seen)

    # -------------------------------------------------------------------
    def _init_p2cs(self, go_sources, traverse_parent):
        """Traverse up parents; return parent -> children GO IDs.

        Returns an empty dict if traverse_parent is false.
        """
        if not traverse_parent:
            return {}
        p2cs = defaultdict(set)
        goids_seen = set()
        go2obj = self.go2obj
        for goid_src in go_sources:
            goobj_src = go2obj[goid_src]
            if goid_src not in goids_seen:
                self._traverse_parent_objs(p2cs, goobj_src, goids_seen)
        return p2cs

    def _traverse_parent_objs(self, p2cs, goobj_child, goids_seen):
        """Traverse from source GO up parents, filling p2cs."""
        child_id = goobj_child.id
        # Mark the main ID and all alt IDs as seen so each term is visited once
        goids_seen.add(child_id)
        for goid_altid in goobj_child.alt_ids:
            goids_seen.add(goid_altid)
        # Loop through parents of child object
        for parent_obj in goobj_child.parents:
            parent_id = parent_obj.id
            p2cs[parent_id].add(child_id)
            # If parent has not been seen, traverse
            if parent_id not in goids_seen:
                self._traverse_parent_objs(p2cs, parent_obj, goids_seen)

    # -------------------------------------------------------------------
    def _init_c2ps(self, go_sources, traverse_child):
        """Traverse down children; return child -> parents GO IDs.

        Returns an empty dict if traverse_child is false.
        """
        if not traverse_child:
            return {}
        c2ps = defaultdict(set)
        goids_seen = set()
        go2obj = self.go2obj
        for goid_src in go_sources:
            goobj_src = go2obj[goid_src]
            if goid_src not in goids_seen:
                self._traverse_child_objs(c2ps, goobj_src, goids_seen)
        return c2ps

    def _traverse_child_objs(self, c2ps, goobj_parent, goids_seen):
        """Traverse from source GO down children, filling c2ps."""
        parent_id = goobj_parent.id
        # Mark the main ID and all alt IDs as seen so each term is visited once
        goids_seen.add(parent_id)
        for goid_altid in goobj_parent.alt_ids:
            goids_seen.add(goid_altid)
        # Loop through children
        for child_obj in goobj_parent.children:
            child_id = child_obj.id
            c2ps[child_id].add(parent_id)
            # If child has not been seen, traverse
            if child_id not in goids_seen:
                self._traverse_child_objs(c2ps, child_obj, goids_seen)
324
325
326
# -- Initialization with relatives on specific src-dst paths -------------------
327
class EdgesPath(EdgesBase):
    """Inits GO-to-GO edges using a list of (parent destination, child sources)"""

    def __init__(self, gosubdag, dst_srcs_list):
        super(EdgesPath, self).__init__(gosubdag)
        # Placeholders; both are filled in by _init_edges
        self.edges = None
        self.goid_all = None
        self._init_edges(dst_srcs_list)

    def get_edges(self):
        """Return the list of (GO ID, GO ID) edge tuples."""
        return self.edges

    def _init_edges(self, dst_srcs_list):
        """Set self.edges and self.goid_all from a list of (dst, srcs) pairs.

        For each pair, the paths between the source GO terms and dst are
        found with get_paths_goobjs; their edges and GO IDs are accumulated
        over all pairs.
        """
        from goatools.gosubdag.go_paths import get_paths_goobjs, paths2edges
        path_edges = set()
        path_goids = set()
        goid2obj = self.go2obj
        for go_top, go_srcs in dst_srcs_list:
            # Keyed by GO ID, so a repeated source ID contributes one object
            src2obj = {goid: goid2obj[goid] for goid in go_srcs}
            paths, goids = get_paths_goobjs(src2obj.values(), go_top=go_top, go2obj=goid2obj)
            path_edges |= paths2edges(paths)
            path_goids |= goids
        # Edges hold GO term objects; store them as ID pairs
        self.edges = [(first.id, second.id) for first, second in path_edges]
        self.goid_all = path_goids
359
360
# Copyright (C) 2016-2018, DV Klopfenstein, H Tang, All rights reserved.
361