Completed
Push — develop ( 84fb58...03de07 )
by
unknown
01:12
created

node_config()   B

Complexity

Conditions 4

Size

Total Lines 24

Duplication

Lines 0
Ratio 0 %

Importance

Changes 17
Bugs 4 Features 2
Metric Value
cc 4
c 17
b 4
f 2
dl 0
loc 24
rs 8.6845
1
2
import os
3
import re
4
import math
5
from string import Template
6
from subprocess import Popen, PIPE, check_output
7
from mycluster import get_data
8
from mycluster import load_template
9
10
"""
11
12
bjobs -u all -q emerald
13
bqueues -l emerald
14
15
"""
16
17
18
def scheduler_type():
    """Return the identifier of the scheduler this module drives ('lsf')."""
    return 'lsf'
20
21
def name():
    """Return the cluster name reported by the ``lsid`` command.

    The output of ``lsid`` is scanned for the line that begins with
    ``My cluster name is``; the last space-separated word of that line
    is returned.

    Returns:
        The cluster name, or ``'undefined'`` when no such line is present.
    """
    # universal_newlines=True makes check_output return text on Python 3
    # as well as Python 2, so the str prefix test below is valid on both.
    lsid_output = check_output(['lsid'], universal_newlines=True)
    prefix = 'My cluster name is'
    for line in lsid_output.splitlines():
        if line.startswith(prefix):
            # Strip first so trailing whitespace cannot yield an empty name.
            return line.strip().rsplit(' ', 1)[1]

    return 'undefined'
28
29
def queues():
    """Return the queue names listed by ``bqueues`` for the current user.

    Runs the shell command ``bqueues -w -u `whoami```, discards the first
    (header) line and takes the first space-separated token of every
    remaining line.

    Returns:
        A list of queue names; empty when the command prints nothing.
    """
    queue_list = []

    with os.popen('bqueues -w -u `whoami`') as f:
        f.readline()  # read header
        for line in f:
            q = line.split(' ')[0].strip()
            # A blank line would otherwise be recorded as an empty name.
            if q:
                queue_list.append(q)

    return queue_list
39
40
41
def accounts():
    """Return the list of accounts; this backend always reports none."""
    return []
43
44
45
def available_tasks(queue_id):
    """Return the task limits of a queue as reported by ``bqueues``.

    Args:
        queue_id: Queue name; passed as the argument to ``bqueues`` and
            used as the prefix that identifies the queue's output row.

    Returns:
        A dict with two keys:
        ``'max tasks'`` -- integer parsed from field 4 of the queue's row,
        or 0 when that field is missing or not an integer;
        ``'available'`` -- ``'max tasks'`` minus the integer in field 9.

    Raises:
        ValueError, IndexError: when field 9 of a matching row is missing
            or not an integer.
    """
    max_tasks = 0
    run_tasks = 0
    queue_name = queue_id
    # universal_newlines=True yields text on both Python 2 and Python 3.
    q_output = check_output(['bqueues', queue_name], universal_newlines=True)
    for line in q_output.splitlines():
        if not line.startswith(queue_name):
            continue
        # Collapse runs of spaces so positional indexing is stable.
        # NOTE(review): indices are positional in the space-split row;
        # confirm they match the bqueues column layout on the target LSF.
        fields = re.sub(' +', ' ', line).strip().split(' ')
        try:
            max_tasks = int(fields[4])
        except (IndexError, ValueError):
            # A non-numeric limit leaves the previous value in place.
            pass
        run_tasks = int(fields[9])

    return {'available': max_tasks - run_tasks, 'max tasks': max_tasks}
67
68
def tasks_per_node(queue_id):
    """Return the task count of the host (group) serving a queue.

    The ``HOSTS:`` line of ``bqueues -l <queue_id>`` supplies the host
    name: its last space-separated word with every ``/`` removed. That
    name is passed to ``bhosts -l`` and the fourth field of the third
    output line is returned as an integer.

    Args:
        queue_id: Queue name passed to ``bqueues -l``.

    Returns:
        The parsed task count, or 0 when the queue reports ``none`` as its
        hosts or has no ``HOSTS:`` line at all.
    """
    host_list = None
    # universal_newlines=True yields text on both Python 2 and Python 3.
    q_output = check_output(['bqueues', '-l', queue_id],
                            universal_newlines=True)
    for line in q_output.splitlines():
        if line.startswith('HOSTS:'):
            host_list = line.strip().rsplit(' ', 1)[1].replace('/', '')
            if host_list == 'none':
                return 0

    if host_list is None:
        # No HOSTS: line found; passing None to bhosts would raise TypeError.
        return 0

    bhosts_output = check_output(['bhosts', '-l', host_list],
                                 universal_newlines=True).splitlines()
    fields = re.sub(' +', ' ', bhosts_output[2]).strip().split(' ')

    return int(fields[3])
