Completed
Push — master ( aa4f83...2f220b )
by James
10s
created

create_submit()   C

Complexity

Conditions 7

Size

Total Lines 67

Duplication

Lines 0
Ratio 0 %

Importance

Changes 27
Bugs 6 Features 1
Metric Value
cc 7
c 27
b 6
f 1
dl 0
loc 67
rs 5.8743

How to fix: Long Method

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

1
2
import os
3
import re
4
import math
5
from string import Template
6
from datetime import timedelta
7
from subprocess import Popen, PIPE, check_output
8
from .mycluster import get_data
9
from .mycluster import load_template
10
11
"""
12
13
bjobs -u all -q emerald
14
bqueues -l emerald
15
16
"""
17
18
19
def scheduler_type():
    """Return the identifier of the scheduler this backend drives ('lsf')."""
    backend_id = 'lsf'
    return backend_id
21
22
23
def name():
    """Return the LSF cluster name reported by ``lsid``.

    Scans the ``lsid`` output for the line starting with
    ``My cluster name is`` and returns its last whitespace-separated word.

    :returns: the cluster name, or ``'undefined'`` if no such line exists.
    """
    # universal_newlines=True makes check_output return text (not bytes) on
    # Python 3, so the str comparison below works on Python 2 and 3 alike.
    lsid_output = check_output(['lsid'], universal_newlines=True)
    for line in lsid_output.splitlines():
        if line.startswith('My cluster name is'):
            # split() ignores trailing whitespace, which rsplit(' ', 1)
            # would have turned into an empty name.
            return line.split()[-1]

    return 'undefined'
30
31
32
def queues():
    """List the LSF queue names visible to the current user.

    Runs ``bqueues -w -u `whoami```, discards the column-header line and
    returns the first space-separated field of every remaining line.
    """
    with os.popen('bqueues -w -u `whoami`') as pipe:
        pipe.readline()  # first line is the column header
        names = [row.split(' ')[0].strip() for row in pipe]
    return names
42
43
44
def accounts():
    """Return the list of accounts; this LSF backend reports none."""
    no_accounts = list()
    return no_accounts
46
47
48
def available_tasks(queue_id):
    """Return the task limit and remaining capacity of an LSF queue.

    Parses the row for ``queue_id`` in ``bqueues <queue_id>``.  The field at
    index 4 is read as the task limit and the field at index 9 as the number
    of running tasks.
    NOTE(review): in the default ``bqueues`` layout index 4 is JL/U and
    index 3 is MAX -- confirm index 4 is the intended limit.

    :param queue_id: name of the LSF queue.
    :returns: ``{'available': max_tasks - run_tasks, 'max tasks': max_tasks}``.
        ``max_tasks`` stays 0 when the limit field is not an integer
        (e.g. ``-``); both counts stay 0 if the queue's row is not found.
    """
    max_tasks = 0
    run_tasks = 0
    # Text mode so the str comparisons below also work on Python 3.
    q_output = check_output(['bqueues', queue_id], universal_newlines=True)
    for line in q_output.splitlines():
        fields = line.split()
        # Match the queue-name column exactly so the header row (or a queue
        # whose name merely starts with queue_id) is never picked up.
        if not fields or fields[0] != queue_id:
            continue
        try:
            max_tasks = int(fields[4])
        except (IndexError, ValueError):
            pass  # non-numeric limit: keep max_tasks at 0
        run_tasks = int(fields[9])

    return {'available': max_tasks - run_tasks, 'max tasks': max_tasks}
69
70
71
def tasks_per_node(queue_id):
    """Return the per-host task count for the hosts serving ``queue_id``.

    The host (or host group, with any ``/`` removed) is the last word of the
    ``HOSTS:`` line in ``bqueues -l <queue_id>``.  The count is the fourth
    whitespace-separated field on the third line of ``bhosts -l <host>``.

    :param queue_id: name of the LSF queue.
    :returns: the task count, or 0 when the queue lists ``none`` as its
        hosts or has no ``HOSTS:`` line at all.
    """
    host_list = None
    # Text mode so str operations work on Python 3 as well as Python 2.
    q_output = check_output(['bqueues', '-l', queue_id],
                            universal_newlines=True)
    for line in q_output.splitlines():
        if line.startswith('HOSTS:'):
            host_list = line.split()[-1].replace('/', '')
            if host_list == 'none':
                return 0

    if host_list is None:
        # No HOSTS: line means there is no host to query; report 0 like the
        # 'none' case instead of passing None to bhosts (TypeError).
        return 0

    bhosts_output = check_output(['bhosts', '-l', host_list],
                                 universal_newlines=True).splitlines()
    return int(bhosts_output[2].split()[3])
