Total Complexity | 74 |
Total Lines | 412 |
Duplicated Lines | 37.62 % |
Changes | 0 |
Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.
Common duplication problems, and corresponding solutions are:
Complex classes like triplexibility often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.
Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.
1 | #!/usr/bin/env python |
||
2 | # -*- coding: utf-8 -*- |
||
3 | |||
4 | """ |
||
5 | Examples:: |
||
6 | |||
7 | triplexibility.py -t 2nd_triplex_FB_ABC.pdb triples-all-v2-rpr/*.pdb --way backbone+sugar --save --suffix ABC --result abc.csv |
||
8 | |||
9 | triplexibility.py -t 2nd_triplex_FB_1AUA3_rpr.pdb test_data/triples2/Triple_cWW_tSH_GCA_exemplar_rpr_ren.pdb --way backbone+sugar --save --column-name 'rmsd' --triple-mode > out.txt |
||
10 | |||
11 | """ |
||
12 | from __future__ import print_function |
||
13 | |||
14 | import Bio.PDB.PDBParser |
||
15 | import Bio.PDB.Superimposer |
||
16 | import copy |
||
17 | import warnings |
||
18 | warnings.filterwarnings('ignore', '.*Invalid or missing.*',) |
||
19 | warnings.filterwarnings('ignore', '.*with given element *',) |
||
20 | warnings.filterwarnings('ignore', '.*is discontinuous*',) |
||
21 | warnings.filterwarnings('ignore', '.*Ignoring unrecognized record*',) |
||
22 | |||
23 | import sys |
||
24 | from icecream import ic |
||
25 | ic.configureOutput(outputFunction=lambda *a: print(*a, file=sys.stderr)) |
||
26 | ic.configureOutput(prefix='> ') |
||
27 | |||
28 | from rna_tools.rna_tools_lib import set_chain_for_struc, RNAStructure |
||
29 | from rna_tools.rna_tools_config import RT |
||
30 | |||
31 | import argparse |
||
32 | import sys |
||
33 | import glob |
||
34 | import re |
||
35 | import os |
||
36 | |||
37 | class RNAmodel: |
||
38 | def __init__(self, fpath): |
||
39 | # parser 1-5 -> 1 2 3 4 5 |
||
40 | self.struc = Bio.PDB.PDBParser().get_structure('', fpath) |
||
41 | self.__get_atoms() |
||
42 | self.fpath = fpath |
||
43 | self.fn = os.path.basename(fpath) |
||
44 | # self.atoms = [] |
||
45 | # if save: |
||
46 | # self.save() # @save |
||
47 | |||
48 | def __get_atoms(self): |
||
49 | self.atoms=[] |
||
50 | # [<Atom P>, <Atom C5'>, <Atom O5'>, <Atom C4'>, <Atom O4'>, <Atom C3'>, <Atom O3'>, <Atom C2'>, <Atom O2'>, <Atom C1'>, <Atom N1>, <Atom C2>, <Atom N3>, <Atom C4>, <Atom C5>, <Atom C6>, <Atom N6>, <Atom N7>, <Atom C8>, <Atom N9>, <Atom OP1>, <Atom OP2>] |
||
51 | for res in self.struc.get_residues(): |
||
52 | for at in res: |
||
53 | #if args.debug: print(res.resname, res.get_id, at) |
||
54 | self.atoms.append(at) |
||
55 | return self.atoms |
||
56 | |||
57 | def __str__(self): |
||
58 | return self.fn #+ ' # beads' + str(len(self.residues)) |
||
59 | |||
60 | def __repr__(self): |
||
61 | return self.fn #+ ' # beads' + str(len(self.residues)) |
||
62 | |||
63 | def get_report(self): |
||
64 | """Str a short report about rna model""" |
||
65 | t = ' '.join(['File: ', self.fn, ' # of atoms:', str(len(self.atoms)), '\n']) |
||
66 | for r,a in zip(self.residues, self.atoms ): |
||
67 | t += ' '.join(['resi: ', str(r) ,' atom: ', str(a) , '\n' ]) |
||
68 | return t |
||
69 | |||
70 | def get_rmsd_to(self, other_rnamodel, way="", triple_mode=False, save=True, tseq=''): |
||
71 | """Calc rmsd P-atom based rmsd to other rna model |
||
72 | |||
73 | sugar now 10 atoms ;-) """ |
||
74 | sup = Bio.PDB.Superimposer() |
||
75 | if way in ['c1', 'backbone+sugar', 'sugar']: |
||
76 | if way == 'c1': |
||
77 | atomslist = ["C1'"]# ,"C2'","O4'"] #, "C2'"] |
||
78 | |||
79 | elif way == 'sugar': |
||
80 | atomslist = "C5',O5',C4',O4',C3',O3',C2',O2',C1'".split(',') |
||
81 | |||
82 | elif way == 'backbone+sugar': |
||
83 | atomslist = "P,OP1,OP2,C5',O5',C4',O4',C3',O3',C2',O2',C1'".split(',') |
||
84 | |||
85 | self.atoms_for_rmsd = [] |
||
86 | for a in self.atoms: |
||
87 | nt = a.get_parent().get_resname().strip() |
||
88 | if nt in ['G', 'A']: |
||
89 | atomslistx = atomslist + ['N9'] |
||
|
|||
90 | if nt in ['C', 'U']: |
||
91 | atomslistx = atomslist + ['N1'] |
||
92 | if a.name in atomslistx: |
||
93 | self.atoms_for_rmsd.append(a) |
||
94 | if args.debug: print('atoms_for_rmsd', len(self.atoms_for_rmsd)) |
||
95 | |||
96 | other_atoms_for_rmsd = [] |
||
97 | for a in other_rnamodel.atoms: |
||
98 | nt = a.get_parent().get_resname().strip() |
||
99 | if nt in ['G', 'A']: |
||
100 | atomslistx = atomslist + ['N9'] |
||
101 | if nt in ['C', 'U']: |
||
102 | atomslistx = atomslist + ['N1'] |
||
103 | if a.name in atomslistx: |
||
104 | other_atoms_for_rmsd.append(a) |
||
105 | |||
106 | if args.debug: print('other_atoms_for_rmsd', len(other_atoms_for_rmsd)) |
||
107 | |||
108 | elif way == 'c1+Nx': |
||
109 | self.atoms_for_rmsd = [] |
||
110 | for a in self.atoms: |
||
111 | nt = a.get_parent().get_resname().strip() # G |
||
112 | if nt in ['G', 'A']: |
||
113 | atomslist = ["C1'", 'N9'] # , 'N1'] |
||
114 | if nt in ['C', 'U']: |
||
115 | atomslist = ["C1'", 'N1'] # , 'N1'] |
||
116 | if a.name in atomslist: |
||
117 | self.atoms_for_rmsd.append(a) |
||
118 | |||
119 | if args.debug: print('atoms_for_rmsd', len(self.atoms_for_rmsd)) |
||
120 | |||
121 | other_atoms_for_rmsd = [] |
||
122 | for a in other_rnamodel.atoms: |
||
123 | nt = a.get_parent().get_resname().strip() # G |
||
124 | if nt in ['G', 'A']: |
||
125 | atomslist = ["C1'", 'N9'] # , 'N1'] |
||
126 | if nt in ['C', 'U']: |
||
127 | atomslist = ["C1'", 'N1'] # , 'N1'] |
||
128 | if a.name in atomslist: |
||
129 | other_atoms_for_rmsd.append(a) |
||
130 | if args.debug: print('other_atoms_for_rmsd', len(other_atoms_for_rmsd)) |
||
131 | |||
132 | else: |
||
133 | self.atoms_for_rmsd = self.atoms |
||
134 | other_atoms_for_rmsd = other_rnamodel.atoms |
||
135 | |||
136 | # calc rmsd # |
||
137 | if not tseq: |
||
138 | tseq = '' |
||
139 | rt = None |
||
140 | for a in self.atoms_for_rmsd: |
||
141 | r = a.get_parent() |
||
142 | if r != rt: |
||
143 | rt = r |
||
144 | tseq += r.get_resname().strip() |
||
145 | |||
146 | if triple_mode: |
||
147 | def chunks(lst, n): |
||
148 | """Yield successive n-sized chunks from lst. |
||
149 | https://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks |
||
150 | """ |
||
151 | for i in range(0, len(lst), n): |
||
152 | yield lst[i:i + n] |
||
153 | |||
154 | rmsd_min = 10000 # ugly |
||
155 | import itertools |
||
156 | # ok, for different residues now it's a problem |
||
157 | per = list(itertools.permutations([0, 1, 2])) # [(0, 1, 2), (0, 2, 1), (1, 0, 2), (1, 2, 0), (2, 0, 1), (2, 1, 0)] |
||
158 | lst = list(chunks(other_atoms_for_rmsd, |
||
159 | int(len(other_atoms_for_rmsd)/3))) # for 1 atom, this will be 1 x 3 [3 residues] |
||
160 | # so so len is 3 atoms so / by 3 to get how many atoms per residue |
||
161 | print(lst) |
||
162 | |||
163 | sup_min = None |
||
164 | seq_min = 'not yet obtained, rmsd rejected!' |
||
165 | p_min = None |
||
166 | |||
167 | rms = -1 |
||
168 | |||
169 | for p in per: |
||
170 | patoms = [] |
||
171 | for i in p: # p=(1, 2, 3) |
||
172 | patoms.extend(lst[i]) |
||
173 | |||
174 | #print(self.atoms_for_rmsd) |
||
175 | ## print('patoms') |
||
176 | ## for a in patoms: |
||
177 | ## print(a, a.get_parent().get_id()) |
||
178 | ## print('self.atoms_for_rmsd') |
||
179 | ## for a in self.atoms_for_rmsd: |
||
180 | ## print(a, a.get_parent().get_id()) |
||
181 | #sup.set_atoms(patoms, self.atoms_for_rmsd) |
||
182 | |||
183 | rt = None |
||
184 | seq = '' |
||
185 | for a in patoms: |
||
186 | r = a.get_parent() |
||
187 | if r != rt: |
||
188 | rt = r |
||
189 | seq += r.get_resname().strip() |
||
190 | |||
191 | ic(tseq.lower(), seq.lower(), other_rnamodel.fpath) |
||
192 | # dont' even calc rmsd if the curr seq and tseq are not the same |
||
193 | if tseq.lower() == seq.lower(): # only if seq is the same |
||
194 | print(self.atoms_for_rmsd) |
||
195 | print(patoms) |
||
196 | print(len(self.atoms_for_rmsd), len(patoms)) |
||
197 | for a, b in zip(self.atoms_for_rmsd, patoms): |
||
198 | print(a, a.get_parent().id, a.get_parent().get_resname(), |
||
199 | b.get_parent().id, b.get_parent().get_resname()) |
||
200 | print(len(self.atoms_for_rmsd), len(patoms)) |
||
201 | try: |
||
202 | sup.set_atoms(self.atoms_for_rmsd, patoms) |
||
203 | except: |
||
204 | pass |
||
205 | rms = round(sup.rms, 2) |
||
206 | if rms < rmsd_min: |
||
207 | rmsd_min = rms |
||
208 | sup_min = copy.copy(sup) |
||
209 | suffix = seq |
||
210 | p_min = p |
||
211 | seq_min = seq |
||
212 | if args.debug: 'set new rmsd_min', rmsd_min |
||
213 | |||
214 | if args.debug: |
||
215 | print(p, '', [i + 1 for i in p], end=' ') |
||
216 | print(seq, 'seq_min: ' + seq_min, rms) |
||
217 | |||
218 | View Code Duplication | if p_min: # found target sequence |
|
219 | # what is this? ;-) |
||
220 | index = [0 ,0 ,0] |
||
221 | index[0] = p_min.index(0) |
||
222 | index[1] = p_min.index(1) |
||
223 | index[2] = p_min.index(2) |
||
224 | |||
225 | # ugly re-set 123 to crazy id ! + 100, so this will |
||
226 | # fill up 1 2 3 for the second for |
||
227 | rs = other_rnamodel.struc[0].get_residues() |
||
228 | for i, r in enumerate(rs): |
||
229 | r.id = (' ', index[i] + 253, ' ') # ugly, some random offset |
||
230 | |||
231 | for i, r in enumerate(other_rnamodel.struc[0].get_residues()): |
||
232 | r.id = (' ', index[i] + 1, ' ') |
||
233 | if args.debug: print('r', r) |
||
234 | |||
235 | io = Bio.PDB.PDBIO() |
||
236 | sup_min.apply(other_rnamodel.struc.get_atoms()) |
||
237 | # if args.debug: print(p_min, [i + 1 for i in p_min]) |
||
238 | io.set_structure(other_rnamodel.struc) |
||
239 | |||
240 | args.save_here = True |
||
241 | if args.save_here and save: |
||
242 | folder = os.path.basename(self.fpath.replace('.pdb', '_' + args.folder_prefix + '_aligned')) |
||
243 | # print(f) |
||
244 | try: |
||
245 | os.mkdir(folder) |
||
246 | except: |
||
247 | pass |
||
248 | fout = folder + os.sep + "{:1.2f}".format(rmsd_min) + '-' + os.path.basename(other_rnamodel.fpath)#.replace('.pdb', '-' + str(rms) + '.pdb')) |
||
249 | #_s' + suffix + '.pdb')) |
||
250 | else: |
||
251 | fout = other_rnamodel.fpath.replace('.pdb', '_aligned.pdb')#_s' + suffix + '.pdb') |
||
252 | |||
253 | if args.debug: print(fout) |
||
254 | |||
255 | if save: |
||
256 | io.save(fout) |
||
257 | # ugly set chain to A |
||
258 | set_chain_for_struc(fout, 'A', save_file_inplace=True) |
||
259 | # and now run this to sort into 1 2 3 |
||
260 | |||
261 | r = RNAStructure(fout) |
||
262 | remarks = r.get_remarks_text() |
||
263 | r1 = r.get_res_text('A', 1) |
||
264 | r2 = r.get_res_text('A', 2) |
||
265 | r3 = r.get_res_text('A', 3) |
||
266 | with open(fout, 'w') as f: |
||
267 | f.write(remarks) |
||
268 | f.write(r1) |
||
269 | f.write(r2) |
||
270 | f.write(r3) |
||
271 | r.reload() |
||
272 | r.get_rnapuzzle_ready() |
||
273 | if rmsd_min < 1: # !!!!!!!!!!!! ugly |
||
274 | r.write() |
||
275 | return str(rmsd_min)# + ',s' + seq_min + ',' + os.path.basename(fout) |
||
276 | else: |
||
277 | # check if number of the same atoms # |
||
278 | # if not the same then return -1 # |
||
279 | # print(len(self.atoms_for_rmsd), len(other_atoms_for_rmsd)) |
||
280 | if len(self.atoms_for_rmsd) != len(other_atoms_for_rmsd): |
||
281 | return -1 |
||
282 | sup.set_atoms(self.atoms_for_rmsd, other_atoms_for_rmsd) |
||
283 | rms = round(sup.rms, 2) |
||
284 | |||
285 | if save: |
||
286 | ## io = Bio.PDB.PDBIO() |
||
287 | ## sup.apply(self.struc.get_atoms()) |
||
288 | ## io.set_structure(self.struc) |
||
289 | ## io.save("aligned.pdb") |
||
290 | |||
291 | io = Bio.PDB.PDBIO() |
||
292 | sup.apply(other_rnamodel.struc.get_atoms()) |
||
293 | io.set_structure(other_rnamodel.struc) |
||
294 | |||
295 | args.save_here = True |
||
296 | View Code Duplication | if args.save_here and save: |
|
297 | f = os.path.basename(self.fpath.replace('.pdb', '_aligned')) |
||
298 | # print(f) |
||
299 | try: |
||
300 | os.mkdir(f) |
||
301 | except: |
||
302 | pass |
||
303 | # fout = f + os.sep + os.path.basename(other_rnamodel.fpath.replace('.pdb', '_aligned_s' + suffix + '.pdb')) |
||
304 | fout = f + os.sep + os.path.basename(other_rnamodel.fpath)#.replace('.pdb', '_aligned')) |
||
305 | else: |
||
306 | fout = other_rnamodel.fpath.replace('.pdb', '_aligned.pdb') |
||
307 | if save: |
||
308 | io.save(fout) |
||
309 | return rms |
||
310 | |||
311 | |||
312 | View Code Duplication | def get_rna_models_from_dir(directory): |
|
313 | models = [] |
||
314 | if not os.path.exists(directory): |
||
315 | raise Exception('Dir does not exist! ', directory) |
||
316 | files = glob.glob(directory + "/*.pdb") |
||
317 | files_sorted = sort_nicely(files) |
||
318 | for f in files_sorted: |
||
319 | #print f |
||
320 | models.append(RNAmodel(f)) |
||
321 | return models |
||
322 | |||
323 | def sort_nicely( l ): |
||
324 | """ Sort the given list in the way that humans expect. |
||
325 | |||
326 | http://blog.codinghorror.com/sorting-for-humans-natural-sort-order/ |
||
327 | """ |
||
328 | convert = lambda text: int(text) if text.isdigit() else text |
||
329 | alphanum_key = lambda key: [ convert(c) for c in re.split('([0-9]+)', key) ] |
||
330 | l.sort( key=alphanum_key ) |
||
331 | return l |
||
332 | |||
333 | |||
334 | View Code Duplication | def get_parser(): |
|
335 | parser = argparse.ArgumentParser( |
||
336 | description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) |
||
337 | |||
338 | parser.add_argument('-t',"--target", |
||
339 | help="target file") |
||
340 | parser.add_argument('--result', #default='out.rmsd', |
||
341 | help="result file") |
||
342 | parser.add_argument('--ignore-files', default='aligned', help="use to ignore files, .e.g. with 'aligned'") |
||
343 | parser.add_argument('--suffix', default='aligned', help="used with --saved, by default: aligned") |
||
344 | parser.add_argument('--way', help="e.g., backbone+sugar, c1, c1+Nx") |
||
345 | parser.add_argument('--triple-mode', help="same crazy strategy to align triples", default=True, action="store_true") |
||
346 | parser.add_argument('--column-name', help="name column for rmsd, by default 'rmsd', but can also be a name of the target file", default="rmsd") |
||
347 | parser.add_argument("-s", "--save", action="store_true", help="set suffix with --suffix, by default: aligned") |
||
348 | parser.add_argument("--folder-prefix", default = '', help="folder name, t2-3-UAU_NAME_aligned") |
||
349 | |||
350 | parser.add_argument("--debug", action="store_true") |
||
351 | parser.add_argument("--sort", action="store_true", help='sort results based on rmsd (ascending), --result must be set up') |
||
352 | parser.add_argument("--tseq", help='target sequence, e.g. acu, find only triples of this sequence [use the order for the seq taken from the input PDB file, literally order of residues in a pdb file]') |
||
353 | parser.add_argument('--files', help='files', nargs='+', default=RT + '/rna_tools/tools/triplexibility/db/triples-all-v2-rpr/*.pdb') |
||
354 | return parser |
||
355 | |||
356 | # main |
||
357 | View Code Duplication | if __name__ == '__main__': |
|
358 | parser = get_parser() |
||
359 | args = parser.parse_args() |
||
360 | if not args.debug: |
||
361 | ic.disable() |
||
362 | |||
363 | target = RNAmodel(args.target) |
||
364 | |||
365 | # a trick to get files be default if there is only a path (so a string ;-)) |
||
366 | if not isinstance(args.files, str): |
||
367 | models = args.files |
||
368 | else: |
||
369 | import glob |
||
370 | models = glob.glob(args.files) # opts.input_dir |
||
371 | |||
372 | ## tmp = [] |
||
373 | ## if args.ignore_files: |
||
374 | ## for f in args.files: |
||
375 | ## if args.debug: print(f) |
||
376 | ## if args.ignore_files in f: |
||
377 | ## continue |
||
378 | ## tmp.append(f) |
||
379 | ## models = tmp |
||
380 | |||
381 | print('# of models:', len(models)) |
||
382 | |||
383 | c = 1 |
||
384 | t = 'fn,' + args.column_name + '\n' # ,aligned_seq, aligned_fn\n' |
||
385 | for m in models: |
||
386 | mrna = RNAmodel(m) |
||
387 | #print r1.fn, r2.fn, r1.get_rmsd_to(r2)#, 'tmp.pdb') |
||
388 | # print(m) |
||
389 | rmsd = target.get_rmsd_to(mrna, args.way, args.triple_mode, args.save, args.tseq) #, 'tmp.pdb') |
||
390 | #except: |
||
391 | if 0: |
||
392 | print(m) |
||
393 | sys.exit(1) |
||
394 | #print rmsd |
||
395 | t += mrna.fn + ',' + str(rmsd) + '\n' |
||
396 | #break |
||
397 | |||
398 | print(t.strip()) |
||
399 | |||
400 | if args.result: |
||
401 | with open(args.result, 'w') as f: |
||
402 | f.write(t) |
||
403 | |||
404 | if args.sort: |
||
405 | import pandas as pd |
||
406 | df = pd.read_csv(args.result) |
||
407 | df = df[df.rmsd != -1] |
||
408 | df = df.sort_values('rmsd') |
||
409 | df.to_csv(args.result, index=False) |
||
410 | print('saved: %s' % args.result) |
||
411 | print(df.to_string(index=False)) |
||
412 |