Issues (21)

executor/opensubmitexec/running.py (2 issues)

1
import pexpect
2
import os
3
import time
4
import tempfile
5
6
from .exceptions import *
7
8
import logging
9
# Module-level logger used by every function and method in this file;
# it is registered under the name 'opensubmitexec'.
logger = logging.getLogger('opensubmitexec')
10
11
12
def kill_longrunning(config):
    '''
        Terminate everything under the current user account
        that has run too long. This is a final safeguard if
        the subprocess timeout stuff is not working.
        You better have no production servers running also
        under the current user account ...

        Args:
            config: Configuration object offering getint(section, option).
                    The maximum runtime in seconds is read from the
                    option "timeout" in the section "Execution".
    '''
    import psutil

    def _read(attribute):
        # psutil >= 2.0 exposes username / create_time as methods,
        # older releases exposed them as plain properties.
        return attribute() if callable(attribute) else attribute

    ourpid = os.getpid()
    username = _read(psutil.Process(ourpid).username)
    # Take the timeout definition from the config file
    timeout = config.getint("Execution", "timeout")
    # Check for other processes running under this account
    for proc in psutil.process_iter():
        if proc.pid == ourpid:
            continue
        try:
            if _read(proc.username) != username:
                continue
            runtime = time.time() - _read(proc.create_time)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # The process vanished while iterating, or we are not allowed
            # to inspect it. Either way it is not ours to kill.
            continue
        logger.debug("This user already runs %u for %u seconds.",
                     proc.pid, runtime)
        if runtime > timeout:
            logger.debug("Killing %u due to exceeded runtime.", proc.pid)
            try:
                proc.kill()
            except Exception:
                # Best effort only: report the problem and keep going
                # with the remaining processes.
                logger.error("ERROR killing process %d.", proc.pid)
