1
|
|
|
"""Collect GO paths.""" |
2
|
|
|
|
3
|
|
|
__copyright__ = "Copyright (C) 2016-2018, DV Klopfenstein, H Tang, All rights reserved." |
4
|
|
|
__author__ = "DV Klopfenstein" |
5
|
|
|
|
6
|
|
|
import collections as cx |
7
|
|
|
import sys |
8
|
|
|
from itertools import tee |
9
|
|
|
|
10
|
|
|
class GoPaths(object):
    """Class for helping traverse GO paths.

    GO term objects handled here are expected to provide ``id``, ``parents``
    and ``children`` attributes; ``prt_paths`` additionally reads ``level``
    and ``depth``.
    """

    # Maps the ``dn0_up1`` direction flag to a function returning the GO terms
    # adjacent to a given GO term. Adjacent direction can be any of:
    #   True  -> up:   follow parents
    #   False -> down: follow children
    #   None  -> both: follow parents, then children
    # NOTE(review): the container type of parents/children is defined outside
    # this file. Converting to lists before concatenating lets the "both" case
    # work for sets or tuples as well as lists.
    adjdir = {
        None: lambda go_obj: list(go_obj.parents) + list(go_obj.children),
        True: lambda go_obj: go_obj.parents,
        False: lambda go_obj: go_obj.children}

    def get_paths_from_to(self, goobj_start, goid_end=None, dn0_up1=True):
        """Get a list of paths from goobj_start to either a terminal term or goid_end.

        Args:
            goobj_start: GO term object at which every path begins.
            goid_end: ID of the GO term at which a path is complete. If None,
                a path is complete when its last term has no adjacent terms
                in the chosen direction.
            dn0_up1: Key into ``adjdir``: True (parents), False (children)
                or None (parents and children).

        Returns:
            List of paths; each path is a list of GO term objects starting
            with goobj_start. When goid_end is given, paths which run out of
            adjacent terms before reaching goid_end are not returned.
        """
        paths = []
        adjfnc = self.adjdir[dn0_up1]
        # FIFO queue of partial paths; the last term of each is examined next
        working_q = cx.deque([[goobj_start]])
        while working_q:
            path_curr = working_q.popleft()
            goobj_curr = path_curr[-1]
            go_adjlst = adjfnc(goobj_curr)
            if goid_end is not None:
                at_end = goobj_curr.id == goid_end
            else:
                at_end = not go_adjlst
            # If this GO term is the endpoint, stop and store the path
            if at_end:
                paths.append(path_curr)
                continue
            # Otherwise extend the path by each neighbor not already on it;
            # skipping visited terms keeps a path from looping back on itself
            for go_neighbor in go_adjlst:
                if go_neighbor not in path_curr:
                    working_q.append(path_curr + [go_neighbor])
        return paths

    @staticmethod
    def prt_paths(paths, prt=sys.stdout):
        """Print list of paths.

        Writes one line per GO term (ID, level, depth) and one empty line
        after each path.

        Args:
            paths: Iterable of paths, each an iterable of GO term objects.
            prt: Object with a ``write`` method which receives the text.
        """
        pat = "PATHES: {GO} L{L:02} D{D:02}\n"
        for path in paths:
            for go_obj in path:
                prt.write(pat.format(GO=go_obj.id, L=go_obj.level, D=go_obj.depth))
            prt.write("\n")
55
|
|
|
|
56
|
|
|
def get_paths_goobjs(go_objs, go_top=None, go2obj=None):
    """Given GO objects, return their upward paths and the IDs of all GO terms on them.

    Args:
        go_objs: Iterable of GO term objects from which paths are started.
        go_top: ID of the GO term at which paths end. If None, paths end at
            terms which have no parents.
        go2obj: Optional dict which is updated in place with
            {GO ID: GO term object} for every GO term found on a path.
            Nothing is recorded when it is None.

    Returns:
        Tuple (go_paths, go_all): go_paths is a list of paths, each a list of
        GO term objects; go_all is the set of IDs of all GO terms in go_paths.
    """
    go_paths = []   # Collect all paths for go_objs
    go_all = set()  # Collect all GO terms in all paths
    pathobj = GoPaths()
    for go_obj in go_objs:
        # Only start from GO terms not yet seen in paths already found
        if go_obj.id in go_all:
            continue
        paths_curr = pathobj.get_paths_from_to(go_obj, go_top, True)
        for path_goobjs in paths_curr:
            for path_goobj in path_goobjs:
                goid = path_goobj.id
                if goid not in go_all:
                    go_all.add(goid)
                    # go2obj defaults to None; only record when a dict was given
                    if go2obj is not None:
                        go2obj[goid] = path_goobj
        go_paths.extend(paths_curr)
    return go_paths, go_all
76
|
|
|
|
77
|
|
|
def paths2edges(paths):
    """Return the set of edges found in any of the given paths.

    Example of one path: [8079, 8135, 3723, 3676, 1901363, 5488, 3674]
    The edges of each path are obtained from path2edges.
    """
    unique_edges = set()
    for one_path in paths:
        # The set drops edges shared by more than one path
        unique_edges.update(path2edges(one_path))
    return unique_edges
84
|
|
|
|
85
|
|
|
def path2edges(path):
    """Pair every node in a path with the node which follows it.

    Given: [2000343, 32722, 1819]
    Edges: (2000343, 32722), (32722, 1819)
    The edges are returned as the result of ``zip``, in path order.
    """
    # Two independent iterators over the same path
    leaders, followers = tee(path, 2)
    # Put the second iterator one node ahead of the first; the default value
    # keeps an empty path from raising StopIteration
    next(followers, None)
    edges = zip(leaders, followers)
    return edges
90
|
|
|
|
91
|
|
|
# Copyright (C) 2016-2018, DV Klopfenstein, H Tang, All rights reserved. |
92
|
|
|
|