build.rna_tools.tools.rna_calc_rmsd.rna_calc_rmsd_biopython   F
last analyzed

Complexity

Total Complexity 114

Size/Duplication

Total Lines 735
Duplicated Lines 1.36 %

Importance

Changes 0
Metric Value
eloc 574
dl 10
loc 735
rs 2
c 0
b 0
f 0
wmc 114

10 Functions

Rating   Name   Duplication   Size   Complexity  
A _residue_to_letter() 0 3 1
A _residue_is_polymer() 0 3 1
A expand_input_patterns() 0 14 4
D _build_alignment_strings() 0 72 12
A get_parser() 0 35 1
A rmsd_key() 0 6 2
C _print_alignment() 0 23 10
F _merge_alignment_columns() 0 72 17
A sort_nicely() 0 9 4
A get_rna_models_from_dir() 10 10 3

7 Methods

Rating   Name   Duplication   Size   Complexity  
A RNAmodel.__str__() 0 2 1
F RNAmodel.get_rmsd_to() 0 175 36
A RNAmodel.__get_atoms() 0 21 4
A RNAmodel.__init__() 0 6 1
F RNAmodel._atoms_from_sequence_alignment() 0 74 14
A RNAmodel.get_report() 0 6 2
A RNAmodel.__repr__() 0 2 1

How to fix   Duplicated Code    Complexity   

Duplicated Code

Duplicate code is one of the most pungent code smells. A rule that is often used is to re-structure code once it is duplicated in three or more places.

Common duplication problems, and corresponding solutions are:

Complexity

 Tip:   Before tackling complexity, make sure that you eliminate any duplication first. This often can reduce the size of classes significantly.

Complex classes like build.rna_tools.tools.rna_calc_rmsd.rna_calc_rmsd_biopython often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
#!/usr/bin/env python
2
# -*- coding: utf-8 -*-
3
4
"""
5
Examples::
6
7
  rna_calc_rmsd_biopython.py -t 2nd_triplex_FB_ABC.pdb  triples-all-v2-rpr/*.pdb --way backbone+sugar --save --suffix ABC --result abc.csv
8
9
  rna_calc_rmsd_biopython.py -t 2nd_triplex_FB_1AUA3_rpr.pdb test_data/triples2/Triple_cWW_tSH_GCA_exemplar_rpr_ren.pdb  --way backbone+sugar --save --column-name 'rmsd' --triple-mode > out.txt
10
11
"""
12
from __future__ import print_function
13
14
import Bio.PDB.PDBParser
15
import Bio.PDB.Superimposer
16
from Bio.Align import PairwiseAligner
17
import copy
18
import warnings
19
warnings.filterwarnings('ignore', '.*Invalid or missing.*',)
20
warnings.filterwarnings('ignore', '.*with given element *',)
21
warnings.filterwarnings('ignore', '.*is discontinuous*',)
22
warnings.filterwarnings('ignore', '.*Ignoring unrecognized record*',)
23
24
from collections import OrderedDict
25
from rna_tools.rna_tools_lib import set_chain_for_struc, RNAStructure
26
import argparse
27
import sys
28
import glob
29
import re
30
import os
31
32
NUCLEOTIDE_THREE_TO_ONE = {
33
    'A': 'A', 'DA': 'A', 'ADE': 'A',
34
    'C': 'C', 'DC': 'C', 'CYT': 'C',
35
    'G': 'G', 'DG': 'G', 'GUA': 'G',
36
    'U': 'U', 'DU': 'U', 'DT': 'U', 'T': 'U', 'PSU': 'U', 'URA': 'U',
37
    'I': 'I', 'DI': 'I'
38
}
39
40
WAY_TO_ATOMS = {
41
    'c1': ["C1'"],
42
    'backbone+sugar': "P,OP1,OP2,C5',O5',C4',O4',C3',O3',C2',O2',C1'".split(',')
43
}
44
45
46
def _residue_is_polymer(residue):
47
    hetfield = residue.id[0].strip()
48
    return hetfield == ''
49
50
51
def _residue_to_letter(residue):
52
    name = residue.get_resname().strip().upper()
53
    return NUCLEOTIDE_THREE_TO_ONE.get(name, 'N')
