Completed
Push — master ( 739b7a...711b3d )
by Glenn
01:21
created

export_failed_results()   A

Complexity

Conditions 4

Size

Total Lines 20

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 20
rs 9.2
cc 4
1
from skf.database import db
2
from sqlalchemy import asc, desc
3
from skf.database.groupmembers import groupmembers
4
from skf.database.project_sprints import project_sprints 
5
from skf.database.checklists_results import checklists_results
6
from skf.database.checklists import checklists
7
from skf.database.kb_items import kb_items
8
from skf.database.comments import comments
9
10
from skf.api.security import log, val_num, val_alpha_num, val_alpha_num_special
0 ignored issues
show
Unused Code introduced by blabla1337
Unused val_alpha_num imported from skf.api.security
Loading history...
11
import base64
12
import string
13
import random
14
from datetime import date
0 ignored issues
show
Unused Code introduced by Martin Marsicano
Unused date imported from datetime
Loading history...
15
16
def get_sprint_item(sprint_id, user_id):
    """Fetch the sprint row whose ``sprintID`` equals ``sprint_id``.

    Both ``sprint_id`` and ``user_id`` are passed through ``val_num`` before
    the lookup. ``user_id`` is not used in the query itself.

    Returns:
        The single ``project_sprints`` row produced by ``Query.one()``;
        that call raises when zero or several rows match.
    """
    log("User requested specific sprint item", "MEDIUM", "PASS")
    for identifier in (sprint_id, user_id):
        val_num(identifier)
    matches_sprint = project_sprints.sprintID == sprint_id
    return project_sprints.query.filter(matches_sprint).one()
22
23
24
def get_sprint_results(sprint_id, user_id):
    """Return up to 500 checklist results of a sprint, ordered for display.

    Both IDs are passed through ``val_num`` first; ``user_id`` is not used
    in the query. Rows are filtered on ``sprintID``, grouped by
    ``checklistID`` and sorted by ascending ``status`` before pagination.

    Returns:
        The pagination object returned by ``paginate`` after its ``items``
        have been rearranged by ``order_sprint_results``.
    """
    log("User requested specific sprint items", "MEDIUM", "PASS")
    val_num(sprint_id)
    val_num(user_id)
    result = (
        checklists_results.query
        .filter(checklists_results.sprintID == sprint_id)
        .group_by(checklists_results.checklistID)
        .order_by(asc(checklists_results.status))
        # Keyword form: Flask-SQLAlchemy 3.x only accepts these options by
        # keyword, while older releases accept both spellings.
        .paginate(page=1, per_page=500, error_out=False)
    )
    return order_sprint_results(result)
30
31
32
def get_sprint_results_audit(sprint_id, user_id):
    """Return up to 500 checklist results of a sprint that have status 5.

    Both IDs are passed through ``val_num`` first; ``user_id`` is not used
    in the query. Rows are filtered on ``sprintID`` and ``status == 5`` and
    grouped by ``checklistID``.

    Returns:
        The pagination object returned by ``paginate`` after its ``items``
        have been rearranged by ``order_sprint_results``.
    """
    log("User requested specific sprint audit items", "MEDIUM", "PASS")
    val_num(sprint_id)
    val_num(user_id)
    result = (
        checklists_results.query
        .filter(checklists_results.sprintID == sprint_id)
        .filter(checklists_results.status == 5)
        .group_by(checklists_results.checklistID)
        # Keyword form: Flask-SQLAlchemy 3.x only accepts these options by
        # keyword, while older releases accept both spellings.
        .paginate(page=1, per_page=500, error_out=False)
    )
    return order_sprint_results(result)
38
39
40
def get_sprint_results_audit_export(sprint_id, user_id):
    """Export the status-5 checklist results of a sprint.

    Both IDs are passed through ``val_num`` first; ``user_id`` is not used
    in the query. Rows are filtered on ``sprintID`` and ``status == 5``,
    grouped by ``checklistID`` and limited to the first 500.

    Returns:
        A dict ``{'message': <value returned by export_failed_results>}``.
    """
    log("User requested specific sprint audit export", "MEDIUM", "PASS")
    val_num(sprint_id)
    val_num(user_id)
    result = (
        checklists_results.query
        .filter(checklists_results.sprintID == sprint_id)
        .filter(checklists_results.status == 5)
        .group_by(checklists_results.checklistID)
        # Keyword form: Flask-SQLAlchemy 3.x only accepts these options by
        # keyword, while older releases accept both spellings.
        .paginate(page=1, per_page=500, error_out=False)
    )
    return {'message': export_failed_results(result)}
