Completed
Push — master ( 074cc7...987304 )
by Gonzalo
59s
created

GitDiffTool   B

Complexity

Total Complexity 51

Size/Duplication

Total Lines 334
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
dl 0
loc 334
rs 8.3206
c 0
b 0
f 0
wmc 51

18 Methods

Rating   Name   Duplication   Size   Complexity  
D _parse_lines() 0 65 9
B _parse_hunk_line() 0 42 4
A commited_files() 0 5 1
A is_repo() 0 11 3
A staged_files() 0 4 1
D _parse_source_sections() 0 52 9
A _diff_staged() 0 4 1
A unstaged_file_lines() 0 4 1
C _git_run_helper() 0 39 7
A __init__() 0 5 1
A _parse_source_line() 0 20 4
A commited_file_lines() 0 4 1
A top_level() 0 13 3
A _diff_unstaged() 0 4 1
A unstaged_files() 0 4 1
A _diff_committed() 0 5 1
A _parse_diff_str() 0 22 2
A staged_file_lines() 0 4 1

How to fix   Complexity   

Complex Class

Complex classes like GitDiffTool often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
# -----------------------------------------------------------------------------
3
# Copyright (c) 2016 Continuum Analytics, Inc.
4
#
5
# Licensed under the terms of the MIT License
6
# (see LICENSE.txt for details)
7
# -----------------------------------------------------------------------------
8
"""Version control helpers. Find staged, commited, modified files/lines."""
9
10
# Standard library imports
11
import os
12
import re
13
14
# Local imports
15
from ciocheck.config import (COMMITED_MODE, DEFAULT_BRANCH, STAGED_MODE,
16
                             UNSTAGED_MODE)
17
from ciocheck.utils import get_files, make_sorted_dict, run_command
18
19
20
class DiffToolBase(object):
    """
    Base version control diff tool.

    Declares the interface implemented by the concrete diff tools of this
    module. Every member raises `NotImplementedError` until overridden.
    """

    # --- Public API
    # -------------------------------------------------------------------------
    @property
    def top_level(self):
        """Return the top level for the repo."""
        raise NotImplementedError

    def is_repo(self):
        """Return if it is a repo of the type."""
        raise NotImplementedError

    def commited_files(self, branch=DEFAULT_BRANCH):
        """Return list of commited files (compared against `branch`)."""
        raise NotImplementedError

    def staged_files(self):
        """Return list of staged files."""
        raise NotImplementedError

    def unstaged_files(self):
        """Return list of unstaged files."""
        raise NotImplementedError

    def commited_file_lines(self, branch=DEFAULT_BRANCH):
        """Return commited files and lines modified."""
        raise NotImplementedError

    def staged_file_lines(self):
        """Return staged files and lines modified."""
        raise NotImplementedError

    def unstaged_file_lines(self):
        """Return unstaged files and lines modified."""
        raise NotImplementedError
57
58
59
class HgDiffTool(DiffToolBase):
    """
    Mercurial diff tool.

    Placeholder implementation: `is_repo` always returns False and every
    query returns an empty value. No Mercurial command is ever run.
    """

    def __init__(self, path):
        """Store the path to inspect."""
        self.path = path
        self._top_level = None  # Kept for parity with GitDiffTool; unused.

    @property
    def top_level(self):
        """Return the top level for the repo (always an empty string)."""
        return ''

    def is_repo(self):
        """Return if it is a repo of the type (always False)."""
        return False

    def commited_files(self, branch=DEFAULT_BRANCH):
        """Return list of commited files (always empty)."""
        return []

    def staged_files(self):
        """Return list of staged files (always empty)."""
        return []

    def unstaged_files(self):
        """Return list of unstaged files (always empty)."""
        return []

    def commited_file_lines(self, branch=DEFAULT_BRANCH):
        """Return commited files and lines modified (always empty)."""
        return {}

    def staged_file_lines(self):
        """Return staged files and lines modified (always empty)."""
        return {}

    def unstaged_file_lines(self):
        """Return unstaged files and lines modified (always empty)."""
        return {}
