Completed
Pull Request — master (#145)
by
unknown
01:14
created

sigterm_handler()   B

Complexity

Conditions 5

Size

Total Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 1 Features 1
Metric Value
cc 5
c 3
b 1
f 1
dl 0
loc 14
rs 8.5454
1
#!/usr/bin/env python2
2
# -*- coding: utf-8 -*-
3
4
import os
5
import sys
6
import signal
7
import argparse
8
import subprocess
9
import logging
10
import time as t
11
12
from smartdispatch import utils
13
from smartdispatch.command_manager import CommandManager
14
15
16
def parse_arguments(argv=None):
    """Parse and validate the worker's command-line arguments.

    Parameters
    ----------
    argv : list of str, optional
        Argument strings to parse. When left to None, argparse reads
        them from ``sys.argv[1:]``, which is what the script entry point
        relies on. Passing an explicit list allows the function to be
        called programmatically.

    Returns
    -------
    argparse.Namespace
        Namespace with the attributes ``commands_filename``, ``logs_dir``
        and ``assumeResumable``.

    Notes
    -----
    Invalid input is reported through ``parser.error``, which prints a
    usage message and raises ``SystemExit``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('commands_filename', type=str, help='File containing all commands to execute.')
    parser.add_argument('logs_dir', type=str, help="Folder where to put commands' stdout and stderr.")
    parser.add_argument('-r', '--assumeResumable', action='store_true', help="Assume that commands are resumable and put them into the pending list on worker termination.")
    args = parser.parse_args(argv)

    # Check for invalid arguments
    if not os.path.isfile(args.commands_filename):
        parser.error("Invalid file path. Specify path to a file containing commands.")

    if not os.path.isdir(args.logs_dir):
        parser.error("You need to specify the folder path where to put commands' stdout and stderr.")

    return args
31
32
33
def main():
    """Run the commands handed out by the command manager, one at a time.

    Each command is executed through the shell with its stdout and stderr
    appended to ``<uid>.out`` / ``<uid>.err`` inside the logs directory,
    where ``uid`` is derived from the command string. A header naming the
    time, job and node is written to both files before the command starts.
    Once the command exits, it is reported to the command manager as
    finished together with its exit code. The loop ends when the command
    manager has no command left to hand out.

    With ``--assumeResumable``, a SIGTERM handler is installed that waits
    for the running command and then either puts it back in the pending
    list (exit code 0) or marks it as finished (any other exit code)
    before exiting the worker.
    """
    # Necessary if we want 'logging.info' to appear in stderr.
    logging.root.setLevel(logging.INFO)

    args = parse_arguments()

    command_manager = CommandManager(args.commands_filename)

    if args.assumeResumable:
        # Handle TERM signal gracefully by sending running commands back to
        # the list of pending commands.
        # NOTE: There are several cases when the handler will not have
        #       up-to-date information on running the command and/or process,
        #       but chances of that happening are VERY slim and the
        #       consequences are not fatal.
        # The first parameter is named `signum` so that it does not shadow
        # the `signal` module inside the handler.
        def sigterm_handler(signum, frame):
            # Only the first SIGTERM is acted upon; later ones are ignored.
            if sigterm_handler.triggered:
                return
            sigterm_handler.triggered = True

            error_code = 0
            if sigterm_handler.proc is not None:
                error_code = sigterm_handler.proc.wait()

            if sigterm_handler.command is not None:
                if error_code == 0:  # The command was terminated successfully.
                    command_manager.set_running_command_as_pending(sigterm_handler.command)
                else:
                    command_manager.set_running_command_as_finished(sigterm_handler.command, error_code)

            sys.exit(0)

        # State shared between the main loop and the handler is kept as
        # attributes on the handler function itself.
        sigterm_handler.triggered = False
        sigterm_handler.command = None
        sigterm_handler.proc = None
        signal.signal(signal.SIGTERM, sigterm_handler)

    # Get job and node ID (read once: they are the same for every command).
    job_id = os.environ.get('PBS_JOBID', 'undefined')
    node_name = os.environ.get('HOSTNAME', 'undefined')

    header_template = "## SMART-DISPATCH - {action} on: {timestamp} - In job: {job_id} - On nodes: {node_name} ##\n"

    while True:
        command = command_manager.get_command_to_run()
        if args.assumeResumable:
            sigterm_handler.proc = None
            sigterm_handler.command = command

        if command is None:
            break

        uid = utils.generate_uid_from_string(command)
        stdout_filename = os.path.join(args.logs_dir, uid + ".out")
        stderr_filename = os.path.join(args.logs_dir, uid + ".err")

        with open(stdout_filename, 'a') as stdout_file, open(stderr_filename, 'a') as stderr_file:
            # Only the date pattern goes through strftime; the job and node
            # names are inserted afterwards so that a '%' in either of them
            # is never interpreted as a strftime directive.
            timestamp = t.strftime("%Y-%m-%d %H:%M:%S")

            if stdout_file.tell() > 0:  # Not the first line in the log file.
                prefix, action = "\n", "Resumed"
            else:
                prefix, action = "", "Started"

            log_datetime = prefix + header_template.format(action=action,
                                                           timestamp=timestamp,
                                                           job_id=job_id,
                                                           node_name=node_name)
            log_command = "## SMART-DISPATCH - Command: " + command + '\n'

            # Flush the headers before launching the command so that they
            # precede whatever the child process writes to the same files.
            stdout_file.write(log_datetime + log_command)
            stdout_file.flush()
            stderr_file.write(log_datetime + log_command)
            stderr_file.flush()

            proc = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file, shell=True)
            if args.assumeResumable:
                sigterm_handler.proc = proc
            error_code = proc.wait()

        command_manager.set_running_command_as_finished(command, error_code)
103
104
# Start the worker loop only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
106