84
85
86
def node_config(queue_id):
    """Describe the node configuration of the hosts serving ``queue_id``.

    The host (or host group) is read from the ``HOSTS:`` line of
    ``bqueues -l <queue_id>``, then ``bhosts -l <host>`` is parsed:
    the fourth field of line 3 gives the task count and the twelfth field
    of line 7 gives the memory figure.

    :param queue_id: name of the LSF queue.
    :returns: dict with ``'max task'`` and ``'max thread'`` (both the task
        count) and ``'max memory'`` (in GB).  All three are 0 when the queue
        lists ``none`` as its hosts or has no ``HOSTS:`` line.
    """
    config = {'max task': 0, 'max thread': 0, 'max memory': 0}

    host_list = None
    # Text mode so str operations work on Python 3 as well as Python 2.
    q_output = check_output(['bqueues', '-l', queue_id],
                            universal_newlines=True)
    for line in q_output.splitlines():
        if line.startswith('HOSTS:'):
            host_list = line.split()[-1].replace('/', '')
            if host_list == 'none':
                return config

    if host_list is None:
        # No HOSTS: line: nothing to query, so report the empty config
        # rather than passing None to bhosts (TypeError).
        return config

    bhosts_output = check_output(['bhosts', '-l', host_list],
                                 universal_newlines=True).splitlines()
    tasks = int(bhosts_output[2].split()[3])
    memory = _lsf_memory_in_gb(bhosts_output[6].split()[11])
    config['max task'] = tasks
    config['max thread'] = tasks
    config['max memory'] = memory

    return config


def _lsf_memory_in_gb(token):
    """Convert an LSF memory figure such as '245G' or '512M' to gigabytes.

    A K/M/G/T suffix (case-insensitive) is scaled by powers of 1024.  A bare
    number is returned as a float unchanged, i.e. treated as GB.
    """
    scale = {'K': 1.0 / (1024 * 1024), 'M': 1.0 / 1024,
             'G': 1.0, 'T': 1024.0}
    suffix = token[-1:].upper()
    if suffix in scale:
        return float(token[:-1]) * scale[suffix]
    return float(token)
110
111
112
def create_submit(queue_id, **kwargs):
    """Render an LSF job script for ``queue_id`` from ``lsf.jinja``.

    Task, thread and node counts are derived from the queue configuration
    (``tasks_per_node``, ``node_config``, ``available_tasks``) combined with
    the optional keyword arguments.  Everything is then passed to the template.

    Optional keyword arguments (default in parentheses):
        num_tasks (1; capped at the queue's 'max tasks' when that is > 0),
        tasks_per_node (the queue's value; a smaller request wins),
        num_threads_per_task (node 'max thread'; capped at
        ceil('max thread' / tasks per node)),
        my_name ("myclusterjob"), my_output ("myclusterjob.out"),
        my_script (None; values containing 'mycluster-' are resolved with
        get_data), user_email (None), project_name ('default'),
        wall_clock ('12:00'; ':00' is appended when it has no ':'),
        no_syscribe (False), openmpi_args ("-bysocket -bind-to-socket"),
        qos (None).

    :param queue_id: name of the LSF queue.
    :returns: the rendered job script as a string.
    :raises ZeroDivisionError: if the effective tasks per node is 0
        (e.g. the queue has no hosts).
    """
    num_tasks, tpn, queue_tpn, num_threads_per_task = _submit_task_layout(
        queue_id, kwargs)
    num_nodes = int(math.ceil(float(num_tasks) / float(tpn)))

    my_name = kwargs.get('my_name', "myclusterjob")
    my_output = kwargs.get('my_output', "myclusterjob.out")
    my_script = _submit_script(kwargs.get('my_script', None))
    user_email = kwargs.get('user_email', None)
    project_name = kwargs.get('project_name', 'default')
    wall_clock = _submit_wall_clock(kwargs.get('wall_clock', '12:00'))
    record_job = not kwargs.get('no_syscribe', False)
    openmpi_args = kwargs.get('openmpi_args', "-bysocket -bind-to-socket")
    qos = kwargs.get('qos', None)

    template = load_template('lsf.jinja')
    return template.render(my_name=my_name,
                           my_script=my_script,
                           my_output=my_output,
                           user_email=user_email,
                           queue_name=queue_id,
                           # whole nodes are requested, so reserve every
                           # slot the queue offers on each node
                           num_queue_slots=num_nodes * queue_tpn,
                           num_tasks=num_tasks,
                           tpn=tpn,
                           num_threads_per_task=num_threads_per_task,
                           num_nodes=num_nodes,
                           project_name=project_name,
                           wall_clock=wall_clock,
                           record_job=record_job,
                           openmpi_args=openmpi_args,
                           qos=qos)


