Passed
Push — master ( 24584b...b7c82b )
by Peter
01:35
created

RunningProgram.expect()   B

Complexity

Conditions 4

Size

Total Lines 32

Duplication

Lines 18
Ratio 56.25 %

Importance

Changes 5
Bugs 0 Features 0
Metric Value
c 5
b 0
f 0
dl 18
loc 32
rs 8.5806
cc 4
1
import pexpect
2
import os
3
import time
4
import tempfile
5
6
from .exceptions import *
7
8
import logging
9
logger = logging.getLogger('opensubmitexec')
10
11
12
def kill_longrunning(config):
    """Kill processes of the current user that exceeded the configured timeout.

    Terminate everything under the current user account
    that has run too long. This is a final safeguard if
    the subprocess timeout stuff is not working.
    You better have no production servers running also
    under the current user account ...

    Args:
        config: Configuration object offering ``getint(section, option)``.
                The limit in seconds is read from option ``timeout``
                in section ``Execution``.
    """
    import psutil

    def _read(attribute):
        # psutil >= 2.0 exposes username / create_time as methods, while
        # older releases offered them as plain properties. Support both.
        return attribute() if callable(attribute) else attribute

    ourpid = os.getpid()
    username = _read(psutil.Process(ourpid).username)
    # Take the timeout definition from the config file
    timeout = config.getint("Execution", "timeout")
    # Check for other processes running under this account
    for proc in psutil.process_iter():
        try:
            if proc.pid == ourpid or _read(proc.username) != username:
                continue
            runtime = time.time() - _read(proc.create_time)
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # The process vanished in the meantime, or we are not allowed
            # to inspect it. Either way, it is nothing we could kill.
            continue
        logger.debug("This user already runs %u for %u seconds." %
                     (proc.pid, runtime))
        if runtime > timeout:
            logger.debug("Killing %u due to exceeded runtime." % proc.pid)
            try:
                proc.kill()
            except Exception:
                # Best effort only: report the problem and go on
                # with the remaining processes.
                logger.error("ERROR killing process %d." % proc.pid)
