GoSubDag.prt_goids()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 4
dl 0
loc 11
rs 9.85
c 1
b 0
f 0
1
"""Manages a user-specified subset of a GO DAG."""
2
3
from __future__ import print_function
4
5
__copyright__ = "Copyright (C) 2016-2018, DV Klopfenstein, H Tang, All rights reserved."
6
__author__ = "DV Klopfenstein"
7
8
import sys
9
import collections as cx
10
# import timeit
11
# from goatools.test_data.godag_timed import prt_hms
12
from goatools.gosubdag.gosubdag_init import InitGOs
13
from goatools.gosubdag.gosubdag_init import InitFields
14
from goatools.gosubdag.go_tasks import chk_goids
15
16
17
class GoSubDag(object):
    """Manages a user-specified subset of a GO DAG.

    Attributes set by ``__init__``:
      go_sources    -- source GO IDs, as returned by InitGOs
      go2obj        -- dict: GO ID -> GO term object (object exposes ``.id``)
      relationships -- relationships, as returned by InitGOs
      rcntobj       -- None or a CountRelatives object
      prt_attr      -- dict with namedtuple field names ('flds') and the two
                       print formats ('fmt' and 'fmta')
      go2nt         -- dict: GO ID -> namedtuple describing the GO term
    """

    def __init__(self, go_sources, go2obj, relationships=None, **kws):
        """Initialize the sub-DAG from source GO IDs and a GO ID -> object dict.

        Keyword args are forwarded to InitGOs and InitFields
        (e.g. rcntobj, relationships). If 'prt' is given and is not None,
        a description of this object is written to that stream.
        """
        _ini = InitGOs(go_sources, go2obj, relationships, **kws)
        self.go_sources = _ini.go_sources
        # Initialized with GO objects corresponding to go_sources
        self.go2obj = _ini.go2obj
        self.relationships = _ini.relationships
        # GO IDs to total count of all descendants: None or CountRelatives object
        _fld = InitFields(_ini, **kws)
        self.rcntobj = _fld.get_rcntobj()
        self.prt_attr = {
            'flds': _fld.prt_flds,            # namedtuple fields in go2nt
            'fmt': _fld.get_prt_fmt(False),   # GO:NNNNNNN   No indication if an alternate GO ID
            'fmta': _fld.get_prt_fmt(True)}   # GO:NNNNNNNa  'a' indicates if an alternate GO ID
        self.go2nt = _fld.get_go2nt_all(self.rcntobj)
        prt = kws.get('prt')
        if prt is not None:
            self.prt_objdesc(prt)

    def prt_goids(self, goids=None, prtfmt=None, sortby=True, prt=sys.stdout):
        """Given GO IDs, print descriptive info about each GO Term.

        goids  -- GO IDs to print; defaults to the source GO IDs
        prtfmt -- format string filled with the namedtuple fields;
                  defaults to the 'fmta' print format
        sortby -- True (default sort), a key function, or a false value (no sort)
        prt    -- stream with a ``write`` method

        Returns the list of namedtuples that were printed.
        """
        if goids is None:
            goids = self.go_sources
        nts = self.get_nts(goids, sortby)
        if prtfmt is None:
            prtfmt = self.prt_attr['fmta']
        for ntgo in nts:
            prt.write("{GO}\n".format(GO=prtfmt.format(**ntgo._asdict())))
        return nts

    def get_nts(self, goids=None, sortby=None):
        """Given GO IDs, get a list of namedtuples.

        goids  -- GO IDs; defaults to the source GO IDs. User-provided IDs
                  are validated with chk_goids.
        sortby -- True (default sort), a key function, or a false value (no sort)

        When a requested GO ID differs from the GO field stored in its
        namedtuple, a copy of the namedtuple is returned whose GO field is
        the requested ID.
        """
        nts = []
        if goids is None:
            goids = self.go_sources
        else:
            chk_goids(goids, "GoSubDag::get_nts")
        if not goids:
            return nts
        ntobj = cx.namedtuple("NtGo", " ".join(self.prt_attr['flds']))
        go2nt = self.get_go2nt(goids)
        for goid, ntgo in self._get_sorted_go2nt(go2nt, sortby):
            # Internal invariant: every GO ID in go2nt maps to a namedtuple
            assert ntgo is not None, "{GO} NOT IN go2nt".format(GO=goid)
            if goid == ntgo.GO:
                nts.append(ntgo)
            else:
                # Report the GO ID that the user asked for
                fld2vals = ntgo._asdict()
                fld2vals['GO'] = goid
                nts.append(ntobj(**fld2vals))
        return nts

    def _get_sorted_go2nt(self, go2nt, sortby):
        """Return a list of (GO ID, namedtuple) pairs, sorted if requested.

        sortby -- True: sort using the key function from get_fncsortnt;
                  a callable: sort using it as the key applied to the namedtuple;
                  any other false value: return the items unsorted.
        """
        if sortby is True:
            sortby = self.get_fncsortnt()
        if sortby:
            return sorted(go2nt.items(), key=lambda t: sortby(t[1]))
        # Always hand back a list, as the sorted branches do
        return list(go2nt.items())

    def get_fncsortnt(self):
        """Return a key function used to sort GO term namedtuples.

        The key is built from the fields that are available:
          dcnt and D1 present: [NS, depth, -dcnt, D1, alt]
          dcnt present:        [NS, depth, -dcnt, alt]
          otherwise:           [NS, -depth, alt]
        """
        flds = self.prt_attr['flds']
        if 'dcnt' not in flds:
            return lambda ntgo: [ntgo.NS, -1*ntgo.depth, ntgo.alt]
        if 'D1' in flds:
            return lambda ntgo: [ntgo.NS, ntgo.depth, -1*ntgo.dcnt, ntgo.D1, ntgo.alt]
        return lambda ntgo: [ntgo.NS, ntgo.depth, -1*ntgo.dcnt, ntgo.alt]

    def get_go2nt(self, goids):
        """Return dict of GO ID as key and GO object information in namedtuple.

        GO IDs that are not keys of go2obj are left out of the result and
        reported once, in sorted order, on stdout.
        """
        go2nt = self.go2nt
        # Work on unique IDs so that duplicates are not reported as missing
        goids_uniq = set(goids)
        goids_present = goids_uniq.intersection(self.go2obj)
        goids_missing = goids_uniq.difference(goids_present)
        if goids_missing:
            print("GO IDs NOT FOUND IN DAG: {GOs}".format(
                GOs=" ".join(sorted(goids_missing))))
        return {g: go2nt[g] for g in goids_present}

    def get_go2obj(self, goids):
        """Return a go2obj dict for just the user goids.

        Raises KeyError if a GO ID is not in go2obj.
        """
        go2obj = self.go2obj
        return {go: go2obj[go] for go in goids}

    def get_vals(self, field, goids=None):
        """Return a list holding the value of one namedtuple field per GO ID.

        field -- name of a namedtuple field in go2nt
        goids -- GO IDs to look up; defaults to every GO ID in go2nt
        """
        go2nt = self.go2nt
        if goids is None:
            goids = go2nt
        return [getattr(go2nt[go], field) for go in goids]

    def get_key_goids(self, goids):
        """Given GO IDs, return the set of ``id`` values of their GO objects."""
        go2obj = self.go2obj
        return {go2obj[go].id for go in goids}

    def get_ns2goids(self, goids):
        """Group GO IDs by namespace; return a dict: NS -> set of GO IDs."""
        ns2goids = cx.defaultdict(set)
        go2nt = self.go2nt
        for goid in goids:
            ns2goids[go2nt[goid].NS].add(goid)
        # Plain dict so that lookups of unknown namespaces raise KeyError
        return dict(ns2goids)

    def prt_objdesc(self, prt):
        """Write a description of this GoSubDag object to the stream prt."""
        txt = "INITIALIZING GoSubDag: {N:3} sources in {M:3} GOs rcnt({R}). {A} alt GO IDs\n"
        # A key that differs from its object's id is counted as an alternate GO ID
        num_alt = sum(1 for goid, goobj in self.go2obj.items() if goid != goobj.id)
        prt.write(txt.format(
            N=len(self.go_sources),
            M=len(self.go2obj),
            R=self.rcntobj is not None,
            A=num_alt))
        prt.write("             GoSubDag: namedtuple fields: {FLDS}\n".format(
            FLDS=" ".join(self.prt_attr['flds'])))
        prt.write("             GoSubDag: relationships: {RELS}\n".format(RELS=self.relationships))
142
143
# Copyright (C) 2016-2018, DV Klopfenstein, H Tang, All rights reserved.
144