46
47
48
def delete_sprint(sprint_id, user_id):
    """Delete the sprint whose ``sprintID`` equals ``sprint_id``.

    Both IDs are passed through ``val_num`` first; ``user_id`` is not used
    in the lookup. The row is fetched with ``Query.one()`` (which raises
    unless exactly one row matches), deleted, and the session committed.

    Returns:
        ``{'message': 'Sprint successfully deleted'}``.
    """
    log("User deleted sprint", "MEDIUM", "PASS")
    for identifier in (sprint_id, user_id):
        val_num(identifier)
    target = project_sprints.query.filter(
        project_sprints.sprintID == sprint_id
    ).one()
    session = db.session
    session.delete(target)
    session.commit()
    return dict(message='Sprint successfully deleted')
56
57
58
def update_sprint(sprint_id, user_id, data):
    """Overwrite the name and description of an existing sprint.

    Args:
        sprint_id: ID of the sprint to change (passed through ``val_num``).
        user_id: passed through ``val_num``; not used otherwise.
        data: mapping read with ``.get('name')`` and ``.get('description')``;
            both values are passed through ``val_alpha_num_special``.

    Returns:
        ``{'message': 'Sprint successfully updated'}``.
    """
    log("User updated sprint", "MEDIUM", "PASS")
    for identifier in (sprint_id, user_id):
        val_num(identifier)

    # The row is looked up before the text fields are checked, so a missing
    # sprint surfaces ahead of any complaint about the field contents.
    sprint = project_sprints.query.filter(
        project_sprints.sprintID == sprint_id
    ).one()

    new_name = data.get('name')
    new_description = data.get('description')
    val_alpha_num_special(new_name)
    val_alpha_num_special(new_description)

    sprint.sprintName = new_name
    sprint.sprintDesc = new_description

    session = db.session
    session.add(sprint)
    session.commit()
    return dict(message='Sprint successfully updated')
70
71
72
def new_sprint(user_id, data):
    """Create a sprint in the group that ``user_id`` belongs to.

    Args:
        user_id: used to look up the caller's ``groupmembers`` row
            (``Query.one()``, so exactly one row must match).
        data: mapping read with ``.get`` for ``'name'``, ``'description'``
            (both passed through ``val_alpha_num_special``) and
            ``'projectID'`` (passed through ``val_num``).

    Returns:
        ``{'sprintID': <id>, 'message': 'Sprint successfully created'}``
        where ``<id>`` is the highest ``sprintID`` found in the group after
        the commit.
    """
    log("User created new sprint", "MEDIUM", "PASS")

    name = data.get('name')
    description = data.get('description')
    project_id = data.get('projectID')
    val_alpha_num_special(name)
    val_alpha_num_special(description)
    val_num(project_id)

    membership = groupmembers.query.filter(groupmembers.userID == user_id).one()
    group_id = membership.groupID

    db.session.add(project_sprints(name, description, group_id, project_id))
    db.session.commit()

    # NOTE(review): the new ID is recovered by re-querying the newest sprint
    # of the group; presumably that is the row just inserted - confirm this
    # holds when two sprints are created in the same group concurrently.
    newest = (
        project_sprints.query
        .filter(project_sprints.groupID == group_id)
        .order_by(desc(project_sprints.sprintID))
        .first()
    )
    return {'sprintID': newest.sprintID, 'message': 'Sprint successfully created'}
87
88
89
def stats_sprint(project_id):
    """Summarise per-status checklist counts for every sprint of a project.

    ``project_id`` is passed through ``val_num``. For each sprint of the
    project one count query is issued per status code (1-5); each query is
    grouped by ``checklistID`` before counting.

    Returns:
        A list with one dict per sprint holding ``sprint_id``,
        ``sprint_desc``, ``sprint_name``, the five per-status counters and
        ``sprint_items_total`` (the sum of those counters).
    """
    log("User requested specific project sprint stats", "MEDIUM", "PASS")
    val_num(project_id)

    # Output key -> checklist result status code counted under that key.
    status_by_key = (
        ('sprint_open', 1),
        ('sprint_closed', 2),
        ('sprint_accepted', 3),
        ('sprint_sec_ack', 4),
        ('sprint_sec_fail', 5),
    )

    sprint = []
    sprint_info = project_sprints.query.filter(project_sprints.projectID == project_id).all()
    for result in sprint_info:
        entry = {
            'sprint_id': result.sprintID,
            'sprint_desc': result.sprintDesc,
            'sprint_name': result.sprintName,
        }
        for key, status in status_by_key:
            # A single group_by is enough; grouping twice on the same column
            # yields the same groups.
            entry[key] = (
                checklists_results.query
                .filter(checklists_results.sprintID == result.sprintID)
                .filter(checklists_results.status == status)
                .group_by(checklists_results.checklistID)
                .count()
            )
        entry['sprint_items_total'] = sum(entry[key] for key, _ in status_by_key)
        sprint.append(entry)
    return sprint
