Passed
Push — master ( 25b1c2...bdc07c )
by Jan
02:25 queued 11s
created

content_gh.upload_files()   A

Complexity

Conditions 3

Size

Total Lines 8
Code Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 3
eloc 7
nop 2
dl 0
loc 8
rs 10
c 0
b 0
f 0
1
#!/usr/bin/env python3
2
3
import argparse
4
import collections
5
import github
6
import re
7
import sys
8
9
# Path of the release-notes draft: generate_release_notes() writes it and
# create_release() reads it back as the release message.
rn_file = "release_notes.txt"
10
11
12
def version_type(string):
    """Validate a version string; intended as an argparse ``type=`` callable.

    A valid value has exactly three non-empty components separated by dots
    (X.Y.Z). The components are not required to be numeric.

    Returns:
        The input string, unchanged.

    Raises:
        argparse.ArgumentTypeError: if the string is not of the form X.Y.Z.
    """
    components = string.split(".")
    # An empty string is falsy, so all() rejects forms such as "1..3" or ".1.2".
    if len(components) != 3 or not all(components):
        msg = (
            "Expected version number of form X.Y.Z, where X, Y, Z are strings. "
            f"Got: '{string}'"
        )
        raise argparse.ArgumentTypeError(msg)
    return string
22
23
24
# Release-notes sections mapped to their priority. While classifying a PR, a
# section only replaces the current one if its number is higher; "nosection"
# (0) is the starting value and is never written to the notes. The key order
# is also the order in which the sections are written out.
CHANGELOG = {
    "profiles": 1,
    "rules": 3,
    "tests": 2,
    "nosection": 0,
}
28
29
30
def get_repo(gh, owner, repo_name):
    """Fetch the repository ``<owner>/<repo_name>`` through the client *gh*.

    Returns:
        The repository object returned by ``gh.get_repo``.

    If the repository is unknown, a message is printed and the script exits
    with a non-zero status so that callers can detect the failure.
    """
    try:
        repo = gh.get_repo(f"{owner}/{repo_name}")
        # By accessing a repo attribute, PyGithub will actually query for the
        # repo. This is a plain attribute read rather than an ``assert`` so it
        # is not stripped when Python runs with -O.
        _ = repo.name
        return repo
    except github.UnknownObjectException:
        print(f"Repo '{owner}/{repo_name}' not found")
        sys.exit(1)
39
40
41
def get_milestone(repo, name):
    """Return the milestone of *repo* whose title equals *name*.

    Milestones in every state are searched (``state="all"``).

    Returns:
        The matching milestone, or None when no milestone has that title.

    Raises:
        AssertionError: if more than one milestone carries the title.
    """
    matches = [m for m in repo.get_milestones(state="all") if m.title == name]
    assert len(matches) <= 1, \
        f"Expected to find at most one milestone {name}, found {len(matches)}"
    return matches[0] if matches else None
50
51
52
def create_new_milestone(repo, name):
    """Create an open milestone titled *name*, reusing an existing one.

    Returns:
        The newly created milestone, or the result of ``get_milestone`` when
        the API reports that the milestone already exists.

    Any other failure is printed and the script exits with status 1.
    """
    print(f"Creating milestone {name}")
    try:
        return repo.create_milestone(name, "open")
    except github.GithubException as e:
        # The error payload is not guaranteed to be a dict with an "errors"
        # list, so read it defensively instead of indexing blindly.
        errors = e.data.get("errors", []) if isinstance(e.data, dict) else []
        already_exists = any(
            "already_exists" in str(error.get("code", ""))
            for error in errors
            if isinstance(error, dict)
        )
        if already_exists:
            print(f"Reusing already existing milestone {name}")
            return get_milestone(repo, name)
        print(f"Failed to create milestone {name}: {errors or e.data}")
        sys.exit(1)
64
65
66
def transfer_open_issues_and_prs_to_new_milestone(repo, old_milestone, new_milestone):
    """Reassign every open issue of *old_milestone* to *new_milestone*.

    The items come from ``repo.get_issues(..., state="open")`` and each one
    is edited individually.
    """
    old_milestone_issues = repo.get_issues(milestone=old_milestone, state="open")

    print(f"Moving issues to milestone {new_milestone.title}")
    for issue in old_milestone_issues:
        issue.edit(milestone=new_milestone)
