Completed
Push — master ( ce6edf...b9438d )
by Gonzalo
10s
created

GitHubRepo.__init__()   B

Complexity

Conditions 5

Size

Total Lines 35

Duplication

Lines 0
Ratio 0 %

Importance

Changes 6
Bugs 0 Features 0
Metric Value
cc 5
c 6
b 0
f 0
dl 0
loc 35
rs 8.0894
1
# -*- coding: utf-8 -*-
2
# -----------------------------------------------------------------------------
3
# Copyright (c) The Spyder Development Team
4
#
5
# Licensed under the terms of the MIT License
6
# (See LICENSE.txt for details)
7
# -----------------------------------------------------------------------------
8
"""Github repo wrapper."""
9
10
from __future__ import print_function
11
12
# Standard library imports
13
import datetime
14
import sys
15
16
# Local imports
17
from loghub.external.github import ApiError, ApiNotFoundError, GitHub
18
19
20
class GitHubRepo(object):
21
    """Github repository wrapper."""
22
23
    def __init__(self, username=None, password=None, token=None, repo=None):
24
        """Github repository wrapper."""
25
        self._username = username
26
        self._password = password
27
        self._token = token
28
29
        self.gh = GitHub(
30
            username=username,
31
            password=password,
32
            access_token=token, )
33
        repo_organization, repo_name = repo.split('/')
34
        self.repo = self.gh.repos(repo_organization)(repo_name)
35
36
        # Check organization/username
37
        try:
38
            self.gh.users(repo_organization).get()
39
        except ApiNotFoundError as error:
40
            print('LOGHUB: Organization/user `{}` seems to be '
41
                  'invalid.\n'.format(repo_organization))
42
            sys.exit(1)
43
        except ApiError as error:
44
            self._check_rate()
45
            print('LOGHUB: The credentials seems to be invalid!\n')
46
            sys.exit(1)
47
48
        # Check repo
49
        try:
50
            self.repo.get()
51
        except ApiNotFoundError as error:
52
            print('LOGHUB: Repository `{0}` for organization/username `{1}` '
53
                  'seems to be invalid.\n'.format(repo_name,
54
                                                  repo_organization))
55
            sys.exit(1)
56
        except ApiError as error:
57
            self._check_rate()
58
59
    def _check_rate(self):
60
        """Check and handle if api rate limit has been exceeded."""
61
        if self.gh.x_ratelimit_remaining == 0:
62
            print('LOGHUB: GitHub API rate limit exceeded!\n')
63
            if not self._username and not self._password or not self._token:
64
                print('LOGHUB: Try running loghub with user/password or '
65
                      'a valid token.\n')
66
            sys.exit(1)
67
68
    def tags(self):
69
        """Return all tags."""
70
        self._check_rate()
71
        return self.repo('git')('refs')('tags').get()
72
73
    def tag(self, tag_name):
74
        """Get tag information."""
75
        self._check_rate()
76
        refs = self.repo('git')('refs')('tags').get()
77
        sha = -1
78
79
        tags = []
80
        for ref in refs:
81
            ref_name = 'refs/tags/{tag}'.format(tag=tag_name)
82
            if 'object' in ref and ref['ref'] == ref_name:
83
                sha = ref['object']['sha']
84
            tags.append(ref['ref'].split('/')[-1])
85
86
        if sha == -1:
87
            print("LOGHUB: You didn't pass a valid tag name!")
88
            print('LOGHUB: The available tags are: {0}\n'.format(tags))
89
            sys.exit(1)
90
91
        return self.repo('git')('tags')(sha).get()
92
93
    def milestones(self):
94
        """Return all milestones."""
95
        self._check_rate()
96
        return self.repo.milestones.get(state='all')
97
98
    def milestone(self, milestone_title):
99
        """Return milestone with given title."""
100
        self._check_rate()
101
        milestones = self.milestones()
102
        milestone_number = -1
103
104
        milestone_titles = []
105
        for milestone in milestones:
106
            if milestone['title'] == milestone_title:
107
                milestone_number = milestone['number']
108
            milestone_titles.append(milestone['title'])
109
110
        if milestone_number == -1:
111
            print("LOGHUB: You didn't pass a valid milestone name!")
112
            print('LOGHUB: The available milestones are: {0}\n'
113
                  ''.format(milestone_titles))
114
            sys.exit(1)
115
116
        return milestone
117
118
    def pr(self, pr_number):
119
        """Get PR information."""
120
        self._check_rate()
121
        return self.repo('pulls')(str(pr_number)).get()
122
123
    def issues(self,
124
               milestone=None,
125
               state=None,
126
               assignee=None,
127
               creator=None,
128
               mentioned=None,
129
               labels=None,
130
               sort=None,
131
               direction=None,
132
               since=None,
133
               until=None,
134
               branch=None):
135
        """Return Issues and Pull Requests."""
136
        self._check_rate()
137
        page = 1
138
        issues = []
139
        while True:
140
            result = self.repo.issues.get(page=page,
141
                                          per_page=100,
142
                                          milestone=milestone,
143
                                          state=state,
144
                                          assignee=assignee,
145
                                          creator=creator,
146
                                          mentioned=mentioned,
147
                                          labels=labels,
148
                                          sort=sort,
149
                                          direction=direction,
150
                                          since=since)
151
            if len(result) > 0:
152
                issues += result
153
                page = page + 1
154
            else:
155
                break
156
157
        # If since was provided, filter the issue
158
        if since:
159
            since_date = self.str_to_date(since)
160
            for issue in issues[:]:
161
                close_date = self.str_to_date(issue['closed_at'])
162
                if close_date < since_date and issue in issues:
163
                    issues.remove(issue)
164
165
        # If until was provided, filter the issue
166
        if until:
167
            until_date = self.str_to_date(until)
168
            for issue in issues[:]:
169
                close_date = self.str_to_date(issue['closed_at'])
170
                if close_date > until_date and issue in issues:
171
                    issues.remove(issue)
172
173
        # If it is a pr check if it is merged or closed, removed closed ones
174
        for issue in issues[:]:
175
            pr = issue.get('pull_request', '')
176
177
            # Add label names inside additional key
178
            issue['loghub_label_names'] = [
179
                l['name'] for l in issue.get('labels')
180
            ]
181
182
            if pr:
183
                number = issue['number']
184
                if not self.is_merged(number) and issue in issues:
185
                    issues.remove(issue)
186
187
                if branch:
188
                    # Get PR info and get base branch
189
                    pr_data = self.pr(number)
190
                    base_ref = pr_data['base']['ref']
191
                    if base_ref != branch and issue in issues:
192
                        issues.remove(issue)
193
194
        return issues
195
196
    def is_merged(self, pr):
197
        """
198
        Return wether a PR was merged, or if it was closed and discarded.
199
200
        https://developer.github.com/v3/pulls/#get-if-a-pull-request-has-been-merged
201
        """
202
        self._check_rate()
203
        merged = True
204
        try:
205
            self.repo('pulls')(str(pr))('merge').get()
206
        except Exception:
207
            merged = False
208
        return merged
209
210
    @staticmethod
211
    def str_to_date(string):
212
        """Convert ISO date string to datetime object."""
213
        parts = string.split('T')
214
        date_parts = parts[0]
215
        time_parts = parts[1][:-1]
216
        year, month, day = [int(i) for i in date_parts.split('-')]
217
        hour, minutes, seconds = [int(i) for i in time_parts.split(':')]
218
        return datetime.datetime(year, month, day, hour, minutes, seconds)
219