Total Complexity | 69 |
Total Lines | 207 |
Duplicated Lines | 20.29 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems and their corresponding solutions are:
Complex classes like GODagSmallPlot often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | """Plot a GODagSmall.""" |
||
class GODagSmallPlot(object):
    """Plot a graph contained in an object of type GODagSmall.

    The plot object precomputes a fill color for each GO term and can then
    render the DAG to an image file with the pydot engine.
    """

    def __init__(self, godagsmall, *args, **kws):
        """Store plotting options and precompute GO term colors.

        Args:
            godagsmall: DAG object; this class reads its ``go_sources`` and
                ``go2obj`` attributes and calls its ``get_edges()`` method.
            *args: Stored unchanged in ``self.args``.
            **kws: Optional settings, all read by key:
                log: Stream for status messages (default ``sys.stdout``).
                title: Graph label (default None).
                goea_results: Iterable of result objects, indexed by ``.GO``.
                go2nt: Ready-made mapping of GO id to result; only used
                    when ``goea_results`` is absent.
                pval_name: Name of the p-value attribute on a result.
                id2symbol: Mapping of item id to display symbol (default {}).
                study_items: None, True, or an int limit on printed items.
                alpha_str: Stored in ``self.alpha_str``.
                GODagPltVars: Plot-variables object (default: a new
                    ``GODagPltVars()``).
                items_p_line: Overrides ``pltvars.items_p_line``.
                dpi: Image resolution (default 150).
        """
        self.args = args
        self.log = kws.get('log', sys.stdout)
        self.title = kws.get('title')
        # Results keyed by GO id (built from 'goea_results' or taken from 'go2nt')
        self.go2res = self._init_go2res(**kws)
        # Name of the attribute holding the p-value on each result (may be None)
        self.pval_name = self._init_pval_name(**kws)
        # Gene Symbol names
        self.id2symbol = kws.get('id2symbol', {})
        self.study_items = kws.get('study_items')
        # Must run after self.study_items is set
        self.study_items_max = self._init_study_items_max()
        self.alpha_str = kws.get('alpha_str')
        # Conditional (not dict.get) so a default object is only built when needed
        self.pltvars = kws['GODagPltVars'] if 'GODagPltVars' in kws else GODagPltVars()
        if 'items_p_line' in kws:
            self.pltvars.items_p_line = kws['items_p_line']
        self.dpi = kws.get('dpi', 150)
        self.godag = godagsmall
        # Must run after go2res, pval_name, pltvars and godag are set
        self.goid2color = self._init_goid2color()
        # The pydot module is imported lazily; see _get_pydot()
        self.pydot = None

    def _init_study_items_max(self):
        """User can limit the number of genes printed in a GO term.

        Returns:
            ``self.study_items`` when it is an int other than True;
            otherwise None (no limit).
        """
        items = self.study_items
        # True is an int in Python, but here it means "print items, no limit".
        if items is True or not isinstance(items, int):
            return None
        return items

    @staticmethod
    def _init_go2res(**kws):
        """Initialize GOEA results.

        Returns:
            A dict of GO id to result built from ``kws['goea_results']`` if
            present, else ``kws['go2nt']`` if present, else None.
        """
        if 'goea_results' in kws:
            return {res.GO: res for res in kws['goea_results']}
        return kws.get('go2nt')

    @staticmethod
    def _init_pval_name(**kws):
        """Initialize pvalue attribute name.

        Returns:
            ``kws['pval_name']`` if given; otherwise ``"p_<fieldname>"``
            derived from the first method field of the first entry in a
            non-empty ``kws['goea_results']``; otherwise None.
        """
        if 'pval_name' in kws:
            return kws['pval_name']
        goea = kws.get('goea_results')
        if goea:
            return "p_{M}".format(M=goea[0].method_flds[0].fieldname)
        return None

    def _init_goid2color(self):
        """Set colors of GO terms.

        Colors are assigned in priority order; a GO id keeps the first
        color it receives.

        Returns:
            dict mapping GO id to color.
        """
        goid2color = {}
        # 1. colors based on p-value override colors based on source GO.
        #    Skipped when no p-value attribute name is known: getattr() with
        #    a None name would raise TypeError.
        if self.go2res is not None and self.pval_name is not None:
            alpha2col = self.pltvars.alpha2col
            pval_name = self.pval_name
            for goid, res in self.go2res.items():
                pval = getattr(res, pval_name, None)
                if pval is None:
                    continue
                for alpha, color in alpha2col.items():
                    if pval <= alpha and res.study_count != 0:
                        # First matching alpha wins
                        goid2color[goid] = color
                        break
        # 2. GO source color
        color = self.pltvars.key2col['go_sources']
        for goid in self.godag.go_sources:
            goid2color.setdefault(goid, color)
        # 3. Level-01 GO color
        color = self.pltvars.key2col['level_01']
        for goid, goobj in self.godag.go2obj.items():
            if goobj.level == 1:
                goid2color.setdefault(goid, color)
        return goid2color

    def plt(self, fout_img, engine="pydot"):
        """Plot using pydot, graphviz, or GML.

        Args:
            fout_img: Output image filename.
            engine: Only "pydot" is implemented.

        Raises:
            Exception: For "pygraphviz" (not implemented) or any other
                unknown engine name.
        """
        if engine == "pydot":
            self._plt_pydot(fout_img)
        elif engine == "pygraphviz":
            raise Exception("TO BE IMPLEMENTED SOON: ENGINE pygraphvis")
        else:
            raise Exception("UNKNOWN ENGINE({E})".format(E=engine))

    # ----------------------------------------------------------------------------------
    # pydot
    def _plt_pydot(self, fout_img):
        """Plot using the pydot graphics engine.

        The image format is taken from the extension of ``fout_img``; a
        one-line summary is written to ``self.log`` afterwards.
        """
        dag = self._get_pydot_graph()
        # Extension without the leading dot, e.g. "png"
        img_fmt = os.path.splitext(fout_img)[1][1:]
        dag.write(fout_img, format=img_fmt)
        self.log.write(" {GO_USR:>3} usr {GO_ALL:>3} GOs WROTE: {F}\n".format(
            F=fout_img,
            GO_USR=len(self.godag.go_sources),
            GO_ALL=len(self.godag.go2obj)))

    def _get_pydot_graph(self):
        """Given a DAG, return a pydot digraph object."""
        rel = "is_a"
        pydot = self._get_pydot()
        # Initialize empty dag
        dag = pydot.Dot(label=self.title, graph_type='digraph', dpi="{}".format(self.dpi))
        # Initialize nodes
        go2node = self._get_go2pydotnode()
        # Add nodes to graph
        for node in go2node.values():
            dag.add_node(node)
        # Add edges to graph
        rel2col = self.pltvars.rel2col
        for src, tgt in self.godag.get_edges():
            dag.add_edge(pydot.Edge(
                go2node[tgt], go2node[src],
                shape="normal",
                color=rel2col[rel],
                dir="back"))  # invert arrow direction for obo dag convention
        return dag

    def _get_go2pydotnode(self):
        """Create pydot Nodes.

        Returns:
            dict mapping GO id to a pydot Node, one per entry in
            ``self.godag.go2obj``.
        """
        # Go through the lazy loader so this also works when called before
        # anything else has imported pydot (self.pydot starts out as None).
        pydot = self._get_pydot()
        go2node = {}
        for goid, goobj in self.godag.go2obj.items():
            go2node[goid] = pydot.Node(
                self._get_node_text(goid, goobj),
                shape="box",
                style="rounded, filled",
                fillcolor=self.goid2color.get(goid, "white"),
                color="mediumseagreen")
        return go2node

    def _get_pydot(self):
        """Return pydot package. Load pydot, if necessary."""
        if not self.pydot:
            self.pydot = __import__("pydot")
        return self.pydot

    # ----------------------------------------------------------------------------------
    # Methods for text printed inside GO terms
    def _get_node_text(self, goid, goobj):
        """Return a string to be printed in a GO term box.

        Lines, in order: a header formatted with ``pltvars.fmthdr``, the GO
        name (each comma replaced by a newline), and the study text if any.
        """
        # Header line, e.g. "GO0036464 L04 D06" (exact layout set by fmthdr)
        header = self.pltvars.fmthdr.format(
            GO=goobj.id.replace("GO:", "GO"),
            level=goobj.level,
            depth=goobj.depth)
        # GO name line: "cytoplamic ribonucleoprotein"
        txt = [header, goobj.name.replace(",", "\n")]
        # study info line: "24 genes"
        study_txt = self._get_study_txt(goid)
        if study_txt is not None:
            txt.append(study_txt)
        return "\n".join(txt)

    def _get_study_txt(self, goid):
        """Get GO text from GOEA study.

        Returns:
            None when there are no results or none for ``goid``; the item
            list when ``self.study_items`` is not None; otherwise the study
            count formatted with ``pltvars.fmtres``.
        """
        if self.go2res is None:
            return None
        res = self.go2res.get(goid, None)
        if res is None:
            return None
        if self.study_items is not None:
            return self._get_item_str(res)
        return self.pltvars.fmtres.format(study_count=res.study_count)

    @staticmethod
    def _join_items_multiline(items, npl):
        """Join items with ", ", starting a new line after every npl items."""
        chunks = [items[i:i+npl] for i in range(0, len(items), npl)]
        return "\n".join([", ".join(str(e) for e in chunk) for chunk in chunks])

    def _get_item_str(self, res):
        """Return the sorted study items of a result as display text.

        Items are shown as symbols when ``id2symbol`` has one, and wrapped
        at ``pltvars.items_p_line`` items per line. Output forms:
          1. No limit:      "7) Ptprc, Mif, Cd81, Bcl2, Sash3, Tnfrsf4, Cdkn1a"
          2. Within limit:  "Ptprc, Mif, Cd81, Bcl2, Sash3, Tnfrsf4, Cdkn1a"
          3. Over limit:    "7 genes; Ptprc, Mif, Cd81, Bcl2, Sash3..."
        """
        npl = self.pltvars.items_p_line  # Number of items Per Line
        prt_items = sorted([self.__get_genestr(itemid) for itemid in res.study_items])
        num_items = len(prt_items)
        max_items = self.study_items_max
        if max_items is None:
            genestr = self._join_items_multiline(prt_items, npl)
            return "{N}) {GENES}".format(N=num_items, GENES=genestr)
        if num_items <= max_items:
            return self._join_items_multiline(prt_items, npl)
        # Too many items: print the total, then only the first max_items
        short_str = self._join_items_multiline(prt_items[:max_items], npl)
        return "".join(["{N} genes; ".format(N=num_items), short_str, "..."])

    def __get_genestr(self, itemid):
        """Given a geneid, return the string geneid or a gene symbol."""
        if self.id2symbol is not None:
            symbol = self.id2symbol.get(itemid, None)
            if symbol is not None:
                return symbol
        if isinstance(itemid, int):
            return str(itemid)
        return itemid
||
288 | |||
290 |