75
76
77
def close_milestone(milestone):
    """Set *milestone* to the closed state, keeping its current title."""
    title = milestone.title
    print(f"Closing milestone {title}")
    # edit() is given the unchanged title together with the new state.
    milestone.edit(title, state="closed")
80
81
82
def get_closed_prs(repo, milestone):
    """Return the merged pull requests among the closed issues of *milestone*.

    Closed issues are requested sorted by "updated". Issues that are not pull
    requests, and pull requests that were closed without being merged, are
    left out.
    """
    closed_issues = repo.get_issues(milestone=milestone, state="closed", sort="updated")
    merged_prs = []
    for issue in closed_issues:
        if issue.pull_request is None:
            continue
        # Convert once and reuse the result for both the check and the list.
        pull_request = issue.as_pull_request()
        if pull_request.merged is True:
            merged_prs.append(pull_request)
    return merged_prs
88
89
90
def _section_for_pr(pr, product_profiles):
    """Choose the release-notes section for *pr* from the files it changed.

    As a side effect, every product profile touched by the PR is recorded in
    *product_profiles* (product name -> set of profile names).

    Returns:
        One of the CHANGELOG keys; "nosection" when no file is relevant.
    """
    section = "nosection"
    for changed_file in pr.get_files():
        changed_filename = changed_file.filename

        # do not include files from tests folder
        # they are part of testing mechanisms
        if changed_filename.endswith(".profile") and "tests" not in changed_filename:
            # Track changes to product:profile. search() (not match()) lets
            # the product directory sit below other directories, and a path
            # that does not fit the pattern is skipped instead of crashing.
            profile_match = re.search(r"(\w+)/profiles/([\w-]+)\.profile", changed_filename)
            if profile_match is not None:
                product, profile = profile_match.groups()
                product_profiles[product].add(profile)

            # A PR that changed .profile but not rule, check, remediation or tests,
            # go to Profiles section
            if CHANGELOG["profiles"] > CHANGELOG[section]:
                section = "profiles"

        if "rule.yml" in changed_filename:
            # A PR that changed any rule.yml file will be in Rules section
            # Often, changes to infrastructure are done together with changes content
            section = "rules"
        elif "tests/" in changed_filename:
            if CHANGELOG["tests"] > CHANGELOG[section]:
                section = "tests"
    return section


def generate_release_notes(repo, args):
    """Write a release-notes draft for milestone ``args.version`` to rn_file.

    The draft lists, in order: issues labelled "Highlight", the profiles
    changed per product, and one section per CHANGELOG key (except
    "nosection") with the merged PRs classified into it.

    Exits with status 1 when the milestone does not exist, rather than
    passing None on to the issue queries.
    """
    milestone = get_milestone(repo, args.version)
    if milestone is None:
        print(f"Milestone {args.version} not found")
        sys.exit(1)

    release_notes = collections.defaultdict(list)
    product_profiles = collections.defaultdict(set)
    for pr in get_closed_prs(repo, milestone):
        section = _section_for_pr(pr, product_profiles)
        if section != "nosection":
            release_notes[section].append(f"- {pr.title} (#{pr.number})\n")

    with open(rn_file, "w") as rn:
        rn.write("### Highlights:\n")
        hl_label = repo.get_label("Highlight")
        hl_prs = repo.get_issues(milestone=milestone, state="closed", labels=[hl_label])
        for pr in hl_prs:
            rn.write(f"- {pr.title} (#{pr.number})\n")

        rn.write("### Profiles changed in this release:\n")
        for product, profiles in product_profiles.items():
            # Sets have no stable order; sort so repeated runs agree.
            rn.write(f"- {product}: {', '.join(sorted(profiles))}\n")

        for section in CHANGELOG:
            if section == 'nosection':
                continue
            rn.write(f"### {section.capitalize()}:\n")
            for entry in release_notes[section]:
                rn.write(entry)
    print(f"Review release notes in '{rn_file}'")
150
151
152
def check_release_exists(repo, args):
    """Report whether a release tagged ``v<args.version>`` exists in *repo*.

    The release is looked up by its tag, so it is found even when it is not
    the most recent release.

    Returns:
        True if the release exists, False otherwise. The outcome is also
        printed in both cases.
    """
    tag = f"v{args.version}"
    try:
        repo.get_release(tag)
    except github.UnknownObjectException:
        print(f"Release {tag} doesn't exist, good to go")
        return False
    print(f"Release {tag} already exists")
    return True
