Passed
Push — master ( 0c744a...56a557 )
by Peter
05:43
created

RunningProgram.expect()   A

Complexity

Conditions 1

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 6
Bugs 0 Features 0
Metric Value
c 6
b 0
f 0
dl 0
loc 5
rs 9.4285
cc 1
1
import pexpect
2
import os
3
import time
4
import tempfile
5
6
from .exceptions import *
7
8
import logging
9
logger = logging.getLogger('opensubmitexec')
10
11
12
def kill_longrunning(config):
    '''
        Terminate everything under the current user account
        that has run too long. This is a final safeguard if
        the subprocess timeout stuff is not working.
        You better have no production servers running also
        under the current user account ...

        Args:
            config: Configuration object offering getint(section, option).
                    The maximum runtime in seconds is read from the
                    "timeout" option of the "Execution" section.
    '''
    import psutil
    ourpid = os.getpid()
    # username() and create_time() are methods in psutil >= 2.0.
    # Comparing the bound methods themselves never matches, so they
    # must be called to get the actual values.
    username = psutil.Process(ourpid).username()
    # Take the timeout definition from the config file
    timeout = config.getint("Execution", "timeout")
    # Check for other processes running under this account
    for proc in psutil.process_iter():
        try:
            if proc.pid == ourpid or proc.username() != username:
                continue
            runtime = time.time() - proc.create_time()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            # The process vanished while iterating, or we are not
            # allowed to inspect it. Either way it is not ours to kill.
            continue
        logger.debug("This user already runs %u for %u seconds." %
                     (proc.pid, runtime))
        if runtime > timeout:
            logger.debug("Killing %u due to exceeded runtime." % proc.pid)
            try:
                proc.kill()
            except Exception:
                # Best effort only: keep going with the remaining processes.
                logger.error("ERROR killing process %d." % proc.pid)
