GitHub Access Token became invalid

It seems like the GitHub access token used for retrieving details about this repository from GitHub became invalid. This might prevent certain types of inspections from being run (in particular, everything related to pull requests).
Please ask an admin of your repository to renew the access token on this website.

JobHandler.__log_job_stop()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
cc 1
dl 0
loc 5
ccs 0
cts 2
cp 0
crap 2
rs 9.4285
c 0
b 0
f 0
1
"""
2
Enarksh
3
4
Copyright 2013-2016 Set Based IT Consultancy
5
6
Licence MIT
7
"""
8
import logging
9
import os
10
import pwd
11
import sys
12
13
from enarksh.Config import Config
14
from enarksh.event.Event import Event
15
from enarksh.event.EventActor import EventActor
16
from enarksh.logger.message.LogFileMessage import LogFileMessage
17
from enarksh.spawner.ChunkLogger import ChunkLogger
18
19
20
class JobHandler(EventActor):
21
    """
22
    Class for reading the stdout and stderr and monitoring the processes of a job.
23
    """
24
    __allowed_users = []
25
    """
26
    The list of user names under which a process can be started.
27
28
    :type: list[str]
29
    """
30
31
    # ------------------------------------------------------------------------------------------------------------------
32
    def __init__(self, sch_id, rnd_id, user_name, args):
        """
        Object constructor. Prepares a handler for a job; the job itself is not started here.

        :param int sch_id: The ID of the schedule of the job.
        :param int rnd_id: The ID of the job.
        :param str user_name: The user under which the job must run.
        :param list[str] args: The arguments for the job.
        """
        EventActor.__init__(self)

        # int: The ID of the schedule of the job.
        self.__sch_id = sch_id

        # int: The ID of the job.
        self.__rnd_id = rnd_id

        # str: The user under which the job must run.
        self.__user_name = user_name

        # list[str]: The arguments for the job.
        self.__args = args

        # enarksh.spawner.ChunkLogger.ChunkLogger: The chunk logger for STDOUT of the job.
        self.stdout_logger = ChunkLogger()

        # enarksh.spawner.ChunkLogger.ChunkLogger: The chunk logger for STDERR of the job.
        self.stderr_logger = ChunkLogger()

        # int: The PID of the child process (-1 as long as there is no child process).
        self.__child_pid = -1

        # int: The fd for reading the STDOUT of the child process (-1 as long as it is not open).
        self.__stdout = -1

        # int: The fd for reading the STDERR of the child process (-1 as long as it is not open).
        self.__stderr = -1

        # enarksh.event.Event.Event: The event that will be fired when the job has been finally done.
        self.final_event = Event(self)

        # logging.Logger: The logger.
        self.__log = logging.getLogger('enarksh')
119
120
    # ------------------------------------------------------------------------------------------------------------------
121
    @property
122
    def pid(self):
123
        """
124
        Returns the PID of the job. If the job is finished the pid is -1.
125
126
        :rtype int: The PID of the job.
127
        """
128
        return self.__child_pid
129
130
    # ------------------------------------------------------------------------------------------------------------------
131
    @property
132
    def stderr(self):
133
        """
134
        Returns the file descriptor for reading the stderr of the child process.
135
136
        :rtype int: file descriptor
137
        """
138
        return self.__stderr
139
140
    # ------------------------------------------------------------------------------------------------------------------
141
    @property
142
    def stdout(self):
143
        """
144
        Returns the file descriptor for reading the stdout of the child process.
145
146
        :rtype int: file descriptor
147
        """
148
        return self.__stdout
149
150
    # ------------------------------------------------------------------------------------------------------------------
151
    @property
152
    def rnd_id(self):
153
        """
154
        Returns the ID of the job.
155
156
        :rtype: int
157
        """
158
        return self.__rnd_id
159
160
    # ------------------------------------------------------------------------------------------------------------------
161
    @property
162
    def sch_id(self):
163
        """
164
        Returns the ID of the schedule of the job.
165
166
        :rtype: int
167
        """
168
        return self.__sch_id
169
170
    # ------------------------------------------------------------------------------------------------------------------
171
    def __log_job_start(self):
172
        """
173
        Logs the starting of a job.
174
        """
175
        self.__log.info('Start rnd_id: {0:10d}, {1:8s}, {2:s}'.
176
                        format(self.__rnd_id, self.__user_name, str(self.__args)))
177
178
    # ------------------------------------------------------------------------------------------------------------------
179
    def __log_job_stop(self):
180
        """
181
        Logs the end of job.
182
        """
183
        self.__log.info('End   rnd_id: {0:10d}'.format(self.__rnd_id))
184
185
    # ------------------------------------------------------------------------------------------------------------------
186
    def __final(self):
187
        """
188
        When the job is finally done fires a done event.
189
        """
190
        if self.__child_pid == -1 and self.__stdout == -1 and self.__stderr == -1:
191
            self.__log_job_stop()
192
193
            # Close the files of the chunk loggers.
194
            self.stdout_logger.close()
195
            self.stderr_logger.close()
196
197
            # Send messages to logger daemon that the stdout and stderr of the job can be loaded into the database.
198
            self.get_logger_message('out').send_message('logger')
199
            self.get_logger_message('err').send_message('logger')
200
201
            # Fire the event that this job has been done completely.
202
            self.final_event.fire()
203
204
    # ------------------------------------------------------------------------------------------------------------------
205
    def set_job_has_finished(self):