54
55
56
def _build_alignment_strings(alignment, seq1, seq2):
57
    seq1_line = []
58
    seq2_line = []
59
    match_line = []
60
    matched_pairs = []
61
    matches = 0
62
63
    path = getattr(alignment, 'path', None)
64
    if not path:
65
        return '', '', '', matched_pairs, matches
66
67
    seq1_idx = path[0][0]
68
    seq2_idx = path[0][1]
69
70
    try:
71
        seq1_ranges = alignment.aligned[0]
72
        seq2_ranges = alignment.aligned[1]
73
    except AttributeError:
74
        seq1_ranges = []
75
        seq2_ranges = []
76
77
    if len(seq1_ranges):
78
        seq1_start = int(seq1_ranges[0][0])
79
        seq1_end_idx = int(seq1_ranges[-1][1])
80
    else:
81
        seq1_start = seq1_idx
82
        seq1_end_idx = seq1_idx
83
    if len(seq2_ranges):
84
        seq2_start = int(seq2_ranges[0][0])
85
        seq2_end_idx = int(seq2_ranges[-1][1])
86
    else:
87
        seq2_start = seq2_idx
88
        seq2_end_idx = seq2_idx
89
90
    for next_i, next_j in path[1:]:
91
        while seq1_idx < next_i or seq2_idx < next_j:
92
            if seq1_idx < next_i and seq2_idx < next_j:
93
                char_a = seq1[seq1_idx]
94
                char_b = seq2[seq2_idx]
95
                seq1_line.append(char_a)
96
                seq2_line.append(char_b)
97
                if char_a == char_b:
98
                    match_line.append('|')
99
                    matches += 1
100
                else:
101
                    match_line.append(' ')
102
                matched_pairs.append((seq1_idx, seq2_idx))
103
                seq1_idx += 1
104
                seq2_idx += 1
105
            elif seq1_idx < next_i:
106
                char_a = seq1[seq1_idx]
107
                seq1_line.append(char_a)
108
                seq2_line.append('-')
109
                match_line.append(' ')
110
                seq1_idx += 1
111
            else:
112
                char_b = seq2[seq2_idx]
113
                seq1_line.append('-')
114
                seq2_line.append(char_b)
115
                match_line.append(' ')
116
                seq2_idx += 1
117
118
    seq1_line = ''.join(seq1_line)
119
    match_line = ''.join(match_line)
120
    seq2_line = ''.join(seq2_line)
121
    alignment_span = {
122
        'seq1_start': seq1_start,
123
        'seq2_start': seq2_start,
124
        'seq1_end': seq1_end_idx,
125
        'seq2_end': seq2_end_idx
126
    }
127
    return seq1_line, match_line, seq2_line, matched_pairs, matches, alignment_span
128
129
130
def _merge_alignment_columns(reference, existing_models, new_target, new_model, model_name='model'):
131
    """Merge pairwise alignments so every model shares the same gapped target string."""
132
    new_reference = []
133
    updated_models = [[] for _ in existing_models]
134
    aligned_new_model = []
135
    i = 0
136
    j = 0
137
    ref_len = len(reference)
138
    new_len = len(new_target)
139
140
    while i < ref_len or j < new_len:
141
        ref_char = reference[i] if i < ref_len else None
142
        new_char = new_target[j] if j < new_len else None
143
144
        if ref_char is None:
145
            new_reference.append(new_char)
146
            for existing in updated_models:
147
                existing.append('-')
148
            aligned_new_model.append(new_model[j])
149
            j += 1
150
            continue
151
        if new_char is None:
152
            new_reference.append(ref_char)
153
            for idx, existing in enumerate(updated_models):
154
                existing.append(existing_models[idx][i])
155
            aligned_new_model.append('-')
156
            i += 1
157
            continue
158
159
        if ref_char == new_char:
160
            new_reference.append(ref_char)
161
            for idx, existing in enumerate(updated_models):
162
                existing.append(existing_models[idx][i])
163
            aligned_new_model.append(new_model[j])
164
            i += 1
165
            j += 1
166
            continue
167
168
        if ref_char == '-':
169
            new_reference.append('-')
170
            for idx, existing in enumerate(updated_models):
171
                existing.append(existing_models[idx][i])
172
            aligned_new_model.append('-')
