1 | """ |
||
2 | Enarksh |
||
3 | |||
4 | Copyright 2013-2016 Set Based IT Consultancy |
||
5 | |||
6 | Licence MIT |
||
7 | """ |
||
8 | import logging |
||
9 | import os |
||
10 | import pwd |
||
11 | import sys |
||
12 | |||
13 | from enarksh.Config import Config |
||
14 | from enarksh.event.Event import Event |
||
15 | from enarksh.event.EventActor import EventActor |
||
16 | from enarksh.logger.message.LogFileMessage import LogFileMessage |
||
17 | from enarksh.spawner.ChunkLogger import ChunkLogger |
||
18 | |||
19 | |||
20 | class JobHandler(EventActor): |
||
21 | """ |
||
22 | Class for reading the stdout and stderr and monitoring the processes of a job. |
||
23 | """ |
||
24 | __allowed_users = [] |
||
25 | """ |
||
26 | The list of user names under which a process can be started. |
||
27 | |||
28 | :type: list[str] |
||
29 | """ |
||
30 | |||
31 | # ------------------------------------------------------------------------------------------------------------------ |
||
32 | def __init__(self, sch_id, rnd_id, user_name, args): |
||
33 | """ |
||
34 | Creates a job handler for starting a job. |
||
35 | |||
36 | :param int sch_id: The ID of the schedule of the job. |
||
37 | :param int rnd_id: The ID of the job. |
||
38 | :param str user_name: The user under which the job must run. |
||
39 | :param args: The arguments for the job. |
||
40 | """ |
||
41 | EventActor.__init__(self) |
||
42 | |||
43 | self.__sch_id = sch_id |
||
44 | """ |
||
45 | The ID of the schedule of the job. |
||
46 | |||
47 | :type: int |
||
48 | """ |
||
49 | |||
50 | self.__rnd_id = rnd_id |
||
51 | """ |
||
52 | The ID of the job. |
||
53 | |||
54 | :type: int |
||
55 | """ |
||
56 | |||
57 | self.__user_name = user_name |
||
58 | """ |
||
59 | The user under which the job must run. |
||
60 | |||
61 | :type: str |
||
62 | """ |
||
63 | |||
64 | self.__args = args |
||
65 | """ |
||
66 | The arguments for the job. |
||
67 | |||
68 | :type: list[str] |
||
69 | """ |
||
70 | |||
71 | self.stdout_logger = ChunkLogger() |
||
72 | """ |
||
73 | The chunk logger for STDOUT of the job. |
||
74 | |||
75 | :type: enarksh.spawner.ChunkLogger.ChunkLogger |
||
76 | """ |
||
77 | |||
78 | self.stderr_logger = ChunkLogger() |
||
79 | """ |
||
80 | The chunk logger for STDERR of the job. |
||
81 | |||
82 | :type: enarksh.spawner.ChunkLogger.ChunkLogger |
||
83 | """ |
||
84 | |||
85 | self.__child_pid = -1 |
||
86 | """ |
||
87 | The PID of the child process. |
||
88 | |||
89 | :type: int |
||
90 | """ |
||
91 | |||
92 | self.__stdout = -1 |
||
93 | """ |
||
94 | The fd for reading the STDOUT of the child process. |
||
95 | |||
96 | :type: int |
||
97 | """ |
||
98 | |||
99 | self.__stderr = -1 |
||
100 | """ |
||
101 | The fd for reading the STDERR of the child process. |
||
102 | |||
103 | :type: int |
||
104 | """ |
||
105 | |||
106 | self.final_event = Event(self) |
||
107 | """ |
||
108 | The event that will be fired when the job has been finally done. |
||
109 | |||
110 | :type: enarksh.event.Event.Event |
||
111 | """ |
||
112 | |||
113 | self.__log = logging.getLogger('enarksh') |
||
114 | """ |
||
115 | The logger. |
||
116 | |||
117 | :type: logging.Logger |
||
118 | """ |
||
119 | |||
120 | # ------------------------------------------------------------------------------------------------------------------ |
||
121 | @property |
||
122 | def pid(self): |
||
123 | """ |
||
124 | Returns the PID of the job. If the job is finished the pid is -1. |
||
125 | |||
126 | :rtype int: The PID of the job. |
||
127 | """ |
||
128 | return self.__child_pid |
||
129 | |||
130 | # ------------------------------------------------------------------------------------------------------------------ |
||
131 | @property |
||
132 | def stderr(self): |
||
133 | """ |
||
134 | Returns the file descriptor for reading the stderr of the child process. |
||
135 | |||
136 | :rtype int: file descriptor |
||
137 | """ |
||
138 | return self.__stderr |
||
139 | |||
140 | # ------------------------------------------------------------------------------------------------------------------ |
||
141 | @property |
||
142 | def stdout(self): |
||
143 | """ |
||
144 | Returns the file descriptor for reading the stdout of the child process. |
||
145 | |||
146 | :rtype int: file descriptor |
||
147 | """ |
||
148 | return self.__stdout |
||
149 | |||
150 | # ------------------------------------------------------------------------------------------------------------------ |
||
151 | @property |
||
152 | def rnd_id(self): |
||
153 | """ |
||
154 | Returns the ID of the job. |
||
155 | |||
156 | :rtype: int |
||
157 | """ |
||
158 | return self.__rnd_id |
||
159 | |||
160 | # ------------------------------------------------------------------------------------------------------------------ |
||
161 | @property |
||
162 | def sch_id(self): |
||
163 | """ |
||
164 | Returns the ID of the schedule of the job. |
||
165 | |||
166 | :rtype: int |
||
167 | """ |
||
168 | return self.__sch_id |
||
169 | |||
170 | # ------------------------------------------------------------------------------------------------------------------ |
||
171 | def __log_job_start(self): |
||
172 | """ |
||
173 | Logs the starting of a job. |
||
174 | """ |
||
175 | self.__log.info('Start rnd_id: {0:10d}, {1:8s}, {2:s}'. |
||
176 | format(self.__rnd_id, self.__user_name, str(self.__args))) |
||
177 | |||
178 | # ------------------------------------------------------------------------------------------------------------------ |
||
179 | def __log_job_stop(self): |
||
180 | """ |
||
181 | Logs the end of job. |
||
182 | """ |
||
183 | self.__log.info('End rnd_id: {0:10d}'.format(self.__rnd_id)) |
||
184 | |||
185 | # ------------------------------------------------------------------------------------------------------------------ |
||
186 | def __final(self): |
||
187 | """ |
||
188 | When the job is finally done fires a done event. |
||
189 | """ |
||
190 | if self.__child_pid == -1 and self.__stdout == -1 and self.__stderr == -1: |
||
191 | self.__log_job_stop() |
||
192 | |||
193 | # Close the files of the chunk loggers. |
||
194 | self.stdout_logger.close() |
||
195 | self.stderr_logger.close() |
||
196 | |||
197 | # Send messages to logger daemon that the stdout and stderr of the job can be loaded into the database. |
||
198 | self.get_logger_message('out').send_message('logger') |
||
199 | self.get_logger_message('err').send_message('logger') |
||
200 | |||
201 | # Fire the event that this job has been done completely. |
||
202 | self.final_event.fire() |
||
203 | |||
204 | # ------------------------------------------------------------------------------------------------------------------ |
||
205 | def set_job_has_finished(self): |
||
206 | """ |
||
207 | Marks that the job has finished. |
||
208 | """ |
||
209 | self.__child_pid = -1 |
||
210 | self.__final() |
||
211 | |||
212 | # ------------------------------------------------------------------------------------------------------------------ |
||
213 | def get_logger_message(self, std): |
||
214 | """ |
||
215 | Returns a message for the logger. |
||
216 | |||
217 | :param str std: log for stdout, err for stderr |
||
218 | |||
219 | :rtype: enarksh.message.logger.LogFileMessage.LogFileMessage |
||
220 | """ |
||
221 | if std == 'out': |
||
222 | chunk_logger = self.stdout_logger |
||
223 | elif std == 'err': |
||
224 | chunk_logger = self.stderr_logger |
||
225 | else: |
||
226 | raise ValueError("Unknown output '%s'." % std) |
||
227 | |||
228 | return LogFileMessage(self.__rnd_id, |
||
229 | std, |
||
230 | chunk_logger.get_total_log_size(), |
||
231 | chunk_logger.filename1, |
||
232 | chunk_logger.filename2) |
||
233 | |||
234 | # ------------------------------------------------------------------------------------------------------------------ |
||
235 | def read(self, fd): |
||
236 | """ |
||
237 | Reads data from the file descriptor and stores the data in a chunk logger. |
||
238 | |||
239 | :param int fd: The file descriptor. |
||
240 | """ |
||
241 | if fd == self.__stdout: |
||
242 | data = os.read(fd, 1000) |
||
243 | if data == b'': |
||
244 | # The pipe has been closed by the child process. |
||
245 | os.close(self.__stdout) |
||
246 | self.__stdout = -1 |
||
247 | self.__final() |
||
248 | else: |
||
249 | self.stdout_logger.write(data) |
||
250 | |||
251 | elif fd == self.__stderr: |
||
252 | data = os.read(fd, 1000) |
||
253 | if data == b'': |
||
254 | # The pipe has been closed by the child process. |
||
255 | os.close(self.__stderr) |
||
256 | self.__stderr = -1 |
||
257 | self.__final() |
||
258 | else: |
||
259 | self.stdout_logger.write(data) |
||
260 | self.stderr_logger.write(data) |
||
261 | |||
262 | else: |
||
263 | raise ValueError('Unknown file descriptor %d.' % fd) |
||
264 | |||
265 | # ------------------------------------------------------------------------------------------------------------------ |
||
266 | @staticmethod |
||
267 | def read_allowed_users(): |
||
268 | """ |
||
269 | Reads the user names under which enarksh is allowed to start processes. |
||
270 | """ |
||
271 | config = Config.get() |
||
272 | |||
273 | JobHandler.__allowed_users = config.get_spawner_get_users() |
||
274 | |||
275 | # ------------------------------------------------------------------------------------------------------------------ |
||
276 | def start_job(self): |
||
0 ignored issues
–
show
|
|||
277 | self.__log_job_start() |
||
278 | |||
279 | # Create pipes for stdout and stderr. |
||
280 | pipe_stdout = os.pipe() |
||
281 | pipe_stderr = os.pipe() |
||
282 | |||
283 | self.__child_pid = os.fork() |
||
284 | if self.__child_pid == 0: |
||
285 | # Child process. |
||
286 | try: |
||
287 | # Close the read ends from the pipes. |
||
288 | os.close(pipe_stdout[0]) |
||
289 | os.close(pipe_stderr[0]) |
||
290 | |||
291 | # Duplicate stdout and stderr on the pipes. |
||
292 | sys.stdout.flush() |
||
293 | sys.stderr.flush() |
||
294 | os.dup2(pipe_stdout[1], sys.stdout.fileno()) |
||
295 | os.dup2(pipe_stderr[1], sys.stderr.fileno()) |
||
296 | |||
297 | # Set the effective user and group. |
||
298 | if self.__user_name in self.__allowed_users: |
||
299 | _, _, uid, gid, _, _, _ = pwd.getpwnam(self.__user_name) |
||
300 | os.setuid(0) |
||
301 | os.setresgid(gid, gid, gid) |
||
302 | os.setresuid(uid, uid, uid) |
||
303 | else: |
||
304 | raise RuntimeError("Spanner is not allowed to start processes under user '%s'." % self.__user_name) |
||
305 | |||
306 | # Set variable for subprocess. |
||
307 | os.putenv('ENK_RND_ID', str(self.__rnd_id)) |
||
308 | os.putenv('ENK_SCH_ID', str(self.__sch_id)) |
||
309 | |||
310 | # Replace this child process with the actual job. |
||
311 | os.execv(self.__args[0], self.__args) |
||
312 | |||
313 | except Exception as e: |
||
0 ignored issues
–
show
The name
e does not conform to the variable naming conventions ([a-z_][a-z0-9_]{1,60}$ ).
This check looks for invalid names for a range of different identifiers. You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements. If your project includes a Pylint configuration file, the settings contained in that file take precedence. To find out more about Pylint, please refer to their site. ![]() |
|||
314 | self.__log.error('Unable to start job') |
||
315 | self.__log.error('Reason: {}'.format(str(e))) |
||
316 | self.__log.exception('Error') |
||
317 | |||
318 | # Exit immediately without running the exit handlers (e.g. from daemon) from the parent process. |
||
319 | os._exit(-1) |
||
320 | else: |
||
321 | # Parent process. |
||
322 | # Close the write ends from the pipes. |
||
323 | os.close(pipe_stdout[1]) |
||
324 | os.close(pipe_stderr[1]) |
||
325 | |||
326 | # Remember the fds for reading the stdout and stderr from the child process. |
||
327 | self.__stdout = pipe_stdout[0] |
||
328 | self.__stderr = pipe_stderr[0] |
||
329 | |||
330 | # Make reading from the pipes non-blocking. |
||
331 | # fcntl.fcntl(self._stdout, 0) |
||
332 | # fcntl.fcntl(self._stderr, 0) |
||
333 | |||
334 | # ---------------------------------------------------------------------------------------------------------------------- |
||
335 |
The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:
If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.