206
        """
207
        Marks that the job has finished.
208
        """
209
        self.__child_pid = -1
210
        self.__final()
211
212
    # ------------------------------------------------------------------------------------------------------------------
213
    def get_logger_message(self, std):
214
        """
215
        Returns a message for the logger.
216
217
        :param str std: log for stdout, err for stderr
218
219
        :rtype: enarksh.message.logger.LogFileMessage.LogFileMessage
220
        """
221
        if std == 'out':
222
            chunk_logger = self.stdout_logger
223
        elif std == 'err':
224
            chunk_logger = self.stderr_logger
225
        else:
226
            raise ValueError("Unknown output '%s'." % std)
227
228
        return LogFileMessage(self.__rnd_id,
229
                              std,
230
                              chunk_logger.get_total_log_size(),
231
                              chunk_logger.filename1,
232
                              chunk_logger.filename2)
233
234
    # ------------------------------------------------------------------------------------------------------------------
235
    def read(self, fd):
236
        """
237
        Reads data from the file descriptor and stores the data in a chunk logger.
238
239
        :param int fd: The file descriptor.
240
        """
241
        if fd == self.__stdout:
242
            data = os.read(fd, 1000)
243
            if data == b'':
244
                # The pipe has been closed by the child process.
245
                os.close(self.__stdout)
246
                self.__stdout = -1
247
                self.__final()
248
            else:
249
                self.stdout_logger.write(data)
250
251
        elif fd == self.__stderr:
252
            data = os.read(fd, 1000)
253
            if data == b'':
254
                # The pipe has been closed by the child process.
255
                os.close(self.__stderr)
256
                self.__stderr = -1
257
                self.__final()
258
            else:
259
                self.stdout_logger.write(data)
260
                self.stderr_logger.write(data)
261
262
        else:
263
            raise ValueError('Unknown file descriptor %d.' % fd)
264
265
    # ------------------------------------------------------------------------------------------------------------------
266
    @staticmethod
    def read_allowed_users():
        """
        Loads from the configuration the user names under which enarksh is allowed to start processes and stores
        them at class level.
        """
        JobHandler.__allowed_users = Config.get().get_spawner_get_users()
274
275
    # ------------------------------------------------------------------------------------------------------------------
276
    def start_job(self):
        """
        Starts the job in a forked child process.

        Two pipes are created, one for the stdout and one for the stderr of the job. The child process duplicates
        the write ends onto its stdout and stderr, switches to the user of the job (only when that user is in the
        list of allowed users), exports ENK_RND_ID and ENK_SCH_ID, and replaces itself with the job. When anything
        goes wrong in the child process the failure is logged and the child exits immediately with status -1.

        The parent process keeps the read ends of the pipes and remembers the PID of the child process.

        :raises OSError: When a pipe cannot be created or the process cannot be forked. File descriptors that were
                         already created are closed before the error is propagated.
        """
        self.__log_job_start()

        # Create pipes for stdout and stderr.
        pipe_stdout = os.pipe()
        try:
            pipe_stderr = os.pipe()
        except OSError:
            # Do not leak the first pipe when the second one cannot be created.
            for pipe_fd in pipe_stdout:
                os.close(pipe_fd)
            raise

        try:
            self.__child_pid = os.fork()
        except OSError:
            # Do not leak the pipes when no child process could be created.
            for pipe_fd in pipe_stdout + pipe_stderr:
                os.close(pipe_fd)
            raise

        if self.__child_pid == 0:
            # Child process.
            try:
                # Close the read ends from the pipes.
                os.close(pipe_stdout[0])
                os.close(pipe_stderr[0])

                # Duplicate stdout and stderr on the pipes.
                sys.stdout.flush()
                sys.stderr.flush()
                os.dup2(pipe_stdout[1], sys.stdout.fileno())
                os.dup2(pipe_stderr[1], sys.stderr.fileno())

                # Set the effective user and group.
                if self.__user_name not in self.__allowed_users:
                    raise RuntimeError("Spawner is not allowed to start processes under user '%s'." % self.__user_name)

                _, _, uid, gid, _, _, _ = pwd.getpwnam(self.__user_name)
                os.setuid(0)
                # The group must be changed before the user; afterwards the process may lack the privilege to do so.
                os.setresgid(gid, gid, gid)
                os.setresuid(uid, uid, uid)

                # Set variable for subprocess.
                os.putenv('ENK_RND_ID', str(self.__rnd_id))
                os.putenv('ENK_SCH_ID', str(self.__sch_id))

                # Replace this child process with the actual job. On success this call does not return.
                os.execv(self.__args[0], self.__args)

            except Exception as error:
                self.__log.error('Unable to start job')
                self.__log.error('Reason: {}'.format(str(error)))
                self.__log.exception('Error')

            finally:
                # Only reached when the job could not be started. Exit immediately without running the exit handlers
                # (e.g. from daemon) from the parent process, whatever kind of exception occurred: the child must
                # never continue running the code of the parent.
                os._exit(-1)
        else:
            # Parent process.
            # Close the write ends from the pipes.
            os.close(pipe_stdout[1])
            os.close(pipe_stderr[1])

            # Remember the fds for reading the stdout and stderr from the child process.
            # NOTE: the read ends are not switched to non-blocking mode here.
            self.__stdout = pipe_stdout[0]
            self.__stderr = pipe_stderr[0]
333
334
# ----------------------------------------------------------------------------------------------------------------------
335