Passed
Push — master ( 934a5d...99d96b )
by Jace
48s
created

BaseManager._get_process()   F

Complexity

Conditions 11

Size

Total Lines 34

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 132

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 11
c 3
b 0
f 0
dl 0
loc 34
ccs 0
cts 0
cp 0
crap 132
rs 3.1764

How to fix   Complexity   

Complexity

Complex units like BaseManager._get_process() often do a lot of different things. To break the enclosing class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
"""Classes to manage application state."""
2
3 1
import os
4 1
import abc
5 1
import time
6 1
import glob
7 1
import platform
8 1
import functools
9 1
import subprocess
10 1
import logging
11
12 1
import psutil
13
14
15 1
log = logging.getLogger(__name__)
16
17
# TODO: delete this after implementing `BaseManager`
18
# https://github.com/jacebrowning/mine/issues/8
19
# https://github.com/jacebrowning/mine/issues/9
20
21
22
# TODO: enable coverage when a Linux test is implemented
23
def log_running(func):  # pragma: no cover (manual)
    """Decorate a status-query method so its result is logged.

    The wrapped method is called as ``func(self, application)``. Its
    return value is passed through unchanged: ``None`` is logged as
    "Untracked", any other truthy value as "Running", and any other falsy
    value as "Not running".
    """
    @functools.wraps(func)
    def wrapped(self, application):
        """Query the application's status and log the outcome."""
        log.debug("Determining if %s is running...", application)
        running = func(self, application)
        # `None` must be tested by identity first: it is falsy, but it
        # means "untracked" rather than "not running".
        if running is None:
            status = "Untracked"
        else:
            status = "Running" if running else "Not running"
        log.info("%s: %s", status, application)
        return running
    return wrapped
39
40
41
# TODO: enable coverage when a Linux test is implemented
42
def log_starting(func):  # pragma: no cover (manual)
    """Decorate a start method so that the launch is logged.

    A message is logged before and after calling ``func(self,
    application)``; the wrapped method's return value is passed through.
    """
    @functools.wraps(func)
    def wrapped(self, application):
        """Log, start the application, then log again."""
        log.info("Starting %s...", application)
        outcome = func(self, application)
        # Only reached when `func` returned without raising.
        log.info("Running: %s", application)
        return outcome
    return wrapped
52
53
54
# TODO: enable coverage when a Linux test is implemented
55
def log_stopping(func):  # pragma: no cover (manual)
    """Decorate a stop method so that the shutdown is logged.

    A message is logged before and after calling ``func(self,
    application)``; the wrapped method's return value is passed through.
    """
    @functools.wraps(func)
    def wrapped(self, application):
        """Log, stop the application, then log again."""
        log.info("Stopping %s...", application)
        outcome = func(self, application)
        # Only reached when `func` returned without raising.
        log.info("Not running: %s", application)
        return outcome
    return wrapped
65
66
67
class BaseManager(metaclass=abc.ABCMeta):  # pragma: no cover (abstract)
    """Base application manager.

    Concrete managers override `NAME`/`FRIENDLY` and implement the
    abstract methods; `_get_process` is shared process-lookup logic.
    """

    # Identifier and display label; `__str__` returns `FRIENDLY`.
    NAME = FRIENDLY = None

    # Names that disqualify a process when they appear (case-insensitively)
    # among the path components of its command line.
    IGNORED_APPLICATION_NAMES = []

    def __str__(self):
        return self.FRIENDLY

    @abc.abstractmethod
    def is_running(self, application):
        """Determine if an application is currently running."""
        raise NotImplementedError

    @abc.abstractmethod
    def start(self, application):
        """Start an application on the current computer."""
        raise NotImplementedError

    @abc.abstractmethod
    def stop(self, application):
        """Stop an application on the current computer."""
        raise NotImplementedError

    @abc.abstractmethod
    def launch(self, path):
        """Open a file for editing."""
        raise NotImplementedError

    @classmethod
    def _get_process(cls, name):
        """Get a process whose executable path contains an app name.

        Every command-line argument of every process is split on
        `os.sep`; a process matches when `name` equals one of the
        resulting components (case-insensitively). The current process,
        zombie processes, and processes containing a component listed in
        `IGNORED_APPLICATION_NAMES` are skipped.

        :param name: application name to look for
        :return: the first matching `psutil.Process`, or `None`
        """
        log.debug("Searching for exe path containing '%s'...", name)

        for process in psutil.process_iter():
            try:
                if cls._is_match(process, name):
                    return process
            except psutil.AccessDenied:
                continue  # the process is likely owned by root
            except psutil.NoSuchProcess:
                # The process exited (or is a zombie that cannot be
                # inspected) between enumeration and the query.
                continue

        return None

    @classmethod
    def _is_match(cls, process, name):
        """Decide whether one process is the application being searched for."""
        command, parts = cls._describe(process)

        if name.lower() not in parts:
            return False

        if process.pid == os.getpid():
            log.debug("Skipped current process: %s", command)
            return False

        if process.status() == psutil.STATUS_ZOMBIE:
            log.debug("Skipped zombie process: %s", command)
            return False

        log.debug("Found matching process: %s", command)
        if any(ignored.lower() in parts
               for ignored in cls.IGNORED_APPLICATION_NAMES):
            log.debug("But skipped due to ignored name")
            return False

        return True

    @staticmethod
    def _describe(process):
        """Return a process's lowercased command line and path components."""
        # Query the command line once so the log text and the components
        # are derived from the same snapshot.
        cmdline = process.cmdline()
        command = ' '.join(cmdline).lower()
        parts = {part.lower() for arg in cmdline for part in arg.split(os.sep)}
        return command, parts