def _submit_task_layout(queue_id, kwargs):
    """Return (num_tasks, tpn, queue_tpn, num_threads_per_task) for create_submit."""
    num_tasks = kwargs.get('num_tasks', 1)

    queue_tpn = tasks_per_node(queue_id)
    tpn = queue_tpn
    if 'tasks_per_node' in kwargs:
        tpn = min(tpn, kwargs['tasks_per_node'])

    nc = node_config(queue_id)
    qc = available_tasks(queue_id)

    if qc['max tasks'] > 0:
        num_tasks = min(num_tasks, qc['max tasks'])

    max_thread = nc['max thread']
    requested_threads = kwargs.get('num_threads_per_task', max_thread)
    # Never give a task more threads than its share of a node.
    num_threads_per_task = min(requested_threads,
                               int(math.ceil(float(max_thread) / float(tpn))))

    return num_tasks, tpn, queue_tpn, num_threads_per_task


def _submit_script(my_script):
    """Resolve 'mycluster-' script names via get_data; pass others (incl. None) through."""
    if my_script is not None and 'mycluster-' in my_script:
        return get_data(my_script)
    return my_script


def _submit_wall_clock(wall_clock):
    """Return wall_clock as a string, appending ':00' when it contains no ':'."""
    wall_clock = str(wall_clock)
    if ':' not in wall_clock:
        wall_clock = wall_clock + ':00'
    return wall_clock
179
180
181
def submit(script_name, immediate, depends_on=None,
           depends_on_always_run=False):
    """Submit a job script to LSF with ``bsub`` and return its job id.

    The script is fed to ``bsub`` on standard input, the equivalent of
    ``bsub < script``.  No shell is involved, so ``script_name`` and
    ``depends_on`` are never shell-interpreted.

    :param script_name: path of the job script.
    :param immediate: accepted for interface compatibility; unused here.
    :param depends_on: optional job id this job must wait for.
    :param depends_on_always_run: when true (and ``depends_on`` is truthy)
        the dependency condition is ``ended(...)``; otherwise ``done(...)``.
    :returns: the integer job id parsed from bsub's first output line, or
        ``None`` if the submission failed (a message is printed).
    """
    cmd = ['bsub']
    if depends_on and depends_on_always_run:
        cmd += ['-w', 'ended(%s)' % (depends_on,)]
    elif depends_on is not None:
        cmd += ['-w', 'done(%s)' % (depends_on,)]

    try:
        with open(script_name) as script:
            proc = Popen(cmd, stdin=script, stdout=PIPE,
                         universal_newlines=True)
            output = proc.communicate()[0]
    except (IOError, OSError) as err:
        # Missing script file or bsub not runnable.
        print('Job submission failed: %s' % (err,))
        return None

    lines = output.splitlines()
    first_line = lines[0] if lines else ''
    try:
        # The job id is the second space-separated word, wrapped in <...>.
        return int(first_line.split(' ')[1].replace('<', '').replace('>', ''))
    except (IndexError, ValueError):
        print('Job submission failed: ' + first_line)
        return None
212
213
214
def delete(job_id):
    """Kill the LSF job ``job_id`` with ``bkill``.

    ``job_id`` may be an int (as returned by ``submit``) or a string; it is
    passed as a single argument, with no shell involved.  bkill's stdout is
    captured and discarded, and nothing is returned.  If bkill cannot be
    started, a message is printed instead of raising.
    """
    try:
        Popen(['bkill', str(job_id)], stdout=PIPE).communicate()
    except OSError as err:
        print('LSF: could not run bkill: %s' % (err,))
217
218
219 View Code Duplication
def status():
    """Return the state of the jobs listed by ``bjobs -w``.

    The header line is skipped.  For each row the first field is the
    integer job id and the third field is the LSF state.  ``RUN`` is reported
    as ``'r'`` and every other state is passed through unchanged.

    :returns: dict mapping job id (int) to state string.  Blank rows are
        ignored.  Rows that cannot be parsed are reported on stdout and
        skipped, so one bad row no longer discards the rest.
    """
    status_dict = {}
    with os.popen('bjobs -w') as f:
        f.readline()  # read header
        for line in f:
            fields = line.split()
            if not fields:
                continue
            try:
                job_id = int(fields[0])
                state = fields[2]
            except (IndexError, ValueError) as e:
                print(e)
                continue
            status_dict[job_id] = 'r' if state == 'RUN' else state

    return status_dict
