Passed
Pull Request — master (#15)
by Amresh
01:45
created

fastest.__main__.make_test_module()   A

Complexity

Conditions 2

Size

Total Lines 6
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 6
rs 10
c 0
b 0
f 0
cc 2
nop 0
1
import argparse
2
import os
3
import fnmatch
4
import time
5
import subprocess
6
7
from fastest.io.read_file import read_file
8
from fastest.code_assets.function import get_functions
9
from fastest.compiler import compile_tests
10
from watchdog.observers import Observer
11
from watchdog.events import PatternMatchingEventHandler
12
from logger.logger import logger
13
14
15
# Command-line interface. Parsing happens at import time because the
# functions below read ``args``, ``monitor_path`` and ``poll_duration`` as
# module-level globals.
parser = argparse.ArgumentParser(description='Create test cases automatically')
parser.add_argument('--path', help='Project root, use $(pwd) to be sure')
parser.add_argument('--source', help='Modules to check coverage for')
parser.add_argument(
    '--poll-duration',
    default='1',
    help='Seconds the main loop sleeps between iterations (default: 1)',
)
parser.add_argument(
    '--exclude',
    help='Comma separated names of files/dirs that should NOT be watched',
)

args = parser.parse_args()

# Directory that is watched; defaults to the current working directory.
monitor_path = args.path if args.path is not None else os.getcwd()

# Only plain non-negative integer strings are accepted; anything else
# (e.g. "abc", "-3", "1.5") silently falls back to 1 second.
if isinstance(args.poll_duration, str) and args.poll_duration.isdigit():
    poll_duration = int(args.poll_duration)
else:
    poll_duration = 1
30
31
32
def make_test_module():
    """Ensure ``./test`` exists and contains an ``__init__.py``.

    The directory is resolved relative to the current working directory.
    The function is idempotent: calling it when the directory and/or the
    ``__init__.py`` file already exist leaves their content untouched.
    """
    # exist_ok avoids the check-then-create race of exists() + mkdir().
    os.makedirs('./test', exist_ok=True)
    # Append mode creates the file when it is missing and never truncates
    # an existing one, so a pre-existing directory also becomes a package.
    with open('./test/__init__.py', 'a'):
        pass
38
39
def get_report_path():
    """Return the absolute path of ``htmlcov/index.html`` under ``monitor_path``."""
    report_file = os.path.join(monitor_path, 'htmlcov/index.html')
    return os.path.abspath(report_file)
41
42
43
def get_test_files():
    """Return the names of the entries in ``./test`` with ``.py`` stripped.

    Entries whose name contains ``.pyc`` or ``__pycache__`` are skipped.
    Only a trailing ``.py`` suffix is removed; other names are returned
    unchanged. The result is sorted so the order does not depend on the
    directory listing order.
    """
    names = []
    for entry in sorted(os.listdir('./test')):
        if '.pyc' in entry or '__pycache__' in entry:
            continue
        # Strip the extension only: str.replace would also delete a '.py'
        # that appears in the middle of a name (e.g. 'notes.pyi').
        names.append(entry[:-len('.py')] if entry.endswith('.py') else entry)
    return names
51
52
53
def is_path_to_be_ignored(event_path, ignore_patterns, root=None):
    """Return True if *event_path* matches any of *ignore_patterns*.

    The path is made relative to *root* before matching, so patterns such
    as ``test/*`` are interpreted relative to the watched directory.

    Args:
        event_path: Path of the file a filesystem event refers to.
        ignore_patterns: Iterable of ``fnmatch``-style patterns.
        root: Directory the patterns are relative to. Defaults to the
            module-level ``monitor_path``.

    Returns:
        True when at least one pattern matches, otherwise False (also for
        an empty pattern list).
    """
    base = monitor_path if root is None else root
    # relpath copes with a trailing slash on the root and with paths
    # outside of it, where unpacking a str.split() result would raise
    # ValueError. It is computed once rather than once per pattern.
    relative_path = os.path.relpath(event_path, base)
    return any(
        fnmatch.fnmatch(relative_path, pattern)
        for pattern in ignore_patterns
    )
59
60
61
def create_test_command():
    """Return the ``test.<name>`` module path for every name from get_test_files()."""
    module_paths = []
    for module_name in get_test_files():
        module_paths.append('test.{}'.format(module_name))
    return module_paths
64
65
66
def execute_coverage_and_tests(source):
    """Run the test modules under ``coverage`` and produce the reports.

    Args:
        source: Value passed to ``coverage run --source``; when falsy the
            ``--source`` option is left out.

    Runs ``coverage run -m unittest <modules>``, then ``coverage report``
    and ``coverage html``, and finally logs the report location. Exit
    codes of the subprocesses are not inspected.
    """
    test_modules = create_test_command()
    report_path = get_report_path()

    run_command = ['coverage', 'run']
    if source:
        run_command += ['--source', source]
    run_command += ['-m', 'unittest']
    run_command += test_modules

    for command in (run_command, ['coverage', 'report'], ['coverage', 'html']):
        subprocess.call(command)

    logger.info('Check coverage: ' + report_path)
75
76
77
def main():
    """Watch ``monitor_path`` recursively and react to ``*.py`` changes.

    For every created or modified file that is not ignored, the file is
    passed through ``read_file``, ``get_functions`` and
    ``compile_tests.build``, then coverage and the tests are executed.
    The call blocks, sleeping ``poll_duration`` seconds per loop
    iteration, until a KeyboardInterrupt stops the observer.
    """
    logger.info('Monitoring started...')

    class PyFileHandler(PatternMatchingEventHandler):
        """Watchdog handler for creation/modification events on ``*.py`` files."""

        patterns = ['*.py']
        # Raw --exclude value; an empty string when the option was not given.
        exclude_files = '' if args.exclude is None else args.exclude
        # User exclusions first, followed by the built-in ones.
        ignore_patterns = [
            path.strip() for path in exclude_files.split(',')
        ] + ['test/*', '__pycache__', '*.pyc', '*__test.py']

        def process(self, event):
            """Build tests for the file behind *event* and run them."""
            changed_path = event.src_path
            if not is_path_to_be_ignored(changed_path, self.ignore_patterns):
                page = read_file(monitor_path, changed_path)
                functions = get_functions(page)
                compile_tests.build(functions, changed_path, monitor_path)
                execute_coverage_and_tests(args.source)

        def on_modified(self, event):
            """Handle a modification event."""
            self.process(event)

        def on_created(self, event):
            """Handle a creation event."""
            self.process(event)

    watcher = Observer()
    watcher.schedule(PyFileHandler(), monitor_path, recursive=True)
    watcher.start()

    try:
        # The observer runs on its own; this loop only keeps the process
        # alive until the user interrupts it.
        while True:
            time.sleep(poll_duration)
    except KeyboardInterrupt:
        watcher.stop()
    watcher.join()
111