173
            i += 1
174
            continue
175
176
        if new_char == '-':
177
            new_reference.append('-')
178
            for existing in updated_models:
179
                existing.append('-')
180
            aligned_new_model.append(new_model[j])
181
            j += 1
182
            continue
183
184
        # mismatch: keep both columns but warn the user
185
        print('# warning: inconsistent target alignment between reference and {}: {!r} vs {!r}'.format(
186
            model_name, ref_char, new_char), file=sys.stderr)
187
        new_reference.append(ref_char)
188
        for idx, existing in enumerate(updated_models):
189
            existing.append(existing_models[idx][i])
190
        aligned_new_model.append('-')
191
        i += 1
192
        new_reference.append(new_char)
193
        for existing in updated_models:
194
            existing.append('-')
195
        aligned_new_model.append(new_model[j])
196
        j += 1
197
        continue
198
199
    merged_models = updated_models[:]
200
    merged_models.append(aligned_new_model)
201
    return new_reference, merged_models
202
203
204
def _print_alignment(seq1_line, match_line, seq2_line, header,
205
                     target_len=None, model_len=None, matches=None, residue_pairs=None,
206
                     rmsd=None):
207
    print(header)
208
    if seq1_line:
209
        print(seq1_line)
210
    if match_line:
211
        print(match_line)
212
    if seq2_line:
213
        print(seq2_line)
214
    details = []
215
    if target_len is not None:
216
        details.append('target_len={}'.format(target_len))
217
    if model_len is not None:
218
        details.append('model_len={}'.format(model_len))
219
    if residue_pairs is not None:
220
        details.append('residue_pairs={}'.format(residue_pairs))
221
    if matches is not None:
222
        details.append('matches={}'.format(matches))
223
    if rmsd is not None:
224
        details.append('rmsd={}'.format(rmsd))
225
    if details:
226
        print('# alignment info:', ', '.join(details))
227
228
229
class RNAmodel:
230
    def __init__(self, fpath):
231
        # parser 1-5 -> 1 2 3 4 5
232
        self.struc = Bio.PDB.PDBParser().get_structure('', fpath)
233
        self.__get_atoms()
234
        self.fpath = fpath
235
        self.fn = os.path.basename(fpath)
236
        # self.atoms = []
237
        # if save:
238
        #    self.save() # @save
239
240
    def __get_atoms(self):
241
        self.atoms = []
242
        self.residues = []
243
        self.sequence = ''
244
        self.sequence_residues = []
245
        self.sequence_positions = []
246
        self.sequence_atom_maps = []
247
        for res in self.struc.get_residues():
248
            self.residues.append(res)
249
            atom_map = OrderedDict()
250
            for at in res:
251
                self.atoms.append(at)
252
                atom_map[at.name] = at
253
            if _residue_is_polymer(res):
254
                self.sequence += _residue_to_letter(res)
255
                chain_id = res.get_parent().id
256
                resseq = res.get_id()[1]
257
                self.sequence_positions.append(f"{chain_id}:{resseq}")
258
                self.sequence_residues.append(res)
259
                self.sequence_atom_maps.append(atom_map)
260
        return self.atoms
261
262
    def __str__(self):
263
        return self.fn #+ ' # beads' + str(len(self.residues))
264
265
    def __repr__(self):
266
        return self.fn #+ ' # beads' + str(len(self.residues))
267
268
    def get_report(self):
269
        """Str a short report about rna model""" 
270
        t = ' '.join(['File: ', self.fn, ' # of atoms:', str(len(self.atoms)), '\n'])
271
        for r,a in zip(self.residues, self.atoms ):
272
            t += ' '.join(['resi: ', str(r) ,' atom: ', str(a) , '\n' ])
273
        return t
274
275
    def _atoms_from_sequence_alignment(self, other_rnamodel, way):
276
        if not self.sequence or not other_rnamodel.sequence:
277
            raise ValueError('Cannot align sequences when one of the models has no polymer residues')
278
279
        aligner = PairwiseAligner()
280
        aligner.mode = 'local'
281
        aligner.match_score = 1.0
282
        aligner.mismatch_score = 0.0
283
        gap_open_penalty = -abs(args.align_gap_open if args.align_gap_open is not None else 0.0)