37
38
39
class RunningProgram(pexpect.spawn):
    """A running program that you can interact with.

    This class is a thin wrapper around the functionality
    of pexpect (http://pexpect.readthedocs.io/en/stable/overview.html).
    The actual child process is held in a separate pexpect.spawn
    object, all interaction methods delegate to it.

    Attributes:
        job (Job):            The original job for this program execution.
        name (str):           The name of the binary that is executed.
        arguments (list):     The command-line arguments being used for execution.
    """
    job = None
    name = None
    arguments = None
    # Temporary file that receives all console I/O of the child process.
    _logfile = None
    # The pexpect.spawn object representing the child process.
    _spawn = None

    def get_output(self):
        """Get the program output produced so far.

        Returns:
            str: Program output as text. May be incomplete.
        """
        # Open temporary file for reading, in text mode
        # This makes sure that the file pointer for writing
        # is not touched. The context manager closes the
        # additional handle again right after reading.
        with open(self._logfile.name) as logfile:
            return logfile.read()

    def get_exitstatus(self):
        """Get the exit status of the program execution.

        Returns:
            int: Exit status as reported by the operating system,
                 or None if it is not available.
        """
        logger.debug("Exit status is {0}".format(self._spawn.exitstatus))
        return self._spawn.exitstatus

    def __init__(self, job, name, arguments=None, timeout=30):
        """Spawn the program and start recording its console I/O.

        Args:
            job (Job):        The job this execution belongs to. Its
                              working_dir is used as working directory.
            name (str):       The name of the binary to execute. A leading
                              './' is replaced by the job working directory.
            arguments:        Sequence of command-line arguments,
                              defaults to no arguments.
            timeout (int):    Default timeout in seconds, handed to pexpect.

        Raises:
            NestedException: The program could not be spawned.
        """
        # Use a fresh list per instance instead of a shared mutable default.
        if arguments is None:
            arguments = []

        self.job = job
        self.name = name
        self.arguments = arguments

        # Allow code to load its own libraries
        os.environ["LD_LIBRARY_PATH"] = job.working_dir

        logger.debug("Spawning '{0}' in {1} with the following arguments:{2}".format(
            name,
            job.working_dir,
            str(arguments)))

        if name.startswith('./'):
            # Only swap the leading './' - further occurences, e.g. as
            # part of '../', must stay untouched.
            name = self.job.working_dir + name[2:]

        self._logfile = tempfile.NamedTemporaryFile()
        logger.debug("Keeping console I/O in " + self._logfile.name)

        try:
            # pexpect insists on a real list for the arguments,
            # so tuples and other sequences are converted here.
            self._spawn = pexpect.spawn(name, list(arguments),
                                        logfile=self._logfile,
                                        timeout=timeout,
                                        cwd=self.job.working_dir)
        except Exception as e:
            logger.debug("Spawning failed: " + str(e))
            raise NestedException(
                instance=self, real_exception=e, output=self.get_output())

    def expect_output(self, pattern, timeout=-1):
        """Wait until the running program performs some given output, or terminates.

        Args:
            pattern:  The pattern the output should be checked for.
            timeout (int):  How many seconds should be waited for the output.

        The pattern argument may be a string, a compiled regular expression,
        or a list of any of those types. Strings will be compiled into regular expressions.

        Returns:
            int: The index into the pattern list. If the pattern was not a list, it returns 0 on a successful match.

        Raises:
            TimeoutException: The output did not match within the given time frame.
            TerminationException: The program terminated before producing the output.
            NestedException: An internal problem occurred while waiting for the output.
        """
        logger.debug("Expecting output '{0}' from '{1}'".format(
            pattern, self.name))
        try:
            return self._spawn.expect(pattern, timeout)
        except pexpect.exceptions.EOF as e:
            logger.debug("Raising termination exception.")
            raise TerminationException(instance=self, real_exception=e, output=self.get_output())
        except pexpect.exceptions.TIMEOUT as e:
            logger.debug("Raising timeout exception.")
            raise TimeoutException(instance=self, real_exception=e, output=self.get_output())
        except Exception as e:
            logger.debug("Expecting output failed: " + str(e))
            raise NestedException(
                instance=self, real_exception=e, output=self.get_output())

    def sendline(self, text):
        """Sends an input line to the running program, including os.linesep.

        Args:
            text (str): The input text to be sent.

        Returns:
            The result of the underlying pexpect sendline() call.

        Raises:
            TerminationException: The program terminated before / while / after sending the input.
            TimeoutException: Sending the input ran into a timeout.
            NestedException: An internal problem occurred while sending the input.
        """
        logger.debug("Sending input '{0}' to '{1}'".format(text, self.name))
        try:
            return self._spawn.sendline(text)
        except pexpect.exceptions.EOF as e:
            logger.debug("Raising termination exception.")
            raise TerminationException(instance=self, real_exception=e, output=self.get_output())
        except pexpect.exceptions.TIMEOUT as e:
            logger.debug("Raising timeout exception.")
            raise TimeoutException(instance=self, real_exception=e, output=self.get_output())
        except Exception as e:
            logger.debug("Sending input failed: " + str(e))
            raise NestedException(
                instance=self, real_exception=e, output=self.get_output())

    def expect_end(self):
        """Wait for the running program to finish.

        Returns:
            A tuple with the exit code, as reported by the operating system, and the output produced.

        Raises:
            TerminationException: pexpect reported an unexpected end of file.
            TimeoutException: The program did not finish within the timeout.
            NestedException: An internal problem occurred while waiting for the end.
        """
        logger.debug("Waiting for termination of '{0}'".format(self.name))
        try:
            # Make sure we fetch the last output bytes.
            # Recommendation from the pexpect docs.
            self._spawn.expect(pexpect.EOF)
            self._spawn.wait()
            dircontent = str(os.listdir(self.job.working_dir))
            logger.debug("Working directory after execution: " + dircontent)
            return self.get_exitstatus(), self.get_output()
        except pexpect.exceptions.EOF as e:
            logger.debug("Raising termination exception.")
            raise TerminationException(instance=self, real_exception=e, output=self.get_output())
        except pexpect.exceptions.TIMEOUT as e:
            logger.debug("Raising timeout exception.")
            raise TimeoutException(instance=self, real_exception=e, output=self.get_output())
        except Exception as e:
            logger.debug("Waiting for expected program end failed.")
            raise NestedException(
                instance=self, real_exception=e, output=self.get_output())

    def expect_exitstatus(self, exit_status):
        """Wait for the running program to finish and expect some exit status.

        Args:
            exit_status (int):  The expected exit status.

        Raises:
            WrongExitStatusException: The produced exit status is not the expected one,
                                      or no exit status is available at all.
        """
        self.expect_end()
        logger.debug("Checking exit status of '{0}', output so far: {1}".format(
            self.name, self.get_output()))
        if self._spawn.exitstatus is None:
            raise WrongExitStatusException(
                instance=self, expected=exit_status, output=self.get_output())

        # Compare by value. Identity comparison of integers only works
        # by accident of interpreter-internal caching.
        if self._spawn.exitstatus != exit_status:
            raise WrongExitStatusException(
                instance=self,
                expected=exit_status,
                got=self._spawn.exitstatus,
                output=self.get_output())

    def expect_exit_status(self, exit_status):
        """
        Deprecated. Use expect_exitstatus() instead.
        """
        return self.expect_exitstatus(exit_status)

    def expect(self, pattern, timeout=-1):
        """
        Deprecated. Use expect_output() instead.
        """
        return self.expect_output(pattern, timeout)
223