GitHub Access Token became invalid

It seems that the GitHub access token used to retrieve details about this repository from GitHub has become invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.

GitDiffTool._parse_lines()   D
last analyzed

Complexity

Conditions 9

Size

Total Lines 65

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 9
dl 0
loc 65
rs 4.5755
c 0
b 0
f 0

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
# -*- coding: utf-8 -*-
2
# -----------------------------------------------------------------------------
3
# Copyright (c) 2016 Continuum Analytics, Inc.
4
#
5
# Licensed under the terms of the MIT License
6
# (see LICENSE.txt for details)
7
# -----------------------------------------------------------------------------
8
"""Version control helpers. Find staged, committed, modified files/lines."""
9
10
# Standard library imports
11
import os
12
import re
13
14
# Local imports
15
from ciocheck.config import (COMMITED_MODE, DEFAULT_BRANCH, STAGED_MODE,
16
                             UNSTAGED_MODE)
17
from ciocheck.utils import get_files, make_sorted_dict, run_command
18
19
20
class DiffToolBase(object):
    """
    Base version control diff tool.

    Declares the interface shared by the diff tools. Every member raises
    `NotImplementedError` here and is expected to be overridden.
    """

    # --- Public API
    # -------------------------------------------------------------------------
    @property
    def top_level(self):
        """Return the top level for the repo."""
        raise NotImplementedError

    def is_repo(self):
        """Return if it is a repo of the type."""
        raise NotImplementedError

    def commited_files(self, branch=DEFAULT_BRANCH):
        """Return list of commited files (compared against `branch`)."""
        raise NotImplementedError

    def staged_files(self):
        """Return list of staged files."""
        raise NotImplementedError

    def unstaged_files(self):
        """Return list of unstaged files."""
        raise NotImplementedError

    def commited_file_lines(self, branch=DEFAULT_BRANCH):
        """Return commited files and lines modified (against `branch`)."""
        raise NotImplementedError

    def staged_file_lines(self):
        """Return staged files and lines modified."""
        raise NotImplementedError

    def unstaged_file_lines(self):
        """Return unstaged files and lines modified."""
        raise NotImplementedError
57
58
59
class HgDiffTool(DiffToolBase):
    """
    Mercurial diff tool.

    This is a stub: `is_repo` always answers False and every query returns
    an empty result, so no path is ever reported as a mercurial repo.
    """

    def __init__(self, path):
        """Store the `path` to inspect; no mercurial command is run."""
        self.path = path
        self._top_level = None

    @property
    def top_level(self):
        """Return the top level for the repo (always an empty string)."""
        return ''

    def is_repo(self):
        """Return if it is a repo of the type (always False)."""
        return False

    def commited_files(self, branch=DEFAULT_BRANCH):
        """Return list of commited files (always empty)."""
        return []

    def staged_files(self):
        """Return list of staged files (always empty)."""
        return []

    def unstaged_files(self):
        """Return list of unstaged files (always empty)."""
        return []

    def commited_file_lines(self, branch=DEFAULT_BRANCH):
        """Return commited files and lines modified (always empty)."""
        return {}

    def staged_file_lines(self):
        """Return staged files and lines modified (always empty)."""
        return {}

    def unstaged_file_lines(self):
        """Return unstaged files and lines modified (always empty)."""
        return {}