0 ignored issues
show
introduced by
The variable args does not seem to be defined in case __name__ == '__main__' on line 602 is False. Are you sure this can never be the case?
Loading history...
284
        gap_extend_source = args.align_gap_extend if args.align_gap_extend is not None else args.align_gap_open
285
        gap_extend_penalty = -abs(gap_extend_source if gap_extend_source is not None else 0.0)
286
        aligner.open_gap_score = gap_open_penalty
287
        aligner.extend_gap_score = gap_extend_penalty
288
289
        alignments = aligner.align(self.sequence, other_rnamodel.sequence)
290
        try:
291
            alignment = alignments[0]
292
        except IndexError:
293
            raise ValueError('Biopython did not return an alignment')
294
295
        seq1_line, match_line, seq2_line, matched_pairs, matches, alignment_span = _build_alignment_strings(
296
            alignment, self.sequence, other_rnamodel.sequence)
297
298
        alignment_report = {
299
            'header': '# alignment between {} and {}'.format(self.fn, other_rnamodel.fn),
300
            'seq1_line': seq1_line,
301
            'match_line': match_line,
302
            'seq2_line': seq2_line,
303
            'target_len': len(seq1_line),
304
            'model_len': len(seq2_line),
305
            'matches': matches,
306
            'residue_pairs': len(matched_pairs),
307
            'seq1_start': alignment_span['seq1_start'],
308
            'seq1_end': alignment_span['seq1_end'],
309
            'seq2_start': alignment_span['seq2_start'],
310
            'seq2_end': alignment_span['seq2_end']
311
        }
312
313
        if not matched_pairs:
314
            raise ValueError('Sequence alignment returned no overlapping residues')
315
316
        allowed_names = WAY_TO_ATOMS.get(way)
317
        atoms_self = []
318
        atoms_other = []
319
        residues_used = 0
320
321
        for idx_a, idx_b in matched_pairs:
322
            atom_map_a = self.sequence_atom_maps[idx_a]
323
            atom_map_b = other_rnamodel.sequence_atom_maps[idx_b]
324
            if allowed_names:
325
                atom_names = [n for n in allowed_names if n in atom_map_a and n in atom_map_b]
326
            else:
327
                atom_names = [n for n in atom_map_a.keys() if n in atom_map_b]
328
            if not atom_names:
329
                continue
330
            for name in atom_names:
331
                atoms_self.append(atom_map_a[name])
332
                atoms_other.append(atom_map_b[name])
333
            residues_used += 1
334
335
        if not atoms_self:
336
            raise ValueError('No overlapping atoms remained after sequence alignment')
337
338
        seq_len = max(len(self.sequence), len(other_rnamodel.sequence))
339
        identity = (matches / float(seq_len)) if seq_len else 0.0
340
341
        info = {
342
            'aligned_residues': residues_used,
343
            'identity': identity,
344
            'atoms': len(atoms_self),
345
            'alignment_pairs': len(matched_pairs),
346
            'alignment_report': alignment_report
347
        }
348
        return atoms_self, atoms_other, info
349
350
    def get_rmsd_to(self, other_rnamodel, way="", triple_mode=False, save=True):
351
        """Calc rmsd P-atom based rmsd to other rna model"""
352
        sup = Bio.PDB.Superimposer()
353
        alignment_report = None
354
        if args.align_sequence:
0 ignored issues
show
introduced by
The variable args does not seem to be defined in case __name__ == '__main__' on line 602 is False. Are you sure this can never be the case?
Loading history...
355
            if triple_mode:
356
                raise ValueError('Sequence alignment mode is not supported together with triple mode')
357
            self.atoms_for_rmsd, other_atoms_for_rmsd, info = self._atoms_from_sequence_alignment(other_rnamodel, way)
358
            alignment_report = info.get('alignment_report')
359
            other_rnamodel.alignment_report = alignment_report
360
            if args.debug:
361
                print('Aligned residues:', info['aligned_residues'], 'identity:', round(info['identity'], 3), '#atoms:', info['atoms'])
362
            if not self.atoms_for_rmsd:
363
                raise ValueError('No overlapping atoms remained after sequence alignment based filtering')
364
        elif way == 'c1':
365
            self.atoms_for_rmsd = []