131
132
133
class LinuxManager(BaseManager):  # pragma: no cover (manual)
    """Application manager for Linux."""

    NAME = 'Linux'
    FRIENDLY = NAME

    def is_running(self, application):
        """Determine if an application is currently running.

        :return: `None` when the application has no Linux name (it is
            untracked), otherwise whether a matching process was found
        """
        name = application.versions.linux
        if not name:
            return None
        process = self._get_process(name)
        return process is not None

    def start(self, application):
        """Start an application (currently a no-op on Linux)."""
        pass

    def stop(self, application):
        """Terminate the application's process, if one is running.

        Does nothing when the application has no Linux name or when no
        matching process exists.
        """
        name = application.versions.linux
        if not name:
            return  # untracked on this platform; nothing to look up
        process = self._get_process(name)
        # `_get_process` returns None when the application is not running.
        if process is not None and process.is_running():
            process.terminate()

    def launch(self, path):
        """Open a file with `xdg-open`; return True if it exited with 0."""
        log.info("Opening %s...", path)
        return subprocess.call(['xdg-open', path]) == 0
158
159
160
class MacManager(BaseManager):  # pragma: no cover (manual)
    """Application manager for OS X."""

    NAME = 'Darwin'
    FRIENDLY = 'Mac'

    IGNORED_APPLICATION_NAMES = [
        "iTunesHelper.app",
    ]

    @log_running
    def is_running(self, application):
        """Determine if an application is currently running.

        :return: `None` when the application has no Mac name (it is
            untracked), otherwise whether a matching process was found
        """
        name = application.versions.mac
        if not name:
            return None
        process = self._get_process(name)
        return process is not None

    @log_starting
    def start(self, application):
        """Locate the application by name and open it.

        The name is globbed under each candidate directory in order and
        the first hit is started.

        :raises AssertionError: if no candidate directory contains it
        """
        name = application.versions.mac
        path = None
        for base in (".",
                     "~/Applications",
                     "/Applications",
                     "/Applications/*"):
            pattern = os.path.expanduser(os.path.join(base, name))
            log.debug("Glob pattern: %s", pattern)
            paths = glob.glob(pattern)
            if paths:
                path = paths[0]
                log.debug("Match: %s", path)
                break
        else:
            # Raised explicitly (not via `assert`) so the check is kept
            # when Python runs with optimizations enabled.
            raise AssertionError("Not found: {}".format(application))
        return self._start_app(path)

    @log_stopping
    def stop(self, application):
        """Terminate the application's process, if one is running.

        Does nothing when the application has no Mac name or when no
        matching process exists.
        """
        name = application.versions.mac
        if not name:
            return  # untracked on this platform; nothing to look up
        process = self._get_process(name)
        if process and process.is_running():
            process.terminate()

    @staticmethod
    def _start_app(path):
        """Start an application from its .app directory.

        :raises AssertionError: if `path` does not exist
        :return: the `psutil.Popen` object for the `open` command
        """
        if not os.path.exists(path):
            raise AssertionError(path)
        process = psutil.Popen(['open', path])
        time.sleep(1)  # presumably gives the application time to launch
        return process

    def launch(self, path):
        """Open a file with `open`; return True if it exited with 0."""
        log.info("opening %s...", path)
        return subprocess.call(['open', path]) == 0
215
216
217
class WindowsManager(BaseManager):  # pragma: no cover (manual)
    """Application manager for Windows.

    Only `launch` does any work; the status, start and stop operations
    are placeholders that return `None`.
    """

    NAME = FRIENDLY = 'Windows'

    def is_running(self, application):
        """Placeholder: report nothing about the application's status."""
        return None

    def start(self, application):
        """Placeholder: do not start anything."""
        return None

    def stop(self, application):
        """Placeholder: do not stop anything."""
        return None

    def launch(self, path):
        """Open a file via `os.startfile` and return True."""
        log.info("starting %s...", path)
        open_with_default = os.startfile  # pylint: disable=no-member
        open_with_default(path)
        return True
236
237
238
def get_manager(name=None):
    """Return an application manager for the current operating system.

    :param name: system name to use instead of `platform.system()`;
        any falsy value triggers detection
    :raises KeyError: if no manager class is registered for the system
    """
    log.info("Detecting the current system...")
    system = name if name else platform.system()
    # Map each manager's `NAME` to its class.
    registry = {
        manager_class.NAME: manager_class
        for manager_class in (WindowsManager, MacManager, LinuxManager)
    }
    manager = registry[system]()
    log.info("Current system: %s", manager)
    return manager
249