99
100
101
class GitDiffTool(DiffToolBase):
    """
    Thin wrapper for a subset of the `git diff` command.

    File queries return lists of paths joined to the repository top level;
    line queries return a sorted mapping of the form
    `{path: (ADDED_LINES, DELETED_LINES)}`.
    """

    # Regular expressions used to parse the diff output.
    # The path groups stop only at a newline (or a closing double quote) so
    # file names that contain spaces are captured whole, not truncated.
    SRC_FILE_RE = re.compile(r'^diff --git "?a/.*"? "?b/([^\n"]*)"?')
    MERGE_CONFLICT_RE = re.compile(r'^diff --cc ([^\n]*)')
    HUNK_LINE_RE = re.compile(r'\+([0-9]*)')

    def __init__(self, path):
        """Store the path to inspect; git is only queried lazily."""
        self.path = path
        self._top_level = None  # Cache for the `top_level` property
        self._is_repo = None  # Cache for `is_repo`

    def _git_run_helper(self,
                        branch=DEFAULT_BRANCH,
                        files_only=False,
                        mode=None):
        """
        Build and run the git diff command for the different types of diffs.

        `branch` is only used when `mode` is `COMMITED_MODE`, in which case
        the diff is `<branch>...HEAD`. `STAGED_MODE` adds `--cached`; any
        other mode adds no extra argument.

        With `files_only=True` the result is a sorted list of paths (joined
        to `top_level`) that start with `self.path`. Otherwise the raw diff
        output returned by `run_command` is handed back untouched.
        """
        command = ['git', '-c', 'diff.mnemonicprefix=no', 'diff']

        if mode == COMMITED_MODE:
            command.append("{branch}...HEAD".format(branch=branch))
        elif mode == STAGED_MODE:
            command.append('--cached')

        command += [
            '--no-color',
            '--no-ext-diff',
            '--diff-filter=AM',  # Means "added" and "modified"
        ]

        if not files_only:
            diff_output, _ = run_command(command, cwd=self.path)
            return diff_output

        command += [
            '--name-only',
            '-z',  # Means nul-separated names
        ]
        output, error = run_command(command, cwd=self.path)

        # Only report real errors, as `is_repo` and `top_level` do.
        if error:
            print(error)

        names = set(output.split('\x00'))
        names.discard('')  # There's an empty entry in git output
        paths = [os.path.join(self.top_level, name) for name in sorted(names)]

        # Keep only the files located under the requested path
        return [path for path in paths if path.startswith(self.path)]

    def _parse_diff_str(self, diff_str):
        """
        Parse the output of `git diff` into a dictionary.

        Dictionary in the form:
            { SRC_PATH: (ADDED_LINES, DELETED_LINES) }
        where `ADDED_LINES` and `DELETED_LINES` are lists of line numbers
        added/deleted respectively. `SRC_PATH` is joined to `top_level` and
        the result is passed through `make_sorted_dict`.
        """
        diff_dict = {}

        # Parse the diff string into sections by source file, then parse
        # the hunk information of each section into changed line numbers
        sections = self._parse_source_sections(diff_str)
        for src_path, diff_lines in sections.items():
            full_src_path = os.path.join(self.top_level, src_path)
            diff_dict[full_src_path] = self._parse_lines(diff_lines)

        return make_sorted_dict(diff_dict)

    def _parse_source_sections(self, diff_str):
        """
        Parse source sections from git diff.

        Return a dict mapping each source path to the list of diff lines
        that belong to its hunks (from the first `@@` line onwards).
        Raises `Exception` if a hunk appears before any source file.
        """
        source_dict = {}

        # Current source file, and whether one of its hunks was reached
        src_path = None
        found_hunk = False

        for line in diff_str.split('\n'):

            # "diff --git", or "diff --cc" in the case of a merge conflict,
            # marks the start of a new source file
            if line.startswith(('diff --git', 'diff --cc')):
                src_path = self._parse_source_line(line)
                source_dict.setdefault(src_path, [])

                # We are now waiting for a hunk of this source file
                found_hunk = False
                continue

            # Ignore index and files changed lines that precede the hunks
            if not (found_hunk or line.startswith('@@')):
                continue

            found_hunk = True

            if src_path is not None:
                source_dict[src_path].append(line)
            elif line.startswith('@@'):
                # Other information is tolerated before a source file is
                # defined, but a hunk line is not
                msg = "Hunk has no source file: '{0}'".format(line)
                raise Exception(msg)

        return source_dict

    def _parse_source_line(self, line):
        """
        Return path to source given a source line in `git diff`.

        Raises `Exception` if the line is neither a `--git` nor a `--cc`
        header, or if no single path can be extracted from it.
        """
        if '--git' in line:
            regex = self.SRC_FILE_RE
        elif '--cc' in line:
            regex = self.MERGE_CONFLICT_RE
        else:
            msg = ("Do not recognize format of source in line "
                   "'{0}'".format(line))
            raise Exception(msg)

        # Parse for the source file path
        groups = regex.findall(line)

        if len(groups) != 1:
            msg = "Could not parse source path in line '{0}'".format(line)
            raise Exception(msg)

        return groups[0]

    def _parse_lines(self, diff_lines):
        """
        Return  `(ADDED_LINES, DELETED_LINES)` for a source file in diff.

        `ADDED_LINES` and `DELETED_LINES` are lists of line numbers
        added/deleted respectively.

        NOTE(review): both counters start from the `+n` value returned by
        `_parse_hunk_line`, so deleted lines are numbered from the new-file
        start of the hunk rather than the old-file start -- confirm that
        callers expect this.
        """
        added_lines = []
        deleted_lines = []

        current_line_new = None
        current_line_old = None

        for line in diff_lines:

            # Start of a hunk definition: retrieve the starting line number
            if line.startswith('@@'):
                line_num = self._parse_hunk_line(line)
                current_line_new, current_line_old = line_num, line_num

            # "\ No newline at end of file" markers are not file content,
            # so they must not advance the line counters
            elif line.startswith('\\'):
                continue

            # Added/modified line: store the line number and advance
            elif line.startswith('+'):
                if current_line_new is not None:
                    added_lines.append(current_line_new)
                    current_line_new += 1

            # Deleted line: store the line number and advance
            elif line.startswith('-'):
                if current_line_old is not None:
                    deleted_lines.append(current_line_old)
                    current_line_old += 1

            # Unmodified line: advance both counters without storing it.
            # Lines seen before any hunk leave the counters at None.
            else:
                if current_line_old is not None:
                    current_line_old += 1

                if current_line_new is not None:
                    current_line_new += 1

        return added_lines, deleted_lines

    def _parse_hunk_line(self, line):
        """
        Return the line number at the start of a hunk in a given line.

        A hunk is a segment of code that contains changes.

        The format of the hunk line is:
            @@ -k,l +n,m @@ TEXT
        where `k,l` represent the start line and length before the changes
        and `n,m` represent the start line and length after the changes.
        `git diff` will sometimes put a code excerpt from within the hunk
        in the `TEXT` section of the line.

        The value returned is `n`. Raises `Exception` if it cannot be found
        or is not a number.
        """
        # Split the line at the @@ terminators (start and end of the line).
        # The first component should be an empty string, the second one the
        # hunk information, and any additional ones excerpts from the code.
        components = line.split('@@')

        if len(components) < 2:
            msg = "Could not parse hunk in line '{0}'".format(line)
            raise Exception(msg)

        groups = self.HUNK_LINE_RE.findall(components[1])

        if len(groups) != 1:
            msg = "Could not find start of hunk in line '{0}'".format(line)
            raise Exception(msg)

        try:
            return int(groups[0])
        except ValueError:
            msg = ("Could not parse '{0}' as a line "
                   "number".format(groups[0]))
            raise Exception(msg)

    def _diff_committed(self, branch=DEFAULT_BRANCH):
        """Return diff for committed changes, compared against `branch`."""
        return self._git_run_helper(
            branch=branch, files_only=False, mode=COMMITED_MODE)

    def _diff_staged(self):
        """Return diff for staged changes."""
        return self._git_run_helper(files_only=False, mode=STAGED_MODE)

    def _diff_unstaged(self):
        """Return diff for unstaged changes."""
        return self._git_run_helper(files_only=False, mode=UNSTAGED_MODE)

    # --- Public API
    # -------------------------------------------------------------------------
    def is_repo(self):
        """
        Return if it is a git repo.

        The answer is cached once `git rev-parse` ran without error. When
        it reports an error, the error is printed and False is returned
        without caching, so the check runs again on the next call.
        """
        if self._is_repo is None:
            output, error = run_command(['git', 'rev-parse'], cwd=self.path)
            if error:
                print(error)
                return False

            # No error at this point: a repo is one that gave no output
            self._is_repo = not output
        return self._is_repo

    @property
    def top_level(self):
        """
        Return the top level for the git repo.

        The value is the first line printed by `git rev-parse` and is
        cached. If git reports an error, it is printed and None is returned
        without caching.
        """
        if self._top_level is None:
            output, error = run_command(
                ['git', 'rev-parse', '--show-toplevel', '--encoding=utf-8'],
                cwd=self.path, )
            if error:
                print(error)
                return None

            self._top_level = output.split('\n')[0]
        return self._top_level

    def commited_files(self, branch=DEFAULT_BRANCH):
        """Return list of commited files."""
        return self._git_run_helper(
            branch=branch, files_only=True, mode=COMMITED_MODE)

    def staged_files(self):
        """Return list of staged files."""
        return self._git_run_helper(files_only=True, mode=STAGED_MODE)

    def unstaged_files(self):
        """Return list of unstaged files."""
        return self._git_run_helper(files_only=True, mode=UNSTAGED_MODE)

    def commited_file_lines(self, branch=DEFAULT_BRANCH):
        """Return commited files and lines modified."""
        return self._parse_diff_str(self._diff_committed(branch=branch))

    def staged_file_lines(self):
        """Return staged files and lines modified."""
        return self._parse_diff_str(self._diff_staged())

    def unstaged_file_lines(self):
        """Return unstaged files and lines modified."""
        return self._parse_diff_str(self._diff_unstaged())