366
            for a in self.atoms:
367
                if a.name in ["C1'"]:
368
                    self.atoms_for_rmsd.append(a)
369
            if args.debug: print('atoms_for_rmsd', len(self.atoms_for_rmsd))
370
371
            other_atoms_for_rmsd = []
372
            for a in other_rnamodel.atoms:
373
                if a.name in ["C1'"]:
374
                    other_atoms_for_rmsd.append(a)
375
            if args.debug: print('other_atoms_for_rmsd', len(other_atoms_for_rmsd))
376
377
        elif way == 'backbone+sugar':
378
            self.atoms_for_rmsd = []
379
            for a in self.atoms:
380
                if a.name in "P,OP1,OP2,C5',O5',C4',O4',C3',O3',C2',O2',C1'".split(','):
381
                    self.atoms_for_rmsd.append(a)
382
            if args.debug: print('atoms_for_rmsd', len(self.atoms_for_rmsd))
383
                                 
384
            other_atoms_for_rmsd = []
385
386
            for a in other_rnamodel.atoms:
387
                if a.name in "P,OP1,OP2,C5',O5',C4',O4',C3',O3',C2',O2',C1'".split(','):
388
                    other_atoms_for_rmsd.append(a)
389
            if args.debug: print('other_atoms_for_rmsd', len(other_atoms_for_rmsd))
390
        else:
391
            self.atoms_for_rmsd = self.atoms
392
            other_atoms_for_rmsd = other_rnamodel.atoms
393
394
        if triple_mode:
395
            def chunks(lst, n):
396
                """Yield successive n-sized chunks from lst.
397
                https://stackoverflow.com/questions/312443/how-do-you-split-a-list-into-evenly-sized-chunks
398
                """
399
                for i in range(0, len(lst), n):
400
                    yield lst[i:i + n]
401
402
            rmsd_min = 10000 # ugly
403
            import itertools
404
            per = list(itertools.permutations([0, 1, 2]))
405
            lst = list(chunks(other_atoms_for_rmsd,
406
                              int(len(other_atoms_for_rmsd)/3))) # for 1 atom, this will be 1 x 3 [3 residues]
407
                       # so so len is 3 atoms so / by 3 to get how many atoms per residue
408
            sup_min = None
409
410
            for p in per:
411
                patoms = []
412
                for i in p: # p=(1, 2, 3)
413
                    patoms.extend(lst[i])
414
                #print(self.atoms_for_rmsd)
415
                ## print('patoms')
416
                ## for a in patoms:
417
                ##     print(a, a.get_parent().get_id())
418
                ## print('self.atoms_for_rmsd')
419
                ## for a in self.atoms_for_rmsd:
420
                ##     print(a, a.get_parent().get_id())
421
                #sup.set_atoms(patoms, self.atoms_for_rmsd)
422
                sup.set_atoms(self.atoms_for_rmsd, patoms)
423
                rms = round(sup.rms, 3)
424
425
                rt = None
426
                seq = ''
427
                for a in patoms:
428
                    r = a.get_parent()
429
                    if r != rt:
430
                        rt = r
431
                        seq += r.get_resname().strip()
432
433
                if rms < rmsd_min:
434
                    rmsd_min = rms
435
                    sup_min = copy.copy(sup)
436
                    suffix = seq
437
                    p_min = p
438
                    seq_min = seq
439
440
                if args.debug:
441
                    print(p, '', [i + 1 for i in p], end=' ')
442
                    print(seq, 'seq_min: a' + seq_min, rms)
0 ignored issues
show
introduced by
The variable seq_min does not seem to be defined for all execution paths.
Loading history...
443
444
            index = [0 ,0 ,0]
445
            index[0] = p_min.index(0)
0 ignored issues
show
introduced by
The variable p_min does not seem to be defined for all execution paths.
Loading history...
446
            index[1] = p_min.index(1)
447
            index[2] = p_min.index(2)
448
449
            # ugly re-set 123 to crazy id ! + 100, so this will
450
            # fill up 1 2 3 for the second for
451
            rs = other_rnamodel.struc[0].get_residues()
452
            for i, r in enumerate(rs):
453
                r.id = (' ', index[i] + 153, ' ') # ugly, some random offset
454
            