236
237
238
def job_stats(job_id):
    """Return basic accounting stats for ``job_id`` from ``bacct -l``.

    Only the first output line is read.  Its first three space-separated
    fields are stored as ``'wallclock'``, ``'cpu'`` and ``'queue'``.  Memory
    is not extracted, so ``'mem'`` is always ``'-'``.

    :returns: dict of the fields read.  If parsing fails part-way, a
        message is printed and the fields read so far are returned.
    """
    stats_dict = {}
    with os.popen('bacct -l ' + str(job_id)) as f:
        try:
            line = f.readline()
            fields = re.sub(' +', ' ', line.strip()).split(' ')
            stats_dict['wallclock'] = fields[0]
            stats_dict['cpu'] = fields[1]
            stats_dict['queue'] = fields[2]
            # Memory usage is not derived from bacct output here.
            stats_dict['mem'] = '-'
        except (IndexError, ValueError, IOError, OSError):
            print('LSF: Error reading job stats')

    return stats_dict
253
254
255
def job_stats_enhanced(job_id):
    """
    Get full job and step stats for job_id.

    Queries ``bjobs -o "..." delimiter='|' -noheader`` and maps the
    '|'-separated columns into a dict.  ``run_time`` and ``cpu_used`` become
    ``timedelta`` values unless they are '-', and ``'end'`` is only set for
    DONE/EXIT jobs.  If that row cannot be read or parsed, the final status
    is looked up in ``bhist -l`` instead.

    :returns: dict of stats; it always contains a ``'status'`` key.
    """
    bjobs_cmd = ('bjobs -o "jobid run_time cpu_used  queue slots  stat '
                 'exit_code start_time estimated_start_time finish_time '
                 'delimiter=\'|\'" -noheader ')
    stats_dict = {}
    try:
        with os.popen(bjobs_cmd + str(job_id)) as f:
            line = f.readline()
        # Strip the line terminator so the last column (finish_time) is clean.
        cols = line.rstrip('\r\n').split('|')
        stats_dict['job_id'] = cols[0]
        if cols[1] != '-':
            stats_dict['wallclock'] = timedelta(
                seconds=float(cols[1].split(' ')[0]))
        if cols[2] != '-':
            stats_dict['cpu'] = timedelta(
                seconds=float(cols[2].split(' ')[0]))
        stats_dict['queue'] = cols[3]
        stats_dict['status'] = cols[5]
        stats_dict['exit_code'] = cols[6]
        stats_dict['start'] = cols[7]
        stats_dict['start_time'] = cols[8]
        if stats_dict['status'] in ['DONE', 'EXIT']:
            stats_dict['end'] = cols[9]
        stats_dict['steps'] = []
        return stats_dict
    except (IndexError, ValueError, IOError, OSError):
        # No usable bjobs row (e.g. empty output); fall back to bhist.
        stats_dict['status'] = _bhist_job_status(job_id)
    return stats_dict


def _bhist_job_status(job_id):
    """Return 'DONE', 'EXIT' or 'UNKNOWN' by scanning ``bhist -l`` output for job_id."""
    try:
        with os.popen('bhist -l ' + str(job_id)) as f:
            for line in f.readlines():
                if "Done successfully" in line:
                    return 'DONE'
                if "Completed <exit>" in line:
                    return 'EXIT'
    except Exception as e:
        print(e)
        print('LSF: Error reading job stats')
    return 'UNKNOWN'
299
300
301 View Code Duplication
def running_stats(job_id):
    """Return live stats for ``job_id`` parsed from ``bjobs -W``.

    The command is run once and only its first line is used.  Field 0 is
    stored as ``'wallclock'``.  When fields 0-2 parse as numbers, ``'mem'``
    (field 1 * field 2) and ``'cpu'`` (field 0 * field 2) are added as well.
    NOTE(review): ``status()`` treats the first line of ``bjobs -w`` as a
    header; if ``bjobs -W`` also starts with a header, this parses the
    header row -- confirm the intended columns.

    :returns: dict with whatever fields could be parsed (possibly empty).
    """
    stats_dict = {}
    try:
        with os.popen('bjobs -W ' + str(job_id)) as f:
            line = f.readline()
    except (IOError, OSError, ValueError):
        return stats_dict

    fields = re.sub(' +', ' ', line.strip()).split(' ')
    stats_dict['wallclock'] = fields[0]
    try:
        ntasks = int(fields[2])
        stats_dict['mem'] = float(fields[1]) * ntasks
        stats_dict['cpu'] = float(fields[0]) * ntasks
    except (IndexError, ValueError):
        pass  # best effort: leave mem/cpu out when not numeric

    return stats_dict
322