435
436
437
class NoDiffTool(DiffToolBase):
    """Diff tool stand-in for a folder that is not under version control."""

    def __init__(self, path):
        """Remember the folder this tool reports on."""
        self.path = path

    def _get_files_helper(self, lines=False):
        """
        Return what `get_files` finds for `self.path`.

        With `lines=True` the result is a dict mapping each found path to
        the pair `([-1], range(100000))`; otherwise the `get_files` result
        is returned as is.
        """
        found = get_files(paths=[self.path])
        if not lines:
            return found

        # NOTE(review): presumably mirrors the (added, deleted) pairs built
        # by GitDiffTool, flagging the whole file as changed -- confirm.
        return {name: ([-1], range(100000)) for name in found}

    # --- Public API
    # -------------------------------------------------------------------------
    @property
    def top_level(self):
        """Return the top level, which is the folder itself."""
        return self.path

    def is_repo(self):
        """Return always True as this handles folders not under VC."""
        return True

    def commited_files(self, branch=DEFAULT_BRANCH):
        """Return list of commited files (every file found)."""
        return self._get_files_helper()

    def staged_files(self):
        """Return list of staged files (every file found)."""
        return self._get_files_helper()

    def unstaged_files(self):
        """Return list of unstaged files (every file found)."""
        return self._get_files_helper()

    def commited_file_lines(self, branch=DEFAULT_BRANCH):
        """Return commited files and lines modified."""
        return self._get_files_helper(lines=True)

    def staged_file_lines(self):
        """Return staged files and lines modified."""
        return self._get_files_helper(lines=True)

    def unstaged_file_lines(self):
        """Return unstaged files and lines modified."""
        return self._get_files_helper(lines=True)