455
            for i, r in enumerate(other_rnamodel.struc[0].get_residues()):
456
                r.id = (' ', index[i] + 1, ' ')
457
                if args.debug: print(r)
458
459
            io = Bio.PDB.PDBIO()
460
            sup_min.apply(other_rnamodel.struc.get_atoms())
461
            # if args.debug: print(p_min, [i + 1 for i in p_min])
462
            io.set_structure(other_rnamodel.struc)
463
            
464
            args.save_here = True
465
            if args.save_here:
466
                f = os.path.basename(self.fpath.replace('.pdb', '_aligned'))
467
                # print(f)
468
                try:
469
                    os.mkdir(f)
470
                except:
471
                    pass
472
                fout = f + os.sep + os.path.basename(other_rnamodel.fpath.replace('.pdb', '_aligned_s' + suffix + '.pdb'))
0 ignored issues
show
introduced by
The variable suffix does not seem to be defined for all execution paths.
Loading history...
473
            else:
474
                fout = other_rnamodel.fpath.replace('.pdb', '_aligned_s' + suffix + '.pdb')
475
476
            # if args.debug: print(fout)
477
            io.save(fout)
478
479
            # ugly set chain to A
480
            set_chain_for_struc(fout, 'A', save_file_inplace=True)
481
            # and now run this to sort into 1 2 3
482
483
            r = RNAStructure(fout)
484
            remarks = r.get_remarks_text()
485
            r1 = r.get_res_text('A', 1)
486
            r2 = r.get_res_text('A', 2)
487
            r3 = r.get_res_text('A', 3)
488
            with open(fout, 'w') as f:
489
                f.write(remarks)
490
                f.write(r1)
491
                f.write(r2)
492
                f.write(r3)
493
            r.reload()
494
            r.get_rnapuzzle_ready()
495
            r.write()
496
            return str(rmsd_min) + ',s' + seq_min + ',' + os.path.basename(fout)
497
498
        else:
499
            sup.set_atoms(self.atoms_for_rmsd, other_atoms_for_rmsd)
500
            rms = round(sup.rms, 2)
501
502
            if alignment_report and getattr(args, 'print_alignment', False):
503
                _print_alignment(
504
                    alignment_report['seq1_line'],
505
                    alignment_report['match_line'],
506
                    alignment_report['seq2_line'],
507
                    alignment_report['header'],
508
                    target_len=alignment_report['target_len'],
509
                    model_len=alignment_report['model_len'],
510
                    matches=alignment_report['matches'],
511
                    residue_pairs=alignment_report['residue_pairs'],
512
                    rmsd=rms)
513
514
            if save:
515
                ## io = Bio.PDB.PDBIO()
516
                ## sup.apply(self.struc.get_atoms())
517
                ## io.set_structure(self.struc)
518
                ## io.save("aligned.pdb")
519
520
                io = Bio.PDB.PDBIO()
521
                sup.apply(other_rnamodel.struc.get_atoms())
522
                io.set_structure(other_rnamodel.struc)
523
                io.save(other_rnamodel.fpath.replace('.pdb', '_' + args.suffix + '.pdb'))
524
        return rms
525
526
    
527 View Code Duplication
def get_rna_models_from_dir(directory):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
528
    models = []
529
    if not os.path.exists(directory):
530
        raise Exception('Dir does not exist! ', directory)
531
    files = glob.glob(directory + "/*.pdb")
532
    files_sorted = sort_nicely(files)
533
    for f in files_sorted:
534
        #print f
535
        models.append(RNAmodel(f))
536
    return models
537
538
def sort_nicely( l ):
539
   """ Sort the given list in the way that humans expect.
540
541
   http://blog.codinghorror.com/sorting-for-humans-natural-sort-order/
542
   """
543
   convert = lambda text: int(text) if text.isdigit() else text
544
   alphanum_key = lambda key: [ convert(c) for c in re.split('([0-9]+)', key) ]
545
   l.sort( key=alphanum_key )
546
   return l
547
548
549
def expand_input_patterns(patterns):
550
    expanded = []
551
    for pattern in patterns:
552
        if any(char in pattern for char in '*?['):
553
            matches = glob.glob(pattern)
554
            if matches:
555
                expand_list = matches[:]
556
                sort_nicely(expand_list)