99
100
101
class GitDiffTool(DiffToolBase):
    """Thin wrapper for a subset of the `git diff` command."""

    # Regular expressions used to parse the diff output
    SRC_FILE_RE = re.compile(r'^diff --git "?a/.*"? "?b/([^ \n"]*)"?')
    MERGE_CONFLICT_RE = re.compile(r'^diff --cc ([^ \n]*)')
    HUNK_LINE_RE = re.compile(r'\+([0-9]*)')
    # Start line of a hunk in the old version of the file (the `-k` part)
    HUNK_OLD_LINE_RE = re.compile(r'-([0-9]+)')

    # First character of the "\ No newline at end of file" marker line
    NO_NEWLINE_PREFIX = '\\'

    def __init__(self, path):
        """Store the `path` to inspect; git is only queried lazily."""
        self.path = path
        self._top_level = None  # Cached result of `top_level`
        self._is_repo = None  # Cached result of `is_repo`

    def _git_run_helper(self,
                        branch=DEFAULT_BRANCH,
                        files_only=False,
                        mode=None):
        """
        Build and run the git diff command for the different types of diffs.

        `mode` selects the diff: `COMMITED_MODE` compares `branch...HEAD`,
        `STAGED_MODE` adds `--cached`, any other value adds nothing.
        `branch` is only used in `COMMITED_MODE`.

        If `files_only` is True a sorted list of paths (joined with the
        repo top level and restricted to those starting with `self.path`)
        is returned, otherwise the raw output of the command is returned.
        """
        command = [
            'git',
            '-c',
            'diff.mnemonicprefix=no',
            'diff',
        ]

        if mode == COMMITED_MODE:
            command.append("{branch}...HEAD".format(branch=branch))
        elif mode == STAGED_MODE:
            command.append('--cached')

        command += [
            '--no-color',
            '--no-ext-diff',
            '--diff-filter=AM',  # Means "added" and "modified"
        ]

        if not files_only:
            # The error stream is not reported for full diffs
            result, _ = run_command(command, cwd=self.path)
            return result

        command += [
            '--name-only',
            '-z',  # Means nul-separated names
        ]

        output, error = run_command(command, cwd=self.path)
        if error:
            # Only report real errors instead of printing an empty value
            print(error)

        names = set(output.split('\x00'))
        names.discard('')  # There's an empty line in git output
        paths = [os.path.join(self.top_level, i) for i in sorted(names)]
        return [i for i in paths if i.startswith(self.path)]

    def _parse_diff_str(self, diff_str):
        """
        Parse the output of `git diff` into a dictionary.

        Dictionary in the form:
            { SRC_PATH: (ADDED_LINES, DELETED_LINES) }
        where `ADDED_LINES` and `DELETED_LINES` are lists of line numbers
        added/deleted respectively. `SRC_PATH` is joined with the repo top
        level and the result is passed through `make_sorted_dict`.
        """
        diff_dict = dict()

        # Parse the diff string into sections by source file
        sections_dict = self._parse_source_sections(diff_str)
        for (src_path, diff_lines) in sections_dict.items():
            full_src_path = os.path.join(self.top_level, src_path)
            # Parse the hunk information for the source file
            # to determine lines changed for the source file
            diff_dict[full_src_path] = self._parse_lines(diff_lines)

        return make_sorted_dict(diff_dict)

    def _parse_source_sections(self, diff_str):
        """
        Parse source sections from git diff.

        Return a dictionary mapping each source path to the list of diff
        lines that belong to it, starting at its first hunk header.

        Raises `Exception` if a hunk header shows up before any source file.
        """
        # Map source files to lines in the diff output
        source_dict = dict()

        # Keep track of the current source file
        src_path = None

        # Signal that we've found a hunk (after starting a source file)
        found_hunk = False

        for line in diff_str.split('\n'):

            # A line starting with "diff --git" or "diff --cc" (in the case
            # of a merge conflict) is the start of a new source file
            if line.startswith(('diff --git', 'diff --cc')):
                src_path = self._parse_source_line(line)
                source_dict.setdefault(src_path, [])

                # We're waiting for a hunk for this source file
                found_hunk = False
                continue

            # Only keep lines once we're in a hunk section
            # (ignore index and files changed lines)
            is_hunk_header = line.startswith('@@')
            if not (found_hunk or is_hunk_header):
                continue

            found_hunk = True

            if src_path is not None:
                source_dict[src_path].append(line)
            elif is_hunk_header:
                # We tolerate other information before we have
                # a source file defined, unless it's a hunk line
                msg = "Hunk has no source file: '{0}'".format(line)
                raise Exception(msg)

        return source_dict

    def _parse_source_line(self, line):
        """
        Return path to source given a source line in `git diff`.

        Raises `Exception` if the line is neither a `--git` nor a `--cc`
        line, or if exactly one path cannot be extracted from it.
        """
        if '--git' in line:
            regex = self.SRC_FILE_RE
        elif '--cc' in line:
            regex = self.MERGE_CONFLICT_RE
        else:
            msg = ("Do not recognize format of source in line "
                   "'{0}'".format(line))
            raise Exception(msg)

        # Parse for the source file path
        groups = regex.findall(line)

        if len(groups) != 1:
            msg = "Could not parse source path in line '{0}'".format(line)
            raise Exception(msg)

        return groups[0]

    def _parse_lines(self, diff_lines):
        """
        Return  `(ADDED_LINES, DELETED_LINES)` for a source file in diff.

        `ADDED_LINES` holds line numbers counted from the `+n` start of each
        hunk (new version of the file) and `DELETED_LINES` holds line numbers
        counted from the `-k` start of each hunk (old version of the file).
        """
        added_lines = []
        deleted_lines = []

        # Both stay None until the first hunk header is seen
        current_line_new = None
        current_line_old = None

        for line in diff_lines:

            # Start of a hunk definition: reset both line counters
            if line.startswith('@@'):
                current_line_new = self._parse_hunk_line(line)
                # Deleted lines are numbered in the old file, so they must
                # start at `-k`; fall back to `+n` if `-k` is not found
                current_line_old = self._parse_hunk_old_line(
                    line, default=current_line_new)

            # "\ No newline at end of file" is not a line of either file,
            # counting it would shift every following line number by one
            elif line.startswith(self.NO_NEWLINE_PREFIX):
                continue

            # This is an added/modified line, so store the line number
            elif line.startswith('+'):
                if current_line_new is not None:
                    added_lines.append(current_line_new)
                    current_line_new += 1

            # This is a deleted line, it only advances the old counter
            elif line.startswith('-'):
                if current_line_old is not None:
                    deleted_lines.append(current_line_old)
                    current_line_old += 1

            # This is a line that was not modified: it exists in both
            # versions, so advance both counters without storing anything.
            # Outside of a hunk (counters are None) the line is ignored.
            else:
                if current_line_old is not None:
                    current_line_old += 1

                if current_line_new is not None:
                    current_line_new += 1

        return added_lines, deleted_lines

    def _parse_hunk_line(self, line):
        """
        Return the line number at the start of a hunk in a given line.

        A hunk is a segment of code that contains changes.

        The format of the hunk line is:
            @@ -k,l +n,m @@ TEXT
        where `k,l` represent the start line and length before the changes
        and `n,m` represent the start line and length after the changes.
        `git diff` will sometimes put a code excerpt from within the hunk
        in the `TEXT` section of the line.

        The value returned is `n`. Raises `Exception` if it cannot be found
        or cannot be converted to an integer.
        """
        # Split the line at the @@ terminators (start and end of the line)
        components = line.split('@@')

        # The first component should be an empty string, because
        # the line starts with '@@'.  The second component should
        # be the hunk information, and any additional components
        # are excerpts from the code.
        if len(components) < 2:
            msg = "Could not parse hunk in line '{0}'".format(line)
            raise Exception(msg)

        groups = self.HUNK_LINE_RE.findall(components[1])

        if len(groups) != 1:
            msg = "Could not find start of hunk in line '{0}'".format(line)
            raise Exception(msg)

        try:
            return int(groups[0])
        except ValueError:
            msg = ("Could not parse '{0}' as a line "
                   "number".format(groups[0]))
            raise Exception(msg)

    def _parse_hunk_old_line(self, line, default=None):
        """
        Return the old-file start line (`k` in `@@ -k,l +n,m @@`) of a hunk.

        Only the hunk information between the `@@` terminators is searched
        and the first `-<digits>` found is used. `default` is returned when
        no such value is present, so this never raises on odd hunk lines.
        """
        components = line.split('@@')

        if len(components) >= 2:
            match = self.HUNK_OLD_LINE_RE.search(components[1])
            if match:
                return int(match.group(1))

        return default

    def _diff_committed(self, branch='origin/master'):
        """Return the diff output for committed changes against `branch`."""
        return self._git_run_helper(
            branch=branch, files_only=False, mode=COMMITED_MODE)

    def _diff_staged(self):
        """Return the diff output for staged changes."""
        return self._git_run_helper(files_only=False, mode=STAGED_MODE)

    def _diff_unstaged(self):
        """Return the diff output for unstaged changes."""
        return self._git_run_helper(files_only=False, mode=UNSTAGED_MODE)

    # --- Public API
    # -------------------------------------------------------------------------
    def is_repo(self):
        """
        Return if it is a git repo.

        Runs `git rev-parse` in `self.path`. If the command reports an error
        it is printed and False is returned without caching; otherwise the
        answer is cached and is True when the command produced no output.
        """
        if self._is_repo is None:
            args = ['git', 'rev-parse']
            output, error = run_command(args, cwd=self.path)
            if error:
                print(error)
                return False

            self._is_repo = (not bool(error) and not bool(output))
        return self._is_repo

    @property
    def top_level(self):
        """
        Return the top level for the git repo.

        The first line of the `git rev-parse --show-toplevel` output is
        cached and returned. If the command reports an error it is printed
        and None is returned without caching.
        """
        if self._top_level is None:
            output, error = run_command(
                ['git', 'rev-parse', '--show-toplevel', '--encoding=utf-8'],
                cwd=self.path, )
            if error:
                print(error)
                return None

            # Only the first line of the output is the top level path
            self._top_level = output.split('\n')[0]
        return self._top_level

    def commited_files(self, branch=DEFAULT_BRANCH):
        """Return list of commited files (compared against `branch`)."""
        return self._git_run_helper(
            branch=branch, files_only=True, mode=COMMITED_MODE)

    def staged_files(self):
        """Return list of staged files."""
        return self._git_run_helper(files_only=True, mode=STAGED_MODE)

    def unstaged_files(self):
        """Return list of unstaged files."""
        return self._git_run_helper(files_only=True, mode=UNSTAGED_MODE)

    def commited_file_lines(self, branch=DEFAULT_BRANCH):
        """Return commited files and lines modified (against `branch`)."""
        return self._parse_diff_str(self._diff_committed(branch=branch))

    def staged_file_lines(self):
        """Return staged files and lines modified."""
        return self._parse_diff_str(self._diff_staged())

    def unstaged_file_lines(self):
        """Return unstaged files and lines modified."""
        return self._parse_diff_str(self._diff_unstaged())