37
38
39
class RunningProgram(pexpect.spawn):
    """A running program that you can interact with.

    This class is a thin wrapper around the functionality
    of pexpect (http://pexpect.readthedocs.io/en/stable/overview.html).

    The child process itself is kept in ``_spawn``; all console I/O
    of the child is recorded in the temporary file ``_logfile``.

    Attributes:
        job (Job):            The original job for this program execution.
        name (str):           The name of the binary that is executed.
        arguments (tuple):    The command-line arguments being used for execution.
    """
    job = None
    name = None
    arguments = None
    _logfile = None
    _spawn = None

    def get_output(self):
        """Get the program output produced so far.

        Returns:
            str: Program output as text. May be incomplete.
        """
        # Open the temporary file a second time for reading, in text mode.
        # This makes sure that the file pointer for writing is not touched.
        # The context manager closes the extra handle again, and reading
        # the content in one piece keeps the line breaks as they are.
        with open(self._logfile.name) as logfile:
            return logfile.read()

    def get_exitstatus(self):
        """Get the exit status of the program execution.

        Returns:
            int: Exit status as reported by the operating system, or -1 if it is not available.
        """
        logger.debug("Exit status is {0}".format(self._spawn.exitstatus))
        if self._spawn.exitstatus is None:
            logger.debug("Translating non-available exit code to -1.")
            return -1
        else:
            return self._spawn.exitstatus

    def __init__(self, job, name, arguments=None, timeout=30):
        """Spawn the program in the working directory of the job.

        Args:
            job (Job):        The job this program execution belongs to.
                              Its ``working_dir`` is used as current directory
                              and as ``LD_LIBRARY_PATH``.
            name (str):       The name of the binary to execute. A leading
                              ``./`` is replaced by the working directory.
            arguments (list): The command-line arguments, defaults to none.
            timeout (int):    Default timeout in seconds for waiting on output.

        Raises:
            NestedException: The program could not be started.
        """
        # Use a fresh list per instance instead of a shared mutable default.
        if arguments is None:
            arguments = []

        self.job = job
        self.name = name
        self.arguments = arguments

        # Allow code to load its own libraries
        os.environ["LD_LIBRARY_PATH"] = job.working_dir

        logger.debug("Spawning '{0}' in {1} with the following arguments:{2}".format(
            name,
            job.working_dir,
            str(arguments)))

        if name.startswith('./'):
            # Only exchange the leading './', later occurrences
            # (such as the one inside of '../') must stay untouched.
            name = self.job.working_dir + name[2:]

        self._logfile = tempfile.NamedTemporaryFile()
        logger.debug("Keeping console I/O in " + self._logfile.name)

        # Note: The initializer of the base class is not called here,
        # the real child process is managed through self._spawn.
        try:
            self._spawn = pexpect.spawn(name, arguments,
                                        logfile=self._logfile,
                                        timeout=timeout,
                                        cwd=self.job.working_dir)
        except Exception as e:
            logger.debug("Spawning failed: " + str(e))
            raise NestedException(
                instance=self, real_exception=e, output=self.get_output())

    def expect(self, pattern, timeout=-1):
        """Wait for some output of the running program.

        Args:
            pattern:  The pattern the output should be checked for.
            timeout (int):  How many seconds should be waited for the output.

        The pattern argument may be a string, a compiled regular expression,
        or a list of any of those types. Strings will be compiled into regular expressions.

        Returns:
            int: The index into the pattern list. If the pattern was not a list, it returns 0 on a successful match.

        Raises:
            TimeoutException: The output did not match within the given time frame.
            TerminationException: The program terminated before producing the output.
            NestedException: An internal problem occurred while waiting for the output.
        """
        logger.debug("Expecting output '{0}' from '{1}'".format(
            pattern, self.name))
        try:
            return self._spawn.expect(pattern, timeout)
        except pexpect.exceptions.EOF as e:
            logger.debug("Raising termination exception.")
            raise TerminationException(instance=self, real_exception=e, output=self.get_output())
        except pexpect.exceptions.TIMEOUT as e:
            logger.debug("Raising timeout exception.")
            raise TimeoutException(instance=self, real_exception=e, output=self.get_output())
        except Exception as e:
            logger.debug("Expecting output failed: " + str(e))
            raise NestedException(
                instance=self, real_exception=e, output=self.get_output())

    def sendline(self, text):
        """Sends an input line to the running program, including os.linesep.

        Args:
            text (str): The input text to be sent.

        Returns:
            The result of the sendline() call of the underlying pexpect object.

        Raises:
            TerminationException: The program terminated before / while / after sending the input.
            TimeoutException: Sending the input ran into a timeout.
            NestedException: An internal problem occurred while sending the input.
        """
        logger.debug("Sending input '{0}' to '{1}'".format(text, self.name))
        try:
            return self._spawn.sendline(text)
        except pexpect.exceptions.EOF as e:
            logger.debug("Raising termination exception.")
            raise TerminationException(instance=self, real_exception=e, output=self.get_output())
        except pexpect.exceptions.TIMEOUT as e:
            logger.debug("Raising timeout exception.")
            raise TimeoutException(instance=self, real_exception=e, output=self.get_output())
        except Exception as e:
            logger.debug("Sending input failed: " + str(e))
            raise NestedException(
                instance=self, real_exception=e, output=self.get_output())

    def expect_end(self):
        """Wait for the running program to finish.

        Returns:
            A tuple with the exit code, as reported by the operating system, and the output produced.

        Raises:
            TerminationException: pexpect reported an EOF condition as error.
            TimeoutException: The program did not finish within the timeout.
            NestedException: An internal problem occurred while waiting for the end.
        """
        logger.debug("Waiting for termination of '{0}'".format(self.name))
        try:
            # Make sure we fetch the last output bytes.
            # Recommendation from the pexpect docs.
            self._spawn.expect(pexpect.EOF)
            self._spawn.wait()
            dircontent = str(os.listdir(self.job.working_dir))
            logger.debug("Working directory after execution: " + dircontent)
            return self.get_exitstatus(), self.get_output()
        except pexpect.exceptions.EOF as e:
            logger.debug("Raising termination exception.")
            raise TerminationException(instance=self, real_exception=e, output=self.get_output())
        except pexpect.exceptions.TIMEOUT as e:
            logger.debug("Raising timeout exception.")
            raise TimeoutException(instance=self, real_exception=e, output=self.get_output())
        except Exception as e:
            logger.debug("Waiting for expected program end failed.")
            raise NestedException(
                instance=self, real_exception=e, output=self.get_output())

    def expect_exit_status(self, exit_status):
        """Wait for the running program to finish and expect some exit status.

        Args:
            exit_status (int):  The expected exit status.

        Raises:
            WrongExitStatusException: The produced exit status is not the expected one,
                                      or no exit status is available at all.
        """
        self.expect_end()
        logger.debug("Checking exit status of '{0}', output so far: {1}".format(
            self.name, self.get_output()))
        if self._spawn.exitstatus is None:
            raise WrongExitStatusException(
                instance=self, expected=exit_status, output=self.get_output())

        # Compare by value. An identity check on integers only works
        # by accident for small, interpreter-cached numbers.
        if self._spawn.exitstatus != exit_status:
            raise WrongExitStatusException(
                instance=self,
                expected=exit_status,
                got=self._spawn.exitstatus,
                output=self.get_output())
214