162
163
164
def move_milestone(repo, args):
    """Roll the milestone ``args.version`` over to ``args.next_version``.

    Creates (or reuses) the next milestone, moves the open issues of the
    current milestone onto it, then closes the current milestone.

    Exits with status 1 if either milestone is unavailable, before anything
    is moved or closed.
    """
    milestone = get_milestone(repo, args.version)
    if milestone is None:
        print(f"Milestone {args.version} not found")
        sys.exit(1)

    next_milestone = create_new_milestone(repo, args.next_version)
    if next_milestone is None:
        print(f"Milestone {args.next_version} is not available")
        sys.exit(1)

    transfer_open_issues_and_prs_to_new_milestone(repo, milestone, next_milestone)
    close_milestone(milestone)
170
171
172
def upload_files(release, file_paths):
    """Upload each path in *file_paths* as an asset of *release*.

    A file that cannot be found is reported and skipped, so the remaining
    files are still uploaded.
    """
    for file_path in file_paths:
        try:
            asset = release.upload_asset(file_path)
        except FileNotFoundError:
            print(f"Unable to find asset {file_path}")
        else:
            print(f"Uploaded {asset.name} to Release {release.tag_name}")
180
181
182
def upload_assets(release, args):
    """Upload the files in ``args.files`` to the latest release.

    NOTE(review): despite its name, the first argument is used as the
    repository object. This matches the ``func(repo, args)`` call made by the
    script entry point and replaces a lookup of a module-level ``repo``.
    Confirm the intended signature before wiring this into a subcommand.
    """
    latest_release = release.get_latest_release()
    # upload_files needs the release as well as the paths.
    upload_files(latest_release, args.files)
185
186
187
def create_release(repo, args):
    """Create a draft release ``v<args.version>`` at commit ``args.commit``.

    The release message is the content of rn_file. After the draft is
    created, the three expected artifact archives are uploaded to it.
    """
    version = args.version
    commit = args.commit
    tag = f"v{version}"

    # Read the notes first so a missing file fails before anything is
    # created on the remote side.
    with open(rn_file, "r") as rn:
        message = rn.read()

    git_commit = repo.get_git_commit(commit)
    repo.create_git_tag(tag, tag, commit, "commit")

    # Create release in Draft state. Keyword arguments keep the call
    # independent of the order of the optional parameters.
    release = repo.create_git_release(tag, f"Content {version}", message,
                                      draft=True, prerelease=False,
                                      target_commitish=git_commit)
    assets = [f"artifacts/scap-security-guide-{version}.zip",
              f"artifacts/scap-security-guide-{version}-oval-510.zip",
              f"artifacts/scap-security-guide-{version}.tar.bz2"]

    upload_files(release, assets)
204
205
206
def create_parser():
    """Build the command-line interface and parse the process arguments.

    Returns:
        The parsed arguments (the result of ``parse_args()``, not the parser
        itself). ``func`` holds the handler of the chosen subcommand.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--owner", default="ComplianceAscode")
    parser.add_argument("--repo", default="content")
    parser.add_argument("auth_token")
    parser.add_argument("version", type=version_type)
    subparsers = parser.add_subparsers(dest="subparser_name",
                                       help="Subcommands: check, move_milestone, rn, release")
    # A subcommand is mandatory; each one registers its handler as ``func``.
    subparsers.required = True

    check_parser = subparsers.add_parser("check")
    check_parser.set_defaults(func=check_release_exists)

    milestone_parser = subparsers.add_parser("move_milestone")
    milestone_parser.set_defaults(func=move_milestone)
    milestone_parser.add_argument("next_version", type=version_type)

    rn_parser = subparsers.add_parser("rn")
    rn_parser.set_defaults(func=generate_release_notes)

    release_parser = subparsers.add_parser("release")
    release_parser.set_defaults(func=create_release)
    release_parser.add_argument("commit")

    return parser.parse_args()
231
232
233
if __name__ == "__main__":
    # create_parser() returns the parsed arguments, not the parser object.
    args = create_parser()

    gh = github.Github(args.auth_token)
    repo = get_repo(gh, args.owner, args.repo)

    # Run the handler registered by the selected subcommand.
    args.func(repo, args)
240