106
107
108
def order_sprint_results(sprint_results):
    """Regroup ``sprint_results.items`` by status and order each group.

    Items are split into five groups: status 1, 2, 3, 4 and "everything
    else". Within a group, ``insert_in_order`` places each item according to
    the first two dot-separated integers of its ``checklistID``. The groups
    are then concatenated in that order and written back to
    ``sprint_results.items``.

    Returns:
        The same ``sprint_results`` object, with ``items`` replaced.
    """
    known_statuses = (1, 2, 3, 4)
    catch_all = 5  # bucket for any status that is not 1-4
    buckets = {1: [], 2: [], 3: [], 4: [], catch_all: []}

    for entry in sprint_results.items:
        parts = entry.checklistID.split('.')
        major = int(parts[0])
        minor = int(parts[1])
        bucket_key = next(
            (code for code in known_statuses if entry.status == code),
            catch_all,
        )
        buckets[bucket_key] = insert_in_order(major, minor, entry, buckets[bucket_key])

    merged = []
    for code in known_statuses + (catch_all,):
        merged = merged + buckets[code]
    sprint_results.items = merged
    return sprint_results
130
131
132
def insert_in_order(category, category_requirement, item, status_list):
    """Insert ``item`` into ``status_list`` by (category, requirement) order.

    Args:
        category: first numeric component of the item's checklist ID.
        category_requirement: second numeric component of that ID.
        item: object to insert; stored as-is.
        status_list: list of objects exposing a dotted ``checklistID``
            string whose first two components parse as integers.

    Returns:
        ``status_list`` itself (mutated in place). ``item`` is placed before
        the first element whose (category, requirement) pair is strictly
        greater, so items with equal pairs keep their insertion order; if no
        such element exists the item is appended.
    """
    new_key = (category, category_requirement)
    for position, existing in enumerate(status_list):
        parts = existing.checklistID.split('.')
        # Tuple comparison is "category first, then requirement".
        if new_key < (int(parts[0]), int(parts[1])):
            status_list.insert(position, item)
            break
    else:
        # No larger element found (this also covers the empty list).
        status_list.append(item)
    return status_list
153
154
155
def _csv_clean(text):
    """Backslash-escape commas, turn newlines into spaces and trim the ends."""
    return text.replace(',', '\\,').replace('\n', ' ').strip(' ').replace('  ', ' ')


def _split_kb_content(content):
    """Return the raw (description, mitigation) sections of a kb entry.

    The description is the text after " Description:" and before
    " Solution:"; the mitigation is the text after " Solution:". A section
    whose marker is absent comes back as an empty string.
    """
    sections = content.split(" Solution:")
    description_parts = sections[0].split(" Description:")
    description = description_parts[1] if len(description_parts) > 1 else ''
    mitigation = sections[1] if len(sections) > 1 else ''
    return description, mitigation


def export_failed_results(sprint_results):
    """Render ``sprint_results.items`` as CSV text and base64-encode it.

    For every item the matching checklist, knowledge-base entry and most
    recent comment (highest ``comments.id`` for the same sprint, checklist
    and status) are looked up. Each item yields one quoted row with the
    columns ``date,title,description,mitigation,notes``. A lookup that finds
    nothing, or kb content without the expected section markers, yields
    empty fields rather than aborting the export.

    Returns:
        ``bytes``: the base64 encoding of the UTF-8 encoded CSV text.
    """
    # NOTE(review): embedded double quotes are not escaped inside the quoted
    # fields; confirm how consumers of this export parse it before changing.
    rows = ['date,title,description,mitigation,notes\n']

    for item in sprint_results.items:
        checklist = checklists.query.filter(checklists.checklistID == item.checklistID).first()
        kb_item = kb_items.query.filter(kb_items.kbID == item.kbID).first()
        comment = (
            comments.query
            .filter(comments.sprintID == item.sprintID)
            .filter(comments.checklistID == item.checklistID)
            .filter(comments.status == item.status)
            .order_by(desc(comments.id))
            .first()
        )

        title = _csv_clean(checklist.content) if checklist is not None else ''

        description, mitigation = '', ''
        if kb_item is not None:
            raw_description, raw_mitigation = _split_kb_content(kb_item.content)
            description = _csv_clean(raw_description)
            mitigation = _csv_clean(raw_mitigation)

        comment_date, notes = '', ''
        if comment is not None:
            # presumably comment.date is already a string; str() leaves it
            # unchanged in that case - TODO confirm against the model.
            comment_date = str(comment.date)
            notes = _csv_clean(comment.comment)

        fields = [comment_date, title, description, mitigation, notes]
        rows.append('"' + '","'.join(fields) + '"\n')

    # Built in memory so that no export file is left behind in the working
    # directory after the call.
    return base64.b64encode(''.join(rows).encode('utf-8'))
175
176
177
def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
    """Return a string of ``size`` characters picked at random from ``chars``.

    Characters are drawn one at a time with ``random.choice``; by default
    the alphabet is the ASCII uppercase letters plus the digits.
    """
    picked = []
    for _ in range(size):
        picked.append(random.choice(chars))
    return ''.join(picked)