Completed
Push — master ( 0b9edb...ab2b1a )
by Mathieu
10s
created

smartdispatch.PBS.add_resources()   D

Complexity

Conditions 8

Size

Total Lines 32

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 8
dl 0
loc 32
rs 4
1
import re
2
from collections import OrderedDict
3
4
regex_walltime = re.compile("(\d+:){1,4}")
5
regex_resource_nodes = re.compile("[a-zA-Z0-9]+(:ppn=\d+)?(:gpus=\d+)?(:[a-zA-Z0-9]+)*")
6
regex_resource_pmem = re.compile("[0-9]+(b|kb|mb|gb|tb)?")
7
8
9
class PBS(object):
10
    """ Offers functionalities to manage a PBS file.
11
12
    For more information about the PBS file format see:
13
    `http://docs.adaptivecomputing.com/suite/8-0/basic/help.htm#topics/torque/2-jobs/requestingRes.htm?TocPath=TORQUE Resource Manager|Submitting and managing jobs|Job submission|_____3`
14
15
    Parameters
16
    ----------
17
    queue_name : str
18
        name of the queue on which commands will be executed
19
    walltime : str
20
        maximum time allocated to execute every commands (DD:HH:MM:SS)
21
    """
22
    def __init__(self, queue_name, walltime):
23
        if queue_name is None or len(queue_name) == 0:
24
            raise ValueError("Queue's name must be provided.")
25
26
        self.queue_name = queue_name
27
        self.modules = []
28
        self.commands = []
29
30
        self.resources = OrderedDict()
31
        self.add_resources(walltime=walltime)
32
33
        self.options = OrderedDict()
34
        self.add_options(q=queue_name)
35
36
        # Declares that all environment variables in the qsub command's environment are to be exported to the batch job.
37
        self.add_options(V="")
38
39
    def add_options(self, **options):
40
        """ Adds options to this PBS file.
41
42
        Parameters
43
        ----------
44
        **options : dict
45
            each key is the name of a PBS option (see `Options`)
46
47
        Options
48
        -------
49
        *A* : account_string
50
            Defines the account string associated with the job.
51
        *N* : name (up to 64 characters)
52
            Declares a name for the job. It must consist of printable,
53
            non white space characters with the first character alphabetic.
54
        """
55
        for option_name, option_value in options.items():
56
            # If known option, validate it.
57
            if option_name.strip('-') == 'N':
58
                if len(option_name) > 64:
59
                    raise ValueError("Maximum number of characters for the name is: 64")
60
61
            self.options["-" + option_name] = option_value
62
63
    def add_resources(self, **resources):
64
        """ Adds resources to this PBS file.
65
66
        Parameters
67
        ----------
68
        **resources : dict
69
            each key is the name of a PBS resource (see `Resources`)
70
71
        Resources
72
        ---------
73
        *nodes* : nodes={<node_count>|<hostname>}[:ppn=<ppn>][:gpus=<gpu>][:<property>[:<property>]...]
74
            Specifies how many and what type of nodes to use
75
            **nodes={<node_count>|<hostname>}**: type of nodes
76
            **ppn=#**: Number of process per node requested for this job
77
            **gpus=#**: Number of process per node requested for this job
78
            **property**: A string specifying a node's feature
79
        *pmem*: pmem=[0-9]+(b|kb|mb|gb|tb)
80
            Specifies the maximum amount of physical memory used by any single process of the job.
81
        """
82
        for resource_name, resource_value in resources.items():
83
            # If known ressource, validate it.
84
            if resource_name == 'nodes':
85
                if re.match(regex_resource_nodes, str(resource_value)) is None:
86
                    raise ValueError("Unknown format for PBS resource: nodes")
87
            elif resource_name == 'pmem':
88
                if re.match(regex_resource_pmem, str(resource_value)) is None:
89
                    raise ValueError("Unknown format for PBS resource: pmem")
90
            elif resource_name == 'walltime':
91
                if re.match(regex_walltime, str(resource_value)) is None:
92
                    raise ValueError("Unknown format for PBS resource: walltime (dd:hh:mm:ss)")
93
94
            self.resources[resource_name] = resource_value
95
96
    def add_modules_to_load(self, *modules):
97
        """ Adds modules to load prior to execute the job on a node.
98
99
        Parameters
100
        ----------
101
        *modules : list of str
102
            each string represents the name of the module to load
103
        """
104
        self.modules += modules
105
106
    def add_commands(self, *commands):
107
        """ Sets commands to execute on a node.
108
109
        Parameters
110
        ----------
111
        *commands : list of str
112
            each string represents a command that is part of this job
113
        """
114
        self.commands += commands
115
116
    def save(self, filename):
117
        """ Saves this PBS job to a file.
118
119
        Parameters
120
        ----------
121
        filename : str
122
            specified where to save this PBS file
123
        """
124
        with open(filename, 'w') as pbs_file:
125
            pbs_file.write(str(self))
126
127
    def __str__(self):
128
        pbs = []
129
        pbs += ["#!/bin/bash"]
130
131
        for option_name, option_value in self.options.items():
132
            if option_value == "":
133
                pbs += ["#PBS {0}".format(option_name)]
134
            else:
135
                pbs += ["#PBS {0} {1}".format(option_name, option_value)]
136
137
        for resource_name, resource_value in self.resources.items():
138
            pbs += ["#PBS -l {0}={1}".format(resource_name, resource_value)]
139
140
        pbs += ["\n# Modules #"]
141
        for module in self.modules:
142
            pbs += ["module load " + module]
143
144
        pbs += ["\n# Commands #"]
145
        pbs += ["{command} &".format(command=command) for command in self.commands]
146
147
        pbs += ["\nwait"]
148
        return "\n".join(pbs)
149