1 | from skf.database import db |
||
2 | from sqlalchemy import asc, desc |
||
3 | from skf.database.groupmembers import groupmembers |
||
4 | from skf.database.project_sprints import project_sprints |
||
5 | from skf.database.checklists_results import checklists_results |
||
6 | from skf.database.checklists import checklists |
||
7 | from skf.database.kb_items import kb_items |
||
8 | from skf.database.comments import comments |
||
9 | |||
10 | from skf.api.security import log, val_num, val_alpha_num, val_alpha_num_special |
||
11 | import base64 |
||
12 | import string |
||
13 | import random |
||
14 | from datetime import date |
||
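# NOTE(review): the code-review tool flagged the `from datetime import date` import above as unused code; nothing in the visible part of this module references `date` - confirm before removing.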
|||
15 | |||
def get_sprint_item(sprint_id, user_id):
    """Return the single sprint row whose ``sprintID`` equals ``sprint_id``.

    Both identifiers are passed through ``val_num`` before the lookup.
    ``user_id`` is validated but takes no part in the query itself.

    The lookup ends in ``Query.one()``, so it raises unless exactly one
    row matches.
    """
    log("User requested specific sprint item", "MEDIUM", "PASS")
    val_num(sprint_id)
    val_num(user_id)
    matches_sprint = project_sprints.sprintID == sprint_id
    return project_sprints.query.filter(matches_sprint).one()
||
22 | |||
23 | |||
def get_sprint_results(sprint_id, user_id):
    """Return the ordered checklist results of one sprint.

    Both identifiers are passed through ``val_num`` first; ``user_id`` is
    validated but is not part of the query.

    The query selects ``checklists_results`` rows of the sprint, grouped by
    ``checklistID`` and sorted ascending by ``status``, and fetches the first
    page of at most 500 rows. The resulting pagination object is handed to
    ``order_sprint_results`` and that function's return value is returned.
    """
    log("User requested specific sprint items", "MEDIUM", "PASS")
    val_num(sprint_id)
    val_num(user_id)
    result = (
        checklists_results.query
        .filter(checklists_results.sprintID == sprint_id)
        .group_by(checklists_results.checklistID)
        .order_by(asc(checklists_results.status))
        # Keyword arguments: Flask-SQLAlchemy 3.x made the paginate()
        # parameters keyword-only, and 2.x accepts the same names.
        .paginate(page=1, per_page=500, error_out=False)
    )
    return order_sprint_results(result)
||
30 | |||
31 | |||
def get_sprint_results_audit(sprint_id, user_id):
    """Return the ordered status-5 checklist results of one sprint.

    Both identifiers are passed through ``val_num`` first; ``user_id`` is
    validated but is not part of the query.

    Only ``checklists_results`` rows of the sprint with ``status == 5`` are
    selected, grouped by ``checklistID``; the first page of at most 500 rows
    is handed to ``order_sprint_results`` and its return value is returned.
    """
    log("User requested specific sprint audit items", "MEDIUM", "PASS")
    val_num(sprint_id)
    val_num(user_id)
    result = (
        checklists_results.query
        .filter(checklists_results.sprintID == sprint_id)
        .filter(checklists_results.status == 5)
        .group_by(checklists_results.checklistID)
        # Keyword arguments: Flask-SQLAlchemy 3.x made the paginate()
        # parameters keyword-only, and 2.x accepts the same names.
        .paginate(page=1, per_page=500, error_out=False)
    )
    return order_sprint_results(result)
||
38 | |||
39 | |||
def get_sprint_results_audit_export(sprint_id, user_id):
    """Return an export of the status-5 checklist results of one sprint.

    Both identifiers are passed through ``val_num`` first; ``user_id`` is
    validated but is not part of the query.

    Only ``checklists_results`` rows of the sprint with ``status == 5`` are
    selected, grouped by ``checklistID`` (first page, at most 500 rows).
    The page is passed to ``export_failed_results`` and whatever that
    returns is placed under the ``'message'`` key of the returned dict.
    """
    log("User requested specific sprint audit export", "MEDIUM", "PASS")
    val_num(sprint_id)
    val_num(user_id)
    result = (
        checklists_results.query
        .filter(checklists_results.sprintID == sprint_id)
        .filter(checklists_results.status == 5)
        .group_by(checklists_results.checklistID)
        # Keyword arguments: Flask-SQLAlchemy 3.x made the paginate()
        # parameters keyword-only, and 2.x accepts the same names.
        .paginate(page=1, per_page=500, error_out=False)
    )
    return {'message': export_failed_results(result)}
||
46 | |||
47 | |||
def delete_sprint(sprint_id, user_id):
    """Delete the sprint whose ``sprintID`` equals ``sprint_id``.

    Both identifiers are passed through ``val_num`` first; ``user_id`` is
    validated but is not used to select the row.

    The row is fetched with ``Query.one()`` (which raises unless exactly one
    row matches), deleted through the session, and the change is committed.

    Returns a dict with a confirmation ``'message'``.
    """
    log("User deleted sprint", "MEDIUM", "PASS")
    val_num(sprint_id)
    val_num(user_id)
    matches_sprint = project_sprints.sprintID == sprint_id
    doomed_sprint = project_sprints.query.filter(matches_sprint).one()
    db.session.delete(doomed_sprint)
    db.session.commit()
    return {'message': 'Sprint successfully deleted'}
||
56 | |||
57 | |||
def update_sprint(sprint_id, user_id, data):
    """Overwrite the name and description of an existing sprint.

    ``sprint_id`` and ``user_id`` are passed through ``val_num``; ``user_id``
    is not used to select the row. The sprint is fetched with
    ``Query.one()`` before the payload is checked, so a missing sprint
    raises before any payload validation runs.

    ``data`` must offer ``.get``; its ``'name'`` and ``'description'`` values
    are passed through ``val_alpha_num_special`` and then stored in
    ``sprintName`` / ``sprintDesc``. The change is committed.

    Returns a dict with a confirmation ``'message'``.
    """
    log("User updated sprint", "MEDIUM", "PASS")
    val_num(sprint_id)
    val_num(user_id)
    matches_sprint = project_sprints.sprintID == sprint_id
    sprint = project_sprints.query.filter(matches_sprint).one()

    new_name = data.get('name')
    val_alpha_num_special(new_name)
    new_description = data.get('description')
    val_alpha_num_special(new_description)

    sprint.sprintName = new_name
    sprint.sprintDesc = new_description
    db.session.add(sprint)
    db.session.commit()
    return {'message': 'Sprint successfully updated'}
||
70 | |||
71 | |||
def new_sprint(user_id, data):
    """Create a sprint in the group of ``user_id`` and return its ID.

    ``data`` must offer ``.get``; its ``'name'`` and ``'description'`` values
    are passed through ``val_alpha_num_special`` and ``'projectID'`` through
    ``val_num``. The group is taken from the single ``groupmembers`` row of
    the user (``Query.one()`` raises unless exactly one row matches).

    Returns a dict with the new ``'sprintID'`` and a confirmation
    ``'message'``.
    """
    log("User created new sprint", "MEDIUM", "PASS")
    sprint_name = data.get('name')
    sprint_desc = data.get('description')
    project_id = data.get('projectID')
    val_alpha_num_special(sprint_name)
    val_alpha_num_special(sprint_desc)
    val_num(project_id)

    groupmember = groupmembers.query.filter(groupmembers.userID == user_id).one()
    sprint = project_sprints(sprint_name, sprint_desc, groupmember.groupID, project_id)
    db.session.add(sprint)
    db.session.commit()
    # Read the key from the instance that was just inserted. Looking up
    # "the newest sprint of the group" instead could return a sprint created
    # concurrently by another request in the same group.
    return {'sprintID': sprint.sprintID, 'message': 'Sprint successfully created'}
||
87 | |||
88 | |||
# Result key -> checklist result status value counted under that key.
_SPRINT_STATUS_KEYS = (
    ('sprint_open', 1),
    ('sprint_closed', 2),
    ('sprint_accepted', 3),
    ('sprint_sec_ack', 4),
    ('sprint_sec_fail', 5),
)


def _count_sprint_items(sprint_id, status):
    """Count the sprint's checklist result rows with ``status``, grouped by checklistID."""
    return (
        checklists_results.query
        .filter(checklists_results.sprintID == sprint_id)
        .filter(checklists_results.status == status)
        .group_by(checklists_results.checklistID)
        .count()
    )


def stats_sprint(project_id):
    """Return per-sprint checklist statistics for one project.

    ``project_id`` is passed through ``val_num``. For every sprint of the
    project one dict is produced containing ``sprint_id``, ``sprint_desc``,
    ``sprint_name``, one counter per status (``sprint_open``,
    ``sprint_closed``, ``sprint_accepted``, ``sprint_sec_ack``,
    ``sprint_sec_fail``) and ``sprint_items_total``, the sum of those five
    counters.

    Returns the list of these dicts (empty when the project has no sprints).
    """
    log("User requested specific project sprint stats", "MEDIUM", "PASS")
    val_num(project_id)
    sprint_info = project_sprints.query.filter(project_sprints.projectID == project_id).all()

    sprint = []
    for result in sprint_info:
        entry = {
            'sprint_id': result.sprintID,
            'sprint_desc': result.sprintDesc,
            'sprint_name': result.sprintName,
        }
        counts = {
            key: _count_sprint_items(result.sprintID, status)
            for key, status in _SPRINT_STATUS_KEYS
        }
        entry.update(counts)
        entry['sprint_items_total'] = sum(counts.values())
        sprint.append(entry)
    return sprint
||
106 | |||
107 | |||
def _sprint_result_sort_key(item):
    """Build the (status bucket, category, requirement) sort key of one result."""
    numbers = item.checklistID.split('.')
    # Statuses 1-4 keep their own bucket; every other value shares the last
    # bucket together with status 5.
    bucket = item.status if item.status in (1, 2, 3, 4) else 5
    return bucket, int(numbers[0]), int(numbers[1])


def order_sprint_results(sprint_results):
    """Sort ``sprint_results.items`` by status bucket, then by checklist number.

    ``sprint_results`` is any object with an ``items`` iterable whose
    elements expose ``status`` and a dotted ``checklistID`` such as
    ``"2.10"``. Items are grouped in the order status 1, 2, 3, 4 and then
    everything else; inside a group they are ordered numerically by the
    first two dot-separated components of ``checklistID`` (so ``"2.9"``
    precedes ``"2.10"``). Items with equal keys keep their incoming order,
    because ``sorted`` is stable.

    ``items`` is replaced by a new list and the same ``sprint_results``
    object is returned.

    Raises ``ValueError``/``IndexError`` when a ``checklistID`` does not
    start with two integer components.
    """
    sprint_results.items = sorted(sprint_results.items, key=_sprint_result_sort_key)
    return sprint_results
||
130 | |||
131 | |||
def insert_in_order(category, category_requirement, item, status_list):
    """Insert ``item`` into ``status_list``, keeping it sorted by checklist number.

    ``status_list`` holds objects with a dotted ``checklistID``; the first
    two dot-separated components of each are read as integers. ``item`` is
    placed in front of the first element whose (category, requirement) pair
    is greater than ``(category, category_requirement)``, or appended when
    there is no such element - so it lands after elements with an equal pair.

    The list is modified in place and also returned.
    """
    new_key = (category, category_requirement)
    for position, existing in enumerate(status_list):
        numbers_ordered = existing.checklistID.split('.')
        existing_key = (int(numbers_ordered[0]), int(numbers_ordered[1]))
        if new_key < existing_key:
            status_list.insert(position, item)
            break
    else:
        # No larger element found (this also covers the empty list).
        status_list.append(item)
    return status_list
||
153 | |||
154 | |||
def _flatten_export_text(text):
    """Put ``text`` on one line, trim surrounding spaces, collapse double spaces."""
    return text.replace('\n', ' ').strip(' ').replace('  ', ' ')


def export_failed_results(sprint_results):
    """Build a base64-encoded CSV-like export of the given sprint results.

    ``sprint_results`` is an object with an ``items`` iterable; each item
    provides ``checklistID``, ``kbID``, ``sprintID`` and ``status``. For
    every item the matching checklist, knowledge-base item and most recent
    comment (highest ``comments.id`` for the same sprint, checklist and
    status) are looked up and one quoted line is emitted with the columns
    ``date,title,description,mitigation,notes``.

    The description is the knowledge-base text between ``" Description:"``
    and ``" Solution:"``, the mitigation is the text after ``" Solution:"``;
    a missing marker yields an empty column. When no comment exists, the
    date and notes columns are left empty.

    The export is assembled in memory, so no ``export_*`` file is written to
    the working directory.

    Returns the UTF-8 encoded export as base64 ``bytes``.
    """
    import io  # function-scope import: only this export needs it

    export = io.StringIO()
    export.write('date,title,description,mitigation,notes\n')

    for item in sprint_results.items:
        checklist = checklists.query.filter(checklists.checklistID == item.checklistID).first()
        kb_item = kb_items.query.filter(kb_items.kbID == item.kbID).first()
        comment = (
            comments.query
            .filter(comments.sprintID == item.sprintID)
            .filter(comments.checklistID == item.checklistID)
            .filter(comments.status == item.status)
            .order_by(desc(comments.id))
            .first()
        )

        # Commas are backslash-escaped in addition to the quoting; this is
        # the format the export has always produced, so it is kept as is.
        # NOTE(review): embedded double quotes are not escaped - confirm
        # whether consumers need that.
        title = _flatten_export_text(checklist.content.replace(',', '\\,'))
        kb_text = kb_item.content.replace(',', '\\,')
        before_solution, _, solution = kb_text.partition(" Solution:")
        _, _, description_text = before_solution.partition(" Description:")
        description = _flatten_export_text(description_text)
        mitigation = _flatten_export_text(solution)

        if comment is None:
            comment_date = ''
            notes = ''
        else:
            comment_date = comment.date
            notes = _flatten_export_text(comment.comment.replace(',', '\\,'))

        columns = [comment_date, title, description, mitigation, notes]
        export.write('"' + '","'.join(columns) + '"\n')

    return base64.b64encode(export.getvalue().encode('utf-8'))
||
175 | |||
176 | |||
def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
    """Return a string of ``size`` characters drawn from ``chars``.

    Each character is picked independently with ``random.choice``; by
    default the alphabet is the ASCII uppercase letters plus the digits.
    A ``size`` of 0 yields the empty string.
    """
    picked = []
    for _ in range(size):
        picked.append(random.choice(chars))
    return ''.join(picked)