Completed
Push — master ( b4caba...44c104 )
by Gonzalo
9s
created

GitHubRepo   F

Complexity

Total Complexity 65

Size/Duplication

Total Lines 286
Duplicated Lines 0 %

Importance

Changes 24
Bugs 0 Features 1
Metric Value
c 24
b 0
f 1
dl 0
loc 286
rs 3.3333
wmc 65

19 Methods

Rating   Name   Duplication   Size   Complexity  
A _check_user() 0 12 3
A _check_repo_name() 0 11 3
B _check_rate() 0 12 5
A __init__() 0 18 1
B _filter_by_branch() 0 16 6
A milestones() 0 4 1
A set_labels() 0 18 4
A pr() 0 4 1
A _filer_closed_prs() 0 14 4
B _filter_until() 0 9 5
B milestone() 0 19 5
A tags() 0 4 1
B _filter_since() 0 9 5
B tag() 0 19 5
A labels() 0 4 1
B _filter_milestone() 0 13 5
A str_to_date() 0 9 3
A is_merged() 0 13 2
B issues() 0 57 5

How to fix   Complexity   

Complex Class

Complex classes like GitHubRepo often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to finding such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
# -----------------------------------------------------------------------------
3
# Copyright (c) The Spyder Development Team
4
#
5
# Licensed under the terms of the MIT License
6
# (See LICENSE.txt for details)
7
# -----------------------------------------------------------------------------
8
"""Github repo wrapper."""
9
10
from __future__ import print_function
11
12
# Standard library imports
13
import datetime
14
import sys
15
import time
16
17
# Local imports
18
from loghub.external.github import ApiError, ApiNotFoundError, GitHub
19
20
21
class GitHubRepo(object):
22
    """Github repository wrapper."""
23
24
    def __init__(self, username=None, password=None, token=None, repo=None):
        """
        Create the API client and bind it to one repository.

        `repo` is split on '/' into exactly two parts: the owning
        organization/user and the repository name. Both are checked right
        away; those checks can terminate the process.
        """
        # Credentials are kept so `_check_rate` can decide which hint to show.
        self._token = token
        self._password = password
        self._username = username

        self.gh = GitHub(username=username, password=password,
                         access_token=token)

        # Anything other than exactly two parts fails to unpack here.
        self._repo_organization, self._repo_name = repo.split('/')
        self.repo = self.gh.repos(self._repo_organization)(self._repo_name)

        # Validate the owner first, then the repository itself.
        self._check_user()
        self._check_repo_name()
42
43
    def _check_user(self):
44
        """Check if the supplied username is valid."""
45
        try:
46
            self.gh.users(self._repo_organization).get()
47
        except ApiNotFoundError:
48
            print('LOGHUB: Organization/user `{}` seems to be '
49
                  'invalid.\n'.format(self._repo_organization))
50
            sys.exit(1)
51
        except ApiError:
52
            self._check_rate()
53
            print('LOGHUB: The credentials seems to be invalid!\n')
54
            sys.exit(1)
55
56
    def _check_repo_name(self):
57
        """Check if the supplied repository exists."""
58
        try:
59
            self.repo.get()
60
        except ApiNotFoundError:
61
            print('LOGHUB: Repository `{0}` for organization/username `{1}` '
62
                  'seems to be invalid.\n'.format(self._repo_name,
63
                                                  self._repo_organization))
64
            sys.exit(1)
65
        except ApiError:
66
            self._check_rate()
67
68
    def _check_rate(self):
69
        """Check and handle if api rate limit has been exceeded."""
70
        if self.gh.x_ratelimit_remaining == 0:
71
            reset_struct = time.gmtime(self.gh.x_ratelimit_reset)
72
            reset_format = time.strftime('%Y/%m/%d %H:%M', reset_struct)
73
            print('LOGHUB: GitHub API rate limit exceeded!')
74
            print('LOGHUB: GitHub API rate limit resets on '
75
                  '{}'.format(reset_format))
76
            if not self._username and not self._password or not self._token:
77
                print('LOGHUB: Try running loghub with user/password or '
78
                      'a valid token.\n')
79
            sys.exit(1)
80
81
    def _filter_milestone(self, issues, milestone):
82
        """Filter out all issues in milestone."""
83
        if milestone:
84
            for issue in issues[:]:
85
                milestone_data = issue.get('milestone', {})
86
                if milestone_data:
87
                    issue_milestone_title = milestone_data.get('title')
88
                else:
89
                    issue_milestone_title = ''
90
91
                if issue_milestone_title != milestone:
92
                    issues.remove(issue)
93
        return issues
94
95
    def _filter_since(self, issues, since):
96
        """Filter out all issues before `since` date."""
97
        if since:
98
            since_date = self.str_to_date(since)
99
            for issue in issues[:]:
100
                close_date = self.str_to_date(issue['closed_at'])
101
                if close_date < since_date and issue in issues:
102
                    issues.remove(issue)
103
        return issues
104
105
    def _filter_until(self, issues, until):
106
        """Filter out all issues after `until` date."""
107
        if until:
108
            until_date = self.str_to_date(until)
109
            for issue in issues[:]:
110
                close_date = self.str_to_date(issue['closed_at'])
111
                if close_date > until_date and issue in issues:
112
                    issues.remove(issue)
113
        return issues
114
115
    def _filter_by_branch(self, issues, issue, branch):
116
        """Filter prs by the branch they were merged into."""
117
        number = issue['number']
118
119
        if not self.is_merged(number) and issue in issues:
120
            issues.remove(issue)
121
122
        if branch:
123
            # Get PR info and get base branch
124
            pr_data = self.pr(number)
125
            base_ref = pr_data['base']['ref']
126
127
            if base_ref != branch and issue in issues:
