1
|
|
|
#!/usr/bin/env python2 |
2
|
|
|
# -*- coding: utf-8 -*- |
3
|
|
|
|
4
|
|
|
import os |
5
|
|
|
import sys |
6
|
|
|
import signal |
7
|
|
|
import argparse |
8
|
|
|
import subprocess |
9
|
|
|
import logging |
10
|
|
|
import time as t |
11
|
|
|
|
12
|
|
|
from smartdispatch import utils |
13
|
|
|
from smartdispatch.command_manager import CommandManager |
14
|
|
|
|
15
|
|
|
|
16
|
|
|
def parse_arguments(argv=None):
    """Parse and validate the worker's command-line arguments.

    Parameters
    ----------
    argv : list of str, optional
        Argument strings to parse, without the program name. When omitted
        (the default), ``argparse`` falls back to ``sys.argv[1:]``, which is
        what the script entry point relies on.

    Returns
    -------
    argparse.Namespace
        Namespace with the attributes ``commands_filename``, ``logs_dir``
        and ``assumeResumable``.

    Notes
    -----
    Leaves through ``parser.error`` (usage message followed by
    ``SystemExit``) when ``commands_filename`` is not an existing file or
    ``logs_dir`` is not an existing directory.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('commands_filename', type=str, help='File containing all commands to execute.')
    parser.add_argument('logs_dir', type=str, help="Folder where to put commands' stdout and stderr.")
    parser.add_argument('-r', '--assumeResumable', action='store_true', help="Assume that commands are resumable and put them into the pending list on worker termination.")
    # Passing None makes argparse read sys.argv[1:], preserving the original
    # behaviour for callers that give no argument list.
    args = parser.parse_args(argv)

    # Check for invalid arguments
    if not os.path.isfile(args.commands_filename):
        parser.error("Invalid file path. Specify path to a file containing commands.")

    if not os.path.isdir(args.logs_dir):
        parser.error("You need to specify the folder path where to put command' stdout and stderr.")

    return args
31
|
|
|
|
32
|
|
|
|
33
|
|
|
def _build_log_header(resumed, job_id, node_name, command):
    """Return the banner written to a command's log files before it runs.

    ``resumed`` selects the "Resumed on" wording (preceded by a blank line)
    instead of "Started on".
    """
    # Render the timestamp on its own: feeding the job id or host name
    # through strftime would treat any '%' they contain as a directive.
    timestamp = t.strftime("%Y-%m-%d %H:%M:%S")

    if resumed:
        template = "\n## SMART-DISPATCH - Resumed on: {timestamp} - In job: {job_id} - On nodes: {node_name} ##\n"
    else:
        template = "## SMART-DISPATCH - Started on: {timestamp} - In job: {job_id} - On nodes: {node_name} ##\n"

    log_datetime = template.format(timestamp=timestamp, job_id=job_id, node_name=node_name)
    log_command = "## SMART-DISPATCH - Command: " + command + '\n'
    return log_datetime + log_command


def main():
    """Run the commands handed out by the command manager, one at a time.

    For every command obtained from ``CommandManager.get_command_to_run``
    the worker appends a banner to ``<uid>.out`` and ``<uid>.err`` inside
    ``logs_dir``, runs the command through the shell with its stdout and
    stderr redirected to those files, waits for it, and reports the exit
    code with ``set_running_command_as_finished``. The loop ends when the
    manager returns ``None``.

    With ``--assumeResumable`` a SIGTERM handler is installed that waits for
    the current child process, hands the current command back through
    ``set_running_command_as_pending`` and exits with status 0.
    """
    # Necessary if we want 'logging.info' to appear in stderr.
    logging.root.setLevel(logging.INFO)

    args = parse_arguments()

    command_manager = CommandManager(args.commands_filename)

    if args.assumeResumable:
        # Handle TERM signal gracefully by sending running commands back to
        # the list of pending commands.
        # NOTE: There are several cases when the handler will not have
        #       up-to-date information on running the command and/or process,
        #       but chances of that happening are VERY slim and the
        #       consequences are not fatal.
        def sigterm_handler(signum, frame):
            # 'signum' (not 'signal') so the signal module is not shadowed.
            # Only the first TERM is acted upon; later ones are ignored.
            if sigterm_handler.triggered:
                return
            sigterm_handler.triggered = True

            if sigterm_handler.proc is not None:
                sigterm_handler.proc.wait()
            if sigterm_handler.command is not None:
                command_manager.set_running_command_as_pending(sigterm_handler.command)
            sys.exit(0)

        # State shared with the main loop lives on the function object.
        sigterm_handler.triggered = False
        sigterm_handler.command = None
        sigterm_handler.proc = None
        signal.signal(signal.SIGTERM, sigterm_handler)

    while True:
        command = command_manager.get_command_to_run()
        if args.assumeResumable:
            # Forget the previous child before publishing the new command.
            sigterm_handler.proc = None
            sigterm_handler.command = command

        if command is None:
            break

        uid = utils.generate_uid_from_string(command)
        stdout_filename = os.path.join(args.logs_dir, uid + ".out")
        stderr_filename = os.path.join(args.logs_dir, uid + ".err")

        # Get job and node ID
        job_id = os.environ.get('PBS_JOBID', 'undefined')
        node_name = os.environ.get('HOSTNAME', 'undefined')

        with open(stdout_filename, 'a') as stdout_file, open(stderr_filename, 'a') as stderr_file:
            # A non-empty stdout log means this command already ran before.
            # Use the on-disk size rather than tell(): the position initially
            # reported for an append-mode file is not guaranteed to be the
            # end of the file on every platform.
            resumed = os.fstat(stdout_file.fileno()).st_size > 0
            header = _build_log_header(resumed, job_id, node_name, command)

            # Flush so the banner lands before anything the child writes
            # through the same file descriptors.
            stdout_file.write(header)
            stdout_file.flush()
            stderr_file.write(header)
            stderr_file.flush()

            proc = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file, shell=True)
            if args.assumeResumable:
                sigterm_handler.proc = proc
            error_code = proc.wait()

        command_manager.set_running_command_as_finished(command, error_code)
101
|
|
|
# Start the worker only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
103
|
|
|
|