435
436
437
class NoDiffTool(DiffToolBase):
    """Diff tool for a folder that is not under version control."""

    def __init__(self, path):
        """Store the folder `path` this tool reports on."""
        self.path = path

    def _get_files_helper(self, lines=False):
        """
        Return the files that `get_files` finds for `self.path`.

        With `lines=False` the result of `get_files` is returned untouched.
        With `lines=True` a dictionary is returned that maps every path to
        the placeholder tuple `([-1], range(100000))`.
        """
        found = get_files(paths=[self.path])
        if not lines:
            return found

        return {path: ([-1], range(100000), ) for path in found}

    # --- Public API
    # -------------------------------------------------------------------------
    @property
    def top_level(self):
        """Return the folder itself as the top level."""
        return self.path

    def is_repo(self):
        """Return True, as any folder can be handled without a VCS."""
        return True

    def commited_files(self, branch=DEFAULT_BRANCH):
        """Return all files of the folder (`branch` is not used)."""
        return self._get_files_helper()

    def staged_files(self):
        """Return all files of the folder."""
        return self._get_files_helper()

    def unstaged_files(self):
        """Return all files of the folder."""
        return self._get_files_helper()

    def commited_file_lines(self, branch=DEFAULT_BRANCH):
        """Return all files with placeholder lines (`branch` is not used)."""
        return self._get_files_helper(lines=True)

    def staged_file_lines(self):
        """Return all files of the folder with placeholder lines."""
        return self._get_files_helper(lines=True)

    def unstaged_file_lines(self):
        """Return all files of the folder with placeholder lines."""
        return self._get_files_helper(lines=True)