81
82
def node_config(queue_id):
    """Return the node configuration of the host (group) serving a queue.

    The host name is taken from the ``HOSTS:`` line of
    ``bqueues -l <queue_id>`` (last space-separated word, ``/`` removed)
    and looked up with ``bhosts -l``.

    Args:
        queue_id: Queue name passed to ``bqueues -l``.

    Returns:
        A dict with keys ``'max task'``, ``'max thread'`` and
        ``'max memory'``. Task and thread counts are both the integer in
        the fourth field of the third ``bhosts`` output line; memory is the
        float in the twelfth field of the seventh line with ``G`` removed
        (presumably gigabytes -- confirm). All three are 0 when the queue
        reports ``none`` as its hosts or has no ``HOSTS:`` line.
    """
    # Find first node with queue and record node config
    config = {'max task': 0, 'max thread': 0, 'max memory': 0}
    host_list = None
    # universal_newlines=True yields text on both Python 2 and Python 3.
    q_output = check_output(['bqueues', '-l', queue_id],
                            universal_newlines=True)
    for line in q_output.splitlines():
        if line.startswith('HOSTS:'):
            host_list = line.strip().rsplit(' ', 1)[1].replace('/', '')
            if host_list == 'none':
                return config

    if host_list is None:
        # No HOSTS: line found; passing None to bhosts would raise TypeError.
        return config

    bhosts_output = check_output(['bhosts', '-l', host_list],
                                 universal_newlines=True).splitlines()
    task_fields = re.sub(' +', ' ', bhosts_output[2]).strip().split(' ')
    tasks = int(task_fields[3])
    mem_fields = re.sub(' +', ' ', bhosts_output[6]).strip().split(' ')
    memory = float(mem_fields[11].replace('G', ''))

    config['max task'] = tasks
    config['max thread'] = tasks
    config['max memory'] = memory

    return config
106
107
108
def create_submit(queue_id, **kwargs):
    """Render an LSF submission script from the ``lsf.jinja`` template.

    Args:
        queue_id: Name of the queue to submit to.
        **kwargs: Optional settings:
            num_tasks (default 1; capped at the queue's 'max tasks' when
            that is positive), tasks_per_node (capped at the queue's
            value), num_threads_per_task (default and cap derived from the
            node's 'max thread'), my_name, my_output, my_script (passed
            through ``get_data`` when it contains ``'mycluster-'``),
            user_email, project_name, wall_clock (``':00'`` is appended
            when it contains no ``':'``), no_syscribe, openmpi_args, qos.

    Returns:
        The value returned by the template's ``render`` call.
    """
    queue_name = queue_id
    num_tasks = kwargs.get('num_tasks', 1)

    tpn = tasks_per_node(queue_id)
    queue_tpn = tpn
    if 'tasks_per_node' in kwargs:
        tpn = min(tpn, kwargs['tasks_per_node'])

    nc = node_config(queue_id)
    qc = available_tasks(queue_id)

    if qc['max tasks'] > 0:
        num_tasks = min(num_tasks, qc['max tasks'])

    # NOTE(review): tpn is 0 when tasks_per_node() reports no hosts, which
    # makes the divisions below raise ZeroDivisionError -- confirm the
    # desired handling for such queues.
    max_threads = nc['max thread']
    num_threads_per_task = kwargs.get('num_threads_per_task', max_threads)
    threads_cap = int(math.ceil(float(max_threads) / float(tpn)))
    num_threads_per_task = min(num_threads_per_task, threads_cap)

    my_name = kwargs.get('my_name', "myclusterjob")
    my_output = kwargs.get('my_output', "myclusterjob.out")
    my_script = kwargs.get('my_script', None)
    # my_script defaults to None; guard so the substring test cannot raise.
    if my_script is not None and 'mycluster-' in my_script:
        my_script = get_data(my_script)

    user_email = kwargs.get('user_email', None)
    project_name = kwargs.get('project_name', 'default')

    # str() lets callers pass a bare number of hours as well as a string.
    wall_clock = str(kwargs.get('wall_clock', '12:00'))
    if ':' not in wall_clock:
        wall_clock = wall_clock + ':00'

    num_nodes = int(math.ceil(float(num_tasks) / float(tpn)))

    # Slots are requested per whole node at the queue's own task count.
    num_queue_slots = num_nodes * queue_tpn

    no_syscribe = kwargs.get('no_syscribe', False)

    record_job = not no_syscribe

    openmpi_args = kwargs.get('openmpi_args', "-bysocket -bind-to-socket")

    qos = kwargs.get('qos', None)

    template = load_template('lsf.jinja')

    script_str = template.render(my_name=my_name,
                                 my_script=my_script,
                                 my_output=my_output,
                                 user_email=user_email,
                                 queue_name=queue_name,
                                 num_queue_slots=num_queue_slots,
                                 num_tasks=num_tasks,
                                 tpn=tpn,
                                 num_threads_per_task=num_threads_per_task,
                                 num_nodes=num_nodes,
                                 project_name=project_name,
                                 wall_clock=wall_clock,
                                 record_job=record_job,
                                 openmpi_args=openmpi_args,
                                 qos=qos)

    return script_str
