Passed
Push — master ( ce647f...f4ead8 )
by Dean
02:57
created

BaseTest._add_python_home_candidates()   A

Complexity

Conditions 1

Size

Total Lines 8

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 1.4218

Importance

Changes 0
Metric Value
c 0
b 0
f 0
dl 0
loc 8
ccs 1
cts 4
cp 0.25
rs 9.4285
cc 1
crap 1.4218
1 1
from plugin.core.helpers.variable import merge
2
3 1
from subprocess import Popen
4 1
import json
5 1
import logging
6 1
import os
7 1
import subprocess
8 1
import sys
9
10 1
CURRENT_PATH = os.path.abspath(__file__)
11 1
HOST_PATH = os.path.join(os.path.dirname(CURRENT_PATH), 'host.py')
12
13 1
log = logging.getLogger(__name__)
14
15
16 1
class BaseTest(object):
17 1
    name = None
18 1
    optional = False
19
20 1
    @classmethod
21
    def run(cls, search_paths):
22 1
        metadata = {}
23
24 1
        message = None
25 1
        success = None
26
27
        # Retrieve names of test functions
28 1
        names = [
29
            name for name in dir(cls)
30
            if name.startswith('test_')
31
        ]
32
33 1
        if not names:
34
            return cls.build_failure('No tests defined')
35
36
        # Run tests
37 1
        for name in names:
38
            # Ensure function exists
39 1
            if not hasattr(cls, name):
40
                return cls.build_failure('Unable to find function: %r' % name)
41
42
            # Run test
43 1
            try:
44 1
                result = cls.spawn(name, search_paths)
45
46
                # Merge test result into `metadata`
47 1
                merge(metadata, result, recursive=True)
48
49
                # Test successful
50 1
                message = None
51 1
                success = True
52
            except Exception, ex:
53
                if success:
54
                    continue
55
56
                message = ex.message
57
                success = False
58
59 1
        if not success:
60
            # Trigger event
61
            cls.on_failure(message)
62
63
            # Build result
64
            return cls.build_failure(message)
65
66
        # Trigger event
67 1
        cls.on_success(metadata)
68
69
        # Build result
70 1
        return cls.build_success(metadata)
71
72 1
    @classmethod
73
    def spawn(cls, name, search_paths):
74
        # Find path to python executable
75 1
        python_exe = cls.find_python_executable()
76
77 1
        if not python_exe:
78
            raise Exception('Unable to find python executable')
79
80
        # Ensure test host exists
81 1
        if not os.path.exists(HOST_PATH):
82
            raise Exception('Unable to find "host.py" script')
83
84
        # Build test process arguments
85 1
        args = [
86
            python_exe, HOST_PATH,
87
            '--module', cls.__module__,
88
            '--name', name,
89
90
            '--search-paths="%s"' % (
91
                ';'.join(search_paths)
92
            ),
93
        ]
94
95
        # Spawn test (in sub-process)
96 1
        log.debug('Starting test: %s:%s', cls.__module__, name)
97
98 1
        process = Popen(
99
            args,
100
            stdout=subprocess.PIPE,
101
            stderr=subprocess.PIPE
102
        )
103
104
        # Wait for test to complete
105 1
        stdout, stderr = process.communicate()
106
107 1
        if stderr:
108
            log.debug('Test returned messages:\n%s', stderr.replace("\r\n", "\n"))
109
110
        # Parse output
111 1
        result = None
112
113 1
        if stdout:
114 1
            try:
115 1
                result = json.loads(stdout)
116
            except Exception, ex:
117
                log.warn('Invalid output returned %r - %s', stdout, ex, exc_info=True)
118
119
        # Build result
120 1
        if process.returncode != 0:
121
            # Test failed
122
            if result and 'message' in result:
123
                raise Exception(result['message'])
124
125
            raise Exception('Unknown error (code: %s)' % process.returncode)
126
127
        # Test successful
128 1
        return result
129
130 1
    @classmethod
131
    def find_python_executable(cls):
132 1
        candidates = [sys.executable]
133
134
        # Add candidates based on the script path in `sys.argv`
135 1
        if sys.argv and len(sys.argv) > 0 and os.path.exists(sys.argv[0]):
136 1
            bootstrap_path = sys.argv[0]
137 1
            resources_pos = bootstrap_path.lower().find('resources')
138
139 1
            if resources_pos > 0:
140
                pms_path = bootstrap_path[:resources_pos]
141
142
                cls._add_python_home_candidates(candidates, pms_path)
143
144
        # Add candidates relative to `PLEX_MEDIA_SERVER_HOME`
145 1
        pms_home = os.environ.get('PLEX_MEDIA_SERVER_HOME')
146
147 1
        if pms_home and os.path.exists(pms_home):
148
            cls._add_python_home_candidates(candidates, pms_home)
149
150
        # Add candidates relative to `PYTHONHOME`
151 1
        python_home = os.environ.get('PYTHONHOME')
152
153 1
        if python_home and os.path.exists(python_home):
154
            candidates.append(os.path.join(python_home, 'bin', 'python'))
155
156
        # Use first candidate that exists
157 1
        for path in candidates:
158 1
            if os.path.exists(path):
159 1
                return path
160
161
        log.warn('Unable to find python executable', extra={'candidates': candidates})
162
        return None
163
164 1
    @staticmethod
165
    def _add_python_home_candidates(candidates, path):
166
        # Windows
167
        candidates.append(os.path.join(path, 'PlexScriptHost.exe'))
168
169
        # *nix
170
        candidates.append(os.path.join(path, 'Resources', 'Plex Script Host'))
171
        candidates.append(os.path.join(path, 'Resources', 'Python', 'bin', 'python'))
172
173
    #
174
    # Events
175
    #
176
177 1
    @classmethod
178
    def on_failure(cls, message):
179
        pass
180
181 1
    @classmethod
182
    def on_success(cls, metadata):
183 1
        pass
184
185
    #
186
    # Helpers
187
    #
188
189 1
    @classmethod
190 1
    def build_exception(cls, message, exc_info=None):
191
        if exc_info is None:
192
            exc_info = sys.exc_info()
193
194
        return cls.build_failure(
195
            message,
196
            exc_info=exc_info
197
        )
198
199 1
    @classmethod
200
    def build_failure(cls, message, **kwargs):
201
        result = {
202
            'success': False,
203
            'message': message
204
        }
205
206
        # Merge extra attributes
207
        merge(result, kwargs)
208
209
        return result
210
211 1
    @staticmethod
212
    def build_success(metadata):
213 1
        return {
214
            'success': True,
215
            'metadata': metadata
216
        }
217