Completed
Push — master ( 0f596f...821888 )
by
unknown
01:11
created

GoPaths   A

Complexity

Total Complexity 14

Size/Duplication

Total Lines 45
Duplicated Lines 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 45
rs 10
wmc 14

2 Methods

Rating   Name   Duplication   Size   Complexity  
D get_paths_from_to() 0 27 8
A prt_paths() 0 8 3
1
"""Collect GO paths."""
2
3
__copyright__ = "Copyright (C) 2016-2018, DV Klopfenstein, H Tang, All rights reserved."
4
__author__ = "DV Klopfenstein"
5
6
import collections as cx
7
import sys
8
from itertools import tee
9
10
class GoPaths(object):
    """Class for helping traverse GO paths."""

    # Neighbor selectors keyed by the ``dn0_up1`` direction flag:
    #   None  -> parents and children
    #   True  -> parents only
    #   False -> children only
    # The None entry copies both collections into lists before joining them,
    # so it works whether ``parents``/``children`` are lists or sets
    # (``set + set`` raises TypeError).
    adjdir = {
        None:  lambda go_obj: list(go_obj.parents) + list(go_obj.children),
        True:  lambda go_obj: go_obj.parents,
        False: lambda go_obj: go_obj.children}

    def get_paths_from_to(self, goobj_start, goid_end=None, dn0_up1=True):
        """Get a list of paths from goobj_start to either top or goid_end.

        Args:
            goobj_start: GO object where every path begins. It and its
                neighbors must provide ``id``, ``parents`` and ``children``.
            goid_end: ID of the GO term where a path stops. If None, a path
                stops at any term that has no adjacent terms in the chosen
                direction.
            dn0_up1: Key into ``adjdir`` selecting which neighbors to follow
                (True: parents, False: children, None: both).

        Returns:
            A list of paths; each path is a list of GO objects beginning with
            goobj_start. Paths that run out of unvisited neighbors without
            reaching an endpoint are not returned.
        """
        paths = []
        # Queue of partial paths still to be extended (first in, first out)
        working_q = cx.deque([[goobj_start]])
        adjfnc = self.adjdir[dn0_up1]
        # Loop thru GO terms until we have examined all needed GO terms
        while working_q:
            path_curr = working_q.popleft()
            goobj_curr = path_curr[-1]
            go_adjlst = adjfnc(goobj_curr)
            if goid_end is None:
                at_end = not go_adjlst
            else:
                at_end = goobj_curr.id == goid_end
            # If this GO term is the endpoint, stop and store the path
            if at_end:
                paths.append(path_curr)
                continue
            # Otherwise extend the path by each neighbor not already on it;
            # skipping visited terms keeps a path from looping back on itself
            for go_neighbor in go_adjlst:
                if go_neighbor not in path_curr:
                    working_q.append(path_curr + [go_neighbor])
        return paths

    @staticmethod
    def prt_paths(paths, prt=sys.stdout):
        """Print list of paths.

        Writes one line per GO object (its id, level and depth) to ``prt``,
        followed by a blank line after each path.
        """
        pat = "PATHES: {GO} L{L:02} D{D:02}\n"
        for path in paths:
            for go_obj in path:
                prt.write(pat.format(GO=go_obj.id, L=go_obj.level, D=go_obj.depth))
            prt.write("\n")
55
56
def get_paths_goobjs(go_objs, go_top=None, go2obj=None):
    """Given a list of GO objects, return their paths and the GO IDs on those paths.

    Args:
        go_objs: Iterable of GO objects to start paths from. An object whose
            ID already appears on a previously collected path is skipped.
        go_top: ID of the GO term at which paths end; passed to
            ``GoPaths.get_paths_from_to`` as ``goid_end``.
        go2obj: Optional dict that is filled in place with ``{GO ID: GO object}``
            for every term found on a path. Nothing is recorded when it is None.

    Returns:
        Tuple ``(go_paths, go_all)``: the list of all paths found and the set
        of GO IDs appearing on them.
    """
    go_paths = []   # Collect all paths for go_objs
    go_all = set()  # Collect all GO terms in all paths
    pathobj = GoPaths()
    for go_obj in go_objs:
        # GO already seen in paths found earlier: nothing new to collect
        if go_obj.id in go_all:
            continue
        paths_curr = pathobj.get_paths_from_to(go_obj, go_top, True)
        for path_goobjs in paths_curr:
            for path_goobj in path_goobjs:
                goid = path_goobj.id
                if goid not in go_all:
                    go_all.add(goid)
                    # go2obj defaults to None; only record when a dict was given
                    if go2obj is not None:
                        go2obj[goid] = path_goobj
        go_paths.extend(paths_curr)
    return go_paths, go_all
76
77
def paths2edges(paths):
    """Return the set of all edges found in a collection of paths.

    Each path is a sequence of nodes, e.g.
    [8079, 8135, 3723, 3676, 1901363, 5488, 3674]; every pair of consecutive
    nodes (as produced by path2edges) becomes one edge in the result.
    """
    return {edge for one_path in paths for edge in path2edges(one_path)}
84
85
def path2edges(path):
    """Pair each node in a path with the node that follows it.

    Given: [2000343, 32722, 1819]
    Yields the pairs: (2000343, 32722), (32722, 1819)

    The result is whatever ``zip`` returns, not a set; wrap it in ``set()``
    or ``list()`` if a concrete collection is needed.
    """
    behind, ahead = tee(path)
    # Shift the second copy forward by one node so zip lines up (n, n+1);
    # the None default keeps an empty path from raising StopIteration
    next(ahead, None)
    return zip(behind, ahead)
90
91
# Copyright (C) 2016-2018, DV Klopfenstein, H Tang, All rights reserved.
92