128
                issues.remove(issue)
129
130
        return issues
131
132
    def _filer_closed_prs(self, issues, branch):
133
        """Filter out closed PRs."""
134
        for issue in issues[:]:
135
            pr = issue.get('pull_request', '')
136
137
            # Add label names inside additional key
138
            issue['loghub_label_names'] = [
139
                l['name'] for l in issue.get('labels')
140
            ]
141
142
            if pr:
143
                issues = self._filter_by_branch(issues, issue, branch)
144
145
        return issues
146
147
    def tags(self):
148
        """Return all tags."""
149
        self._check_rate()
150
        return self.repo('git')('refs')('tags').get()
151
152
    def tag(self, tag_name):
153
        """Get tag information."""
154
        self._check_rate()
155
        refs = self.repo('git')('refs')('tags').get()
156
        sha = -1
157
158
        tags = []
159
        for ref in refs:
160
            ref_name = 'refs/tags/{tag}'.format(tag=tag_name)
161
            if 'object' in ref and ref['ref'] == ref_name:
162
                sha = ref['object']['sha']
163
            tags.append(ref['ref'].split('/')[-1])
164
165
        if sha == -1:
166
            print("LOGHUB: You didn't pass a valid tag name!")
167
            print('LOGHUB: The available tags are: {0}\n'.format(tags))
168
            sys.exit(1)
169
170
        return self.repo('git')('tags')(sha).get()
171
172
    def labels(self):
173
        """Return labels for the repo."""
174
        self._check_rate()
175
        return self.repo.labels.get()
176
177
    def set_labels(self, labels):
178
        """Return labels for the repo."""
179
        self._check_rate()
180
        for label in labels:
181
            new_name = label['new_name']
182
            old_name = label['old_name']
183
            color = label['color']
184
            try:
185
                self.repo.labels(old_name).patch(name=new_name, color=color)
186
                print('Updated label: "{0}" -> "{1}" (#{2})'.format(
187
                    old_name, new_name, color))
188
            except ApiError:
189
                try:
190
                    self.repo.labels.post(name=new_name, color=color)
191
                    print('Created label: "{0}" (#{1})'.format(new_name,
192
                                                               color))
193
                except ApiError:
194
                    print('\nLabel "{0}" already exists!'.format(new_name))
195
196
    def milestones(self):
197
        """Return all milestones."""
198
        self._check_rate()
199
        return self.repo.milestones.get(state='all')
200
201
    def milestone(self, milestone_title):
202
        """Return milestone with given title."""
203
        self._check_rate()
204
        milestones = self.milestones()
205
        milestone_number = -1
206
207
        milestone_titles = [milestone['title'] for milestone in milestones]
208
        for milestone in milestones:
209
            if milestone['title'] == milestone_title:
210
                milestone_number = milestone['number']
211
                break
212
213
        if milestone_number == -1:
214
            print("LOGHUB: You didn't pass a valid milestone name!")
215
            print('LOGHUB: The available milestones are: {0}\n'
216
                  ''.format(milestone_titles))
217
            sys.exit(1)
218
219
        return milestone
220
221
    def pr(self, pr_number):
222
        """Get PR information."""
223
        self._check_rate()
224
        return self.repo('pulls')(str(pr_number)).get()
225
226
    def issues(self,
227
               milestone=None,
228
               state=None,
229
               assignee=None,
230
               creator=None,
231
               mentioned=None,
232
               labels=None,
233
               sort=None,
234
               direction=None,
235
               since=None,
236
               until=None,
237
               branch=None,
238
               cache=False,
239
               base_issues=None):
240
        """Return Issues and Pull Requests."""
241
        self._check_rate()
242
        page = 1
243
244
        if not base_issues:
245
            milestone_number = None
246
            if milestone:
247
                milestone_data = self.milestone(milestone)
248
                milestone_number = milestone_data.get('number')
249
            issues = []
250
            while True:
251
                result = self.repo.issues.get(page=page,
252
                                              per_page=100,
253
                                              milestone=milestone_number,
254
                                              state=state,
255
                                              assignee=assignee,
256
                                              creator=creator,
257
                                              mentioned=mentioned,
258
                                              labels=labels,
259
                                              sort=sort,
260
                                              direction=direction,
261
                                              since=since)
262
                if len(result) > 0:
263
                    issues += result
264
                    page = page + 1
265
                else:
266
                    break
267
        else:
268
            issues = base_issues
269
270
        # If since was provided, filter the issue
271
        issues = self._filter_since(issues, since)
272
273
        # If until was provided, filter the issue
274
        issues = self._filter_until(issues, until)
275
276
        # If milestone was provided, filter the issue
277
        issues = self._filter_milestone(issues, milestone)
278
279
        # If it is a pr check if it is merged or closed, removed closed ones
280
        issues = self._filer_closed_prs(issues, branch)
281
282
        return issues
283
284
    def is_merged(self, pr):
285
        """
286
        Return wether a PR was merged, or if it was closed and discarded.
287
288
        https://developer.github.com/v3/pulls/#get-if-a-pull-request-has-been-merged
289
        """
290
        self._check_rate()
291
        merged = True
292
        try:
293
            self.repo('pulls')(str(pr))('merge').get()
294
        except Exception:
295
            merged = False
296
        return merged
297
298
    @staticmethod
299
    def str_to_date(string):
300
        """Convert ISO date string to datetime object."""
301
        parts = string.split('T')
302
        date_parts = parts[0]
303
        time_parts = parts[1][:-1]
304
        year, month, day = [int(i) for i in date_parts.split('-')]
305
        hour, minutes, seconds = [int(i) for i in time_parts.split(':')]
306
        return datetime.datetime(year, month, day, hour, minutes, seconds)
307