37
38
39
class RunningProgram(pexpect.spawn):
    """A running program that you can interact with.

    This class is a thin wrapper around the functionality
    of pexpect (http://pexpect.readthedocs.io/en/stable/overview.html).

    NOTE(review): the class derives from pexpect.spawn, but its initializer
    does not call the base-class initializer. All interaction is delegated
    to a separately created pexpect.spawn object stored in ``_spawn``.

    Attributes:
        job (Job):            The original job for this program execution.
        name (str):           The name of the binary that is executed.
        arguments (list):     The command-line arguments being used for execution.
    """
    job = None
    name = None
    arguments = None
    # Temporary file that receives the console I/O of the program
    _logfile = None
    # The pexpect.spawn object doing the actual work
    _spawn = None

    def get_output(self):
        """Get the program output produced so far.

        Returns:
            str: Program output as text. May be incomplete.
        """
        # Open the temporary file a second time for reading, in text mode.
        # This makes sure that the file pointer for writing is not touched.
        # The context manager closes this extra handle again.
        with open(self._logfile.name) as logfile:
            return logfile.read()

    def get_exitstatus(self):
        """Get the exit status of the program execution.

        Returns:
            int: Exit status as reported by the operating system,
                 or None if it is not available.
        """
        logger.debug("Exit status is {0}".format(self._spawn.exitstatus))
        return self._spawn.exitstatus

    def __init__(self, job, name, arguments=None, timeout=30):
        """Spawn the program and start recording its console I/O.

        Args:
            job (Job):         The job this execution belongs to. Its
                               working_dir is used as working directory.
            name (str):        The binary to execute. A leading './' is
                               replaced by the working directory of the job.
            arguments (list):  Command-line arguments, defaults to none.
            timeout (int):     Default timeout in seconds, handed to pexpect.

        Raises:
            NestedException: Spawning the program failed.
        """
        # A fresh list per call; a list literal as default value would be
        # shared between all instances.
        if arguments is None:
            arguments = []

        self.job = job
        self.name = name
        self.arguments = arguments

        # Allow code to load its own libraries
        os.environ["LD_LIBRARY_PATH"] = job.working_dir

        logger.debug("Spawning '{0}' in {1} with the following arguments:{2}".format(
            name,
            job.working_dir,
            str(arguments)))

        if name.startswith('./'):
            # Only the leading './' refers to the working directory,
            # later occurrences inside the path must stay untouched.
            name = name.replace('./', self.job.working_dir, 1)

        self._logfile = tempfile.NamedTemporaryFile()
        logger.debug("Keeping console I/O in " + self._logfile.name)
        try:
            self._spawn = pexpect.spawn(name, arguments,
                                        logfile=self._logfile,
                                        timeout=timeout,
                                        cwd=self.job.working_dir,
                                        echo=False)
        except Exception as e:
            logger.debug("Spawning failed: " + str(e))
            raise NestedException(instance=self, real_exception=e, output=self.get_output())

    def _wrap_pexpect_error(self, error, fallback_message):
        """Translate a failure during program interaction into the matching exception.

        Args:
            error (Exception):       The exception that was caught.
            fallback_message (str):  Debug message to log if the error is
                                     neither an EOF nor a timeout.

        Returns:
            TerminationException for pexpect EOF, TimeoutException for
            pexpect TIMEOUT, NestedException for everything else. The
            exception is returned, not raised.
        """
        if isinstance(error, pexpect.exceptions.EOF):
            logger.debug("Raising termination exception.")
            wrapper = TerminationException
        elif isinstance(error, pexpect.exceptions.TIMEOUT):
            logger.debug("Raising timeout exception.")
            wrapper = TimeoutException
        else:
            logger.debug(fallback_message)
            wrapper = NestedException
        return wrapper(instance=self, real_exception=error, output=self.get_output())

    def expect_output(self, pattern, timeout=-1):
        """Wait until the running program performs some given output, or terminates.

        Args:
            pattern:  The pattern the output should be checked for.
            timeout (int):  How many seconds should be waited for the output.

        The pattern argument may be a string, a compiled regular expression,
        or a list of any of those types. Strings will be compiled into regular expressions.

        Returns:
            int: The index into the pattern list. If the pattern was not a list, it returns 0 on a successful match.

        Raises:
            TimeoutException: The output did not match within the given time frame.
            TerminationException: The program terminated before producing the output.
            NestedException: An internal problem occurred while waiting for the output.
        """
        logger.debug("Expecting output '{0}' from '{1}'".format(pattern, self.name))
        try:
            return self._spawn.expect(pattern, timeout)
        except Exception as e:
            raise self._wrap_pexpect_error(e, "Expecting output failed: " + str(e))

    def sendline(self, text):
        """Sends an input line to the running program, including os.linesep.

        Args:
            text (str): The input text to be sent.

        Raises:
            TerminationException: The program terminated before / while / after sending the input.
            TimeoutException: A timeout was reported while sending the input.
            NestedException: An internal problem occurred while sending the input.
        """
        logger.debug("Sending input '{0}' to '{1}'".format(text, self.name))
        try:
            return self._spawn.sendline(text)
        except Exception as e:
            raise self._wrap_pexpect_error(e, "Sending input failed: " + str(e))

    def expect_end(self):
        """Wait for the running program to finish.

        Returns:
            A tuple with the exit code, as reported by the operating system, and the output produced.

        Raises:
            TimeoutException: The program did not finish within the timeout.
            TerminationException: pexpect reported an unexpected EOF.
            NestedException: An internal problem occurred while waiting.
        """
        logger.debug("Waiting for termination of '{0}'".format(self.name))
        try:
            # Make sure we fetch the last output bytes.
            # Recommendation from the pexpect docs.
            self._spawn.expect(pexpect.EOF)
            self._spawn.wait()
            dircontent = str(os.listdir(self.job.working_dir))
            logger.debug("Working directory after execution: " + dircontent)
            return self.get_exitstatus(), self.get_output()
        except Exception as e:
            raise self._wrap_pexpect_error(e, "Waiting for expected program end failed.")

    def expect_exitstatus(self, exit_status):
        """Wait for the running program to finish and expect some exit status.

        Args:
            exit_status (int):  The expected exit status.

        Raises:
            WrongExitStatusException: The produced exit status is not the expected one.
        """
        self.expect_end()
        logger.debug("Checking exit status of '{0}', output so far: {1}".format(
            self.name, self.get_output()))
        actual = self._spawn.exitstatus
        if actual is None:
            # No exit status available at all, so nothing to report as 'got'
            raise WrongExitStatusException(
                instance=self, expected=exit_status, output=self.get_output())

        # Compare by value; identity of int objects is an implementation detail.
        if actual != exit_status:
            raise WrongExitStatusException(
                instance=self,
                expected=exit_status,
                got=actual,
                output=self.get_output())

    def expect_exit_status(self, exit_status):
        """
        Deprecated. Use expect_exitstatus() instead.
        """
        return self.expect_exitstatus(exit_status)

    def expect(self, pattern, timeout=-1):
        """
        Deprecated. Use expect_output() instead.
        """
        return self.expect_output(pattern, timeout)
218