491
492
493
class DiffTool(object):
    """
    Generic diff tool for handling mercurial, git and no vcs folders.

    For each path the first class of `TOOLS` whose instance answers True to
    `is_repo()` is selected; one tool is kept per distinct `top_level`.
    Queries are forwarded to every kept tool and the answers merged.
    """

    TOOLS = [
        GitDiffTool,
        HgDiffTool,
        NoDiffTool,
    ]

    def __init__(self, paths):
        """Select a diff tool for each of the given paths."""
        self.paths = paths
        self.diff_tools = {}

        for path in self.paths:
            tool = self._pick_tool(path)
            if tool is None:
                continue
            # The first tool registered for a top level is the one kept
            if tool.top_level not in self.diff_tools:
                self.diff_tools[tool.top_level] = tool

    def _pick_tool(self, path):
        """Return the first tool accepting `path`, or None if none does."""
        # The generator is lazy: classes after the first match are never
        # instantiated, as with a loop that breaks on the first match.
        candidates = (tool_class(path) for tool_class in self.TOOLS)
        return next((tool for tool in candidates if tool.is_repo()), None)

    # --- Public API
    # -------------------------------------------------------------------------
    def commited_files(self, branch=DEFAULT_BRANCH):
        """Return sorted list of commited files from all tools."""
        found = []
        for tool in self.diff_tools.values():
            found.extend(tool.commited_files(branch=branch))
        return sorted(found)

    def staged_files(self):
        """Return sorted list of staged files from all tools."""
        found = []
        for tool in self.diff_tools.values():
            found.extend(tool.staged_files())
        return sorted(found)

    def unstaged_files(self):
        """Return sorted list of unstaged files from all tools."""
        found = []
        for tool in self.diff_tools.values():
            found.extend(tool.unstaged_files())
        return sorted(found)

    def commited_file_lines(self, branch=DEFAULT_BRANCH):
        """Return commited files and lines modified, from all tools."""
        merged = {}
        for tool in self.diff_tools.values():
            merged.update(tool.commited_file_lines(branch=branch))
        return make_sorted_dict(merged)

    def staged_file_lines(self):
        """Return staged files and lines modified, from all tools."""
        merged = {}
        for tool in self.diff_tools.values():
            merged.update(tool.staged_file_lines())
        return make_sorted_dict(merged)

    def unstaged_file_lines(self):
        """Return unstaged files and lines modified, from all tools."""
        merged = {}
        for tool in self.diff_tools.values():
            merged.update(tool.unstaged_file_lines())
        return make_sorted_dict(merged)
558
559
560
def test():
    """Local main test: print unstaged changes of this file's folder."""
    here = os.path.dirname(os.path.realpath(__file__))
    print(DiffTool([here]).unstaged_file_lines())


if __name__ == '__main__':
    test()
569