sigterm_handler()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 1
Metric Value
cc 4
c 2
b 0
f 1
dl 0
loc 11
rs 9.2
1
#!/usr/bin/env python2
2
# -*- coding: utf-8 -*-
3
4
import os
5
import sys
6
import signal
7
import argparse
8
import subprocess
9
import logging
10
import time as t
11
12
from smartdispatch import utils
13
from smartdispatch.command_manager import CommandManager
14
15
16
def parse_arguments():
17
    parser = argparse.ArgumentParser()
18
    parser.add_argument('commands_filename', type=str, help='File containing all commands to execute.')
19
    parser.add_argument('logs_dir', type=str, help="Folder where to put commands' stdout and stderr.")
20
    parser.add_argument('-r', '--assumeResumable', action='store_true', help="Assume that commands are resumable and put them into the pending list on worker termination.")
21
    args = parser.parse_args()
22
23
    # Check for invalid arguments
24
    if not os.path.isfile(args.commands_filename):
25
        parser.error("Invalid file path. Specify path to a file containing commands.")
26
27
    if not os.path.isdir(args.logs_dir):
28
        parser.error("You need to specify the folder path where to put command' stdout and stderr.")
29
30
    return args
31
32
33
def main():
34
    # Necessary if we want 'logging.info' to appear in stderr.
35
    logging.root.setLevel(logging.INFO)
36
37
    args = parse_arguments()
38
39
    command_manager = CommandManager(args.commands_filename)
40
41
    if args.assumeResumable:
42
        # Handle TERM signal gracefully by sending running commands back to
43
        # the list of pending commands.
44
        # NOTE: There are several cases when the handler will not have
45
        #       up-to-date information on running the command and/or process,
46
        #       but chances of that happening are VERY slim and the
47
        #       consequences are not fatal.
48
        def sigterm_handler(signal, frame):
49
            if sigterm_handler.triggered:
50
                return
51
            else:
52
                sigterm_handler.triggered = True
53
54
            if sigterm_handler.proc is not None:
55
                sigterm_handler.proc.wait()
56
            if sigterm_handler.command is not None:
57
                command_manager.set_running_command_as_pending(sigterm_handler.command)
58
            sys.exit(0)
59
        sigterm_handler.triggered = False
60
        sigterm_handler.command = None
61
        sigterm_handler.proc = None
62
        signal.signal(signal.SIGTERM, sigterm_handler)
63
64
    while True:
65
        command = command_manager.get_command_to_run()
66
        if args.assumeResumable:
67
            sigterm_handler.proc = None
68
            sigterm_handler.command = command
69
70
        if command is None:
71
            break
72
73
        uid = utils.generate_uid_from_string(command)
74
        stdout_filename = os.path.join(args.logs_dir, uid + ".out")
75
        stderr_filename = os.path.join(args.logs_dir, uid + ".err")
76
77
        # Get job and node ID
78
        job_id = os.environ.get('PBS_JOBID', 'undefined')
79
        node_name = os.environ.get('HOSTNAME', 'undefined')
80
81
        with open(stdout_filename, 'a') as stdout_file:
82
            with open(stderr_filename, 'a') as stderr_file:
83
                log_datetime = t.strftime("## SMART-DISPATCH - Started on: %Y-%m-%d %H:%M:%S - In job: {job_id} - On nodes: {node_name} ##\n".format(job_id=job_id, node_name=node_name))
84
                if stdout_file.tell() > 0:  # Not the first line in the log file.
85
                    log_datetime = t.strftime("\n## SMART-DISPATCH - Resumed on: %Y-%m-%d %H:%M:%S - In job: {job_id} - On nodes: {node_name} ##\n".format(job_id=job_id, node_name=node_name))
86
87
                log_command = "## SMART-DISPATCH - Command: " + command + '\n'
88
89
                stdout_file.write(log_datetime + log_command)
90
                stdout_file.flush()
91
                stderr_file.write(log_datetime + log_command)
92
                stderr_file.flush()
93
94
                proc = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file, shell=True)
95
                if args.assumeResumable:
96
                    sigterm_handler.proc = proc
97
                error_code = proc.wait()
98
99
        command_manager.set_running_command_as_finished(command, error_code)
100
101
if __name__ == '__main__':
102
    main()
103