557
                expanded.extend(expand_list)
558
            else:
559
                expanded.append(pattern)
560
        else:
561
            expanded.append(pattern)
562
    return expanded
563
564
565
def get_parser():
566
    parser = argparse.ArgumentParser(
567
        description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
568
569
    parser.add_argument('-t',"--target",
570
                         help="target file")
571
    parser.add_argument('--result', #default='out.rmsd',
572
                         help="result file")
573
    parser.add_argument('--ignore-files', default='aligned', help="use to ignore files, .e.g. with 'aligned'")
574
    parser.add_argument('--suffix', default='aligned', help="used with --saved, by default: aligned")
575
    parser.add_argument('--way', help="e.g., backbone+sugar")
576
    parser.add_argument('--align-sequence', action='store_true',
577
                        help='align sequences with Biopython and compute RMSD only over aligned residues')
578
    parser.add_argument('--align-gap-open', type=float, default=10.0,
579
                        help='gap opening penalty (positive value) when using --align-sequence, default: 10')
580
    parser.add_argument('--align-gap-extend', type=float,
581
                        help='gap extension penalty (positive value) when using --align-sequence; default: same as gap open')
582
    parser.add_argument('--print-alignment', action='store_true',
583
                        help='print the pairwise sequence alignment used for atom matching')
584
    parser.add_argument('--print-target-sequence', action='store_true',
585
                        help='print the polymer sequence extracted from the target structure and continue processing')
586
    parser.add_argument('--alignment-fasta', nargs='?', const='seq.fasta',
587
                        help='write the aligned target/model sequences to FASTA; optional output path, default: seq.fasta')
588
    parser.add_argument('--add-rmsd-to-fasta-header', action='store_true',
589
                        help='prefix each FASTA model header with its RMSD value when writing --alignment-fasta')
590
    parser.add_argument('--sort-by-rmsd', action='store_true',
591
                        help='order FASTA entries by descending RMSD when using --alignment-fasta')
592
    parser.add_argument('--triple-mode', help="same crazy strategy to align triples", action="store_true")
593
    parser.add_argument('--column-name', help="name column for rmsd, by default 'rmsd', but can also be a name of the target file",
594
                        default="rmsd")
595
    parser.add_argument("-s", "--save", action="store_true", help="set suffix with --suffix, by default: aligned")
596
    parser.add_argument('files', help='files', nargs='+')
597
    parser.add_argument("--debug", action="store_true")
598
    parser.add_argument("--sort", action="store_true", help='sort results based on rmsd (ascending)')
599
    return parser
600
601
# main
602
if __name__ == '__main__':
603
    parser = get_parser()
604
    args = parser.parse_args()
605
    if args.alignment_fasta and not args.align_sequence:
606
        parser.error('--alignment-fasta requires --align-sequence')
607
608
    target = RNAmodel(args.target)
609
    target_sequence = target.sequence if target.sequence else ''
610
    target_len = len(target_sequence)
611
612
    if args.print_target_sequence:
613
        print('# target sequence (polymer residues only):', target.fn)
614
        if target.sequence:
615
            print(target.sequence)
616
            print('# length:', len(target.sequence))
617
        else:
618
            print('# warning: target structure does not contain polymer residues with standard names')
619
620
    models = expand_input_patterns(args.files)
621
    tmp = []
622
    if args.ignore_files:
623
        for f in models:
624
            if args.debug: print(f)
625
            if args.ignore_files in f:
626
                continue
627
            tmp.append(f)
628
        models = tmp
629
 
630
    print('# of models:', len(models))
631
    c = 1
632
    t = 'fn,' + args.column_name + ',aligned_seq, aligned_fn\n'
633
    alignment_entries = [] if args.alignment_fasta else None
634
    for m in models:
635
        mrna = RNAmodel(m)
636
        #print r1.fn, r2.fn, r1.get_rmsd_to(r2)#, 'tmp.pdb')
637
        # print(m)
638
        try:
639
            rmsd = target.get_rmsd_to(mrna, args.way, args.triple_mode, args.save) #, 'tmp.pdb')
640
        except ValueError as exc:
641
            if args.align_sequence:
642
                print('# warning:', mrna.fn, exc, file=sys.stderr)
643
                rmsd = 'nan'
644
            else:
645
                raise
646
        #except:
647
        if 0:
648
            print(m)
649
            sys.exit(1)
650
        #print rmsd
651
        if alignment_entries is not None:
652
            alignment = getattr(mrna, 'alignment_report', None)
653
            if alignment and alignment.get('seq1_line') and alignment.get('seq2_line'):
654
                alignment_entries.append({
655
                    'model_name': mrna.fn,
656
                    'target_seq': alignment['seq1_line'],
657
                    'model_seq': alignment['seq2_line'],
658
                    'seq1_start': alignment.get('seq1_start', 0),
659
                    'seq1_end': alignment.get('seq1_end', target_len),
660
                    'rmsd': rmsd
661
                })
662
            else:
663
                print('# warning: no alignment info collected for', mrna.fn, file=sys.stderr)
664
        t += mrna.fn + ',' + str(rmsd) + '\n'
665
        #break    
666
    print(t.strip())
667
    if args.result:
668
        with open(args.result, 'w') as f:
669
            f.write(t)
670
671
        if args.sort:
672
            import pandas as pd
673
            df = pd.read_csv(args.result)
674
            df = df.sort_values('rmsd')
675
            df.to_csv(args.result, index=False)
676
            print(df.to_string(index=False))
677
678
    if alignment_entries is not None:
679
        fasta_path = args.alignment_fasta
680
        valid_entries = [entry for entry in alignment_entries if entry['target_seq'] and entry['model_seq']]
681
        if not valid_entries:
682
            print('# warning: requested FASTA output but no alignment data was generated', file=sys.stderr)
683
        else:
684
            padded_entries = []
685
            for entry in valid_entries:
686
                seq1_start = entry.get('seq1_start', 0)
687
                seq1_end = entry.get('seq1_end', target_len)
688
                target_leading = target_sequence[:seq1_start] if target_sequence else '-' * max(seq1_start, 0)
689
                target_trailing = target_sequence[seq1_end:] if target_sequence else '-' * max((target_len - seq1_end) if target_len else 0, 0)
690
691
                leading = '-' * len(target_leading)
692
                trailing = '-' * len(target_trailing)
693
                padded_entries.append({
694
                    'model_name': entry['model_name'],
695
                    'target_seq': target_leading + entry['target_seq'] + target_trailing,
696
                    'model_seq': leading + entry['model_seq'] + trailing,
697
                    'rmsd': entry.get('rmsd')
698
                })
699
700
            if args.sort_by_rmsd:
701
                def rmsd_key(entry):
702
                    value = entry.get('rmsd')
703
                    try:
704
                        return float(value)
705
                    except (TypeError, ValueError):
706
                        return float('-inf')
707
                padded_entries = sorted(padded_entries, key=rmsd_key, reverse=True)
708
709
            ref_target = list(padded_entries[0]['target_seq'])
710
            model_names = [padded_entries[0]['model_name']]
711
            model_columns = [list(padded_entries[0]['model_seq'])]
712
713
            for entry in padded_entries[1:]:
714
                ref_target, model_columns = _merge_alignment_columns(
715
                    ref_target,
716
                    model_columns,
717
                    list(entry['target_seq']),
718
                    list(entry['model_seq']),
719
                    entry['model_name']
720
                )
721
                model_names.append(entry['model_name'])
722
723
            with open(fasta_path, 'w') as fasta:
724
                fasta.write('>{}\n'.format(target.fn))
725
                fasta.write(''.join(ref_target) + '\n')
726
                for name, seq in zip(model_names, model_columns):
727
                    header_name = name
728
                    if args.add_rmsd_to_fasta_header:
729
                        rmsd_value = next((entry['rmsd'] for entry in padded_entries if entry['model_name'] == name), None)
0 ignored issues
show
introduced by
The variable entry does not seem to be defined in case the for loop on line 685 is not entered. Are you sure this can never be the case?
Loading history...
730
                        if rmsd_value is not None:
731
                            header_name = '{}|rmsd={}'.format(rmsd_value, name)
732
                    fasta.write('>{}\n'.format(header_name))
733
                    fasta.write(''.join(seq) + '\n')
734
            print('# FASTA alignment written to', fasta_path)
735
736
        
737