491
492
493
class DiffTool(object):
    """Generic diff tool for handling mercurial, git and no vcs folders."""

    # Candidate tools, tried in this order for every path
    TOOLS = [GitDiffTool, HgDiffTool, NoDiffTool]

    def __init__(self, paths):
        """
        Pick a diff tool for each of the given `paths`.

        For every path the first tool in `TOOLS` whose `is_repo()` is true
        is selected. Tools are stored in `self.diff_tools` keyed by their
        `top_level`; only the first tool seen for a top level is kept.
        """
        self.paths = paths
        self.diff_tools = {}

        for path in self.paths:
            for tool_class in self.TOOLS:
                candidate = tool_class(path)
                if not candidate.is_repo():
                    continue

                if candidate.top_level not in self.diff_tools:
                    self.diff_tools[candidate.top_level] = candidate
                # The first matching tool wins, skip the remaining ones
                break

    # --- Public API
    # -------------------------------------------------------------------------
    def commited_files(self, branch=DEFAULT_BRANCH):
        """Return the sorted commited files of all tools."""
        collected = []
        for tool in self.diff_tools.values():
            collected.extend(tool.commited_files(branch=branch))
        return sorted(collected)

    def staged_files(self):
        """Return the sorted staged files of all tools."""
        collected = []
        for tool in self.diff_tools.values():
            collected.extend(tool.staged_files())
        return sorted(collected)

    def unstaged_files(self):
        """Return the sorted unstaged files of all tools."""
        collected = []
        for tool in self.diff_tools.values():
            collected.extend(tool.unstaged_files())
        return sorted(collected)

    def commited_file_lines(self, branch=DEFAULT_BRANCH):
        """Return commited files and lines modified, merged over all tools."""
        merged = {}
        for tool in self.diff_tools.values():
            merged.update(tool.commited_file_lines(branch=branch))
        return make_sorted_dict(merged)

    def staged_file_lines(self):
        """Return staged files and lines modified, merged over all tools."""
        merged = {}
        for tool in self.diff_tools.values():
            merged.update(tool.staged_file_lines())
        return make_sorted_dict(merged)

    def unstaged_file_lines(self):
        """Return unstaged files and lines modified, merged over all tools."""
        merged = {}
        for tool in self.diff_tools.values():
            merged.update(tool.unstaged_file_lines())
        return make_sorted_dict(merged)
558
559
560
def test():
    """Print the unstaged file lines of the folder containing this file."""
    this_file = os.path.realpath(__file__)
    tool = DiffTool([os.path.dirname(this_file)])
    print(tool.unstaged_file_lines())


# Run the local check only when executed as a script
if __name__ == '__main__':
    test()
569