174
175
176
def submit(script_name, immediate, depends=None):
    """Submit a script with ``bsub`` and return the new job id.

    The script is fed to ``bsub`` on standard input through the shell. The
    job id is parsed from the second space-separated word of the first
    output line, with ``<`` and ``>`` removed.

    Args:
        script_name: Path of the submission script.
        immediate: Accepted but not used by this implementation.
        depends: Accepted but not used by this implementation.

    Returns:
        The job id as an int, or None when it could not be parsed from the
        command output (a diagnostic line is printed in that case).
    """
    job_id = None
    with os.popen('bsub <' + script_name) as f:
        reply = f.readline()

    try:
        job_id = int(reply.split(' ')[1].replace('<', '').replace('>', ''))
    except (IndexError, ValueError):
        # Report the unparsable reply itself rather than the pipe object.
        print('LSF: could not parse job id from bsub output: %r' % reply)

    return job_id
185
186
def delete(job_id):
    """Kill a job by running ``bkill <job_id>`` through the shell.

    Args:
        job_id: Job identifier; converted with ``str()`` so the integer ids
            returned by ``submit`` are accepted as well as strings.
    """
    with os.popen('bkill ' + str(job_id)):
        pass
189
190
def status():
    """Return the state of each job listed by ``bjobs -w``.

    The first (header) line is discarded. For every following line the
    first space-separated field is parsed as the integer job id and the
    third field is its state.

    Returns:
        A dict mapping job id (int) to state; the state ``'RUN'`` is
        reported as ``'r'``, every other state is kept as printed. Lines
        that cannot be parsed are reported on stdout and skipped.
    """
    status_dict = {}
    with os.popen('bjobs -w') as f:
        f.readline()  # read header
        for line in f:
            fields = re.sub(' +', ' ', line.strip()).split(' ')
            try:
                job_id = int(fields[0])
                state = fields[2]
            except (IndexError, ValueError) as e:
                # The original `except e:` referenced an undefined name and
                # turned any parse error into a NameError.
                print(e)
                continue
            status_dict[job_id] = 'r' if state == 'RUN' else state

    return status_dict
207
208
def job_stats(job_id):
    """Return accounting fields for a job from ``bacct -l <job_id>``.

    Only the first output line is read; it is split on spaces and its
    first three fields are stored positionally.

    Args:
        job_id: Job identifier; converted with ``str()``.

    Returns:
        A dict with ``'wallclock'``, ``'cpu'`` and ``'queue'`` (the first,
        second and third fields) and ``'mem'`` (always ``'-'``). When the
        line has too few fields an error line is printed and only the keys
        filled so far are present.
    """
    stats_dict = {}
    with os.popen('bacct -l ' + str(job_id)) as f:
        line = f.readline()

    # NOTE(review): fields are taken positionally from the first line of
    # output; confirm this matches the bacct -l layout on the target LSF.
    fields = re.sub(' +', ' ', line.strip()).split(' ')
    try:
        stats_dict['wallclock'] = fields[0]
        stats_dict['cpu'] = fields[1]
        stats_dict['queue'] = fields[2]
        # Memory is not derived from the output; a placeholder is stored.
        stats_dict['mem'] = '-'
    except IndexError:
        print('LSF: Error reading job stats')

    return stats_dict
222
223
def running_stats(job_id):
    """Return statistics for a running job from ``bjobs -W <job_id>``.

    Only the first output line is read; it is split on spaces and used
    positionally. The command is run once and the same line feeds every
    value.

    Args:
        job_id: Job identifier; converted with ``str()``.

    Returns:
        A dict with ``'wallclock'`` (first field, as a string) and, when
        the fields parse as numbers, ``'mem'`` (second field times the
        integer third field) and ``'cpu'`` (first field times the integer
        third field). Values that cannot be parsed are left out.
    """
    stats_dict = {}
    with os.popen('bjobs -W ' + str(job_id)) as f:
        line = f.readline()

    # NOTE(review): the first line of output is parsed without skipping a
    # header; confirm this matches the bjobs -W layout on the target LSF.
    fields = re.sub(' +', ' ', line.strip()).split(' ')
    # str.split always yields at least one element, so this cannot fail.
    stats_dict['wallclock'] = fields[0]

    try:
        ntasks = int(fields[2])
        stats_dict['mem'] = float(fields[1]) * ntasks
        stats_dict['cpu'] = float(fields[0]) * ntasks
    except (IndexError, ValueError):
        # Best effort: keep whatever was filled in before the failure.
        pass

    return stats_dict
244