Completed
Push — develop ( 9f9e04...098af0 )
by James
36s
created

create_submit()   C

Complexity

Conditions 7

Size

Total Lines 67

Duplication

Lines 0
Ratio 0 %

Importance

Changes 30
Bugs 7 Features 1
Metric Value
cc 7
c 30
b 7
f 1
dl 0
loc 67
rs 5.8743

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:

1
2
import os
3
import re
4
import math
5
from string import Template
6
from datetime import timedelta
7
from subprocess import Popen, PIPE, check_output
8
from .mycluster import get_data
9
from .mycluster import load_template
10
11
"""
12
13
bjobs -u all -q emerald
14
bqueues -l emerald
15
16
"""
17
18
19
def scheduler_type():
    """Identify this scheduler adapter module as LSF."""
    return 'lsf'
21
22
23
def name():
    """Return the cluster name reported by `lsid`, or 'undefined'.

    Scans the lsid output for the line beginning 'My cluster name is'
    and returns its last whitespace-separated token.
    """
    marker = 'My cluster name is'
    for line in check_output(['lsid']).splitlines():
        if line.startswith(marker):
            return line.rsplit(' ', 1)[1].strip()
    return 'undefined'
30
31
32
def queues():
    """Return the list of LSF queue names visible to the current user.

    Parses `bqueues -w -u <user>` output: first column of each row
    after the header line.
    """
    with os.popen('bqueues -w -u `whoami`') as f:
        f.readline()  # skip the column-header row
        return [line.split(' ')[0].strip() for line in f]
42
43
44
def accounts():
    """LSF adapter exposes no account listing; always an empty list."""
    return []
46
47
48
def available_tasks(queue_id):
    """Return slot availability for *queue_id* from `bqueues`.

    Parameters:
        queue_id: LSF queue name.

    Returns a dict with 'available' (max minus running slots) and
    'max tasks'; both default to 0 when the queue line cannot be parsed.
    """
    max_tasks = 0
    run_tasks = 0
    queue_name = queue_id
    q_output = check_output(['bqueues', queue_name]).splitlines()
    for line in q_output:
        if line.startswith(queue_name):
            # Collapse runs of spaces so column indices are stable.
            fields = re.sub(' +', ' ', line).strip().split(' ')
            try:
                max_tasks = int(fields[4])
                # MAX is col 4, RUN is col 9 in `bqueues` output.
                run_tasks = int(fields[9])
            except (IndexError, ValueError):
                # Malformed/truncated line: keep the zero defaults rather
                # than crashing (original parsed RUN outside the try and
                # could raise; it also silently swallowed every exception).
                pass
    return {'available': max_tasks - run_tasks, 'max tasks': max_tasks}
69
70
71
def tasks_per_node(queue_id):
    """Return the slot count of the first host serving *queue_id*.

    Reads the 'HOSTS:' line from `bqueues -l`, then queries that host
    with `bhosts -l` and takes the MAX-slots column. Returns 0 when the
    queue has no hosts.
    """
    host_list = None
    for line in check_output(['bqueues', '-l', queue_id]).splitlines():
        if line.startswith('HOSTS:'):
            host_list = line.strip().rsplit(' ', 1)[1].replace('/', '')
            if host_list == 'none':
                return 0
    if host_list is None:
        # BUGFIX: no 'HOSTS:' line at all previously left host_list as
        # None and crashed inside check_output; treat as an empty queue.
        return 0
    bhosts_output = check_output(['bhosts', '-l', host_list]).splitlines()
    # Line 2 of `bhosts -l` holds the status row; column 3 is MAX slots.
    line = re.sub(' +', ' ', bhosts_output[2]).strip()
    return int(line.split(' ')[3])
84
85
86
def node_config(queue_id):
    """Return the node configuration of the first host serving *queue_id*.

    Parameters:
        queue_id: LSF queue name.

    Returns a dict with 'max task', 'max thread' (both the host's slot
    count) and 'max memory' (GiB, as a float). All zero when the queue
    has no hosts.
    """
    host_list = None
    config = {}
    q_output = check_output(['bqueues', '-l', queue_id]).splitlines()
    for line in q_output:
        if line.startswith('HOSTS:'):
            host_list = line.strip().rsplit(' ', 1)[1].replace('/', '')
            if host_list == 'none':
                config['max task'] = 0
                config['max thread'] = 0
                config['max memory'] = 0
                return config
    if host_list is None:
        # BUGFIX: no 'HOSTS:' line previously left host_list as None and
        # crashed inside check_output; report an empty configuration,
        # consistent with the 'none' branch above.
        config['max task'] = 0
        config['max thread'] = 0
        config['max memory'] = 0
        return config
    bhosts_output = check_output(['bhosts', '-l', host_list]).splitlines()
    # Status row: column 3 is the host's MAX slot count.
    line = re.sub(' +', ' ', bhosts_output[2]).strip()
    tasks = int(line.split(' ')[3])
    # Load row: column 11 carries total memory like '64G'.
    line = re.sub(' +', ' ', bhosts_output[6]).strip()
    memory = float(line.split(' ')[11].replace('G', ''))
    config['max task'] = tasks
    config['max thread'] = tasks
    config['max memory'] = memory
    return config
110
111
112
def create_submit(queue_id, **kwargs):
    """Render an LSF job-submission script for *queue_id*.

    Recognised kwargs: num_tasks, tasks_per_node, num_threads_per_task,
    my_name, my_output, my_script, user_email, project_name, wall_clock,
    no_syscribe, openmpi_args, qos.

    Returns the rendered script text (str).
    """
    queue_name = queue_id

    num_tasks = kwargs.get('num_tasks', 1)

    tpn = tasks_per_node(queue_id)
    queue_tpn = tpn  # queue-wide slots per node, before any user cap
    if 'tasks_per_node' in kwargs:
        tpn = min(tpn, kwargs['tasks_per_node'])

    nc = node_config(queue_id)
    qc = available_tasks(queue_id)

    # Never request more tasks than the queue can ever provide.
    if qc['max tasks'] > 0:
        num_tasks = min(num_tasks, qc['max tasks'])

    # Cap thread count so tasks_per_node * threads fits on one node.
    num_threads_per_task = kwargs.get('num_threads_per_task', nc['max thread'])
    num_threads_per_task = min(num_threads_per_task, int(
        math.ceil(float(nc['max thread']) / float(tpn))))

    my_name = kwargs.get('my_name', 'myclusterjob')
    my_output = kwargs.get('my_output', 'myclusterjob.out')
    my_script = kwargs.get('my_script', None)
    # BUGFIX: guard against my_script=None (the default) before the
    # substring test, which used to raise TypeError.
    if my_script and 'mycluster-' in my_script:
        my_script = get_data(my_script)

    user_email = kwargs.get('user_email', None)
    project_name = kwargs.get('project_name', 'default')

    wall_clock = kwargs.get('wall_clock', '12:00')
    if ':' not in wall_clock:
        wall_clock = wall_clock + ':00'

    num_nodes = int(math.ceil(float(num_tasks) / float(tpn)))
    # Slots are charged per whole node at the queue's native width.
    num_queue_slots = num_nodes * queue_tpn

    record_job = not kwargs.get('no_syscribe', False)
    openmpi_args = kwargs.get('openmpi_args', '-bysocket -bind-to-socket')
    qos = kwargs.get('qos', None)

    template = load_template('lsf.jinja')
    return template.render(my_name=my_name,
                           my_script=my_script,
                           my_output=my_output,
                           user_email=user_email,
                           queue_name=queue_name,
                           num_queue_slots=num_queue_slots,
                           num_tasks=num_tasks,
                           tpn=tpn,
                           num_threads_per_task=num_threads_per_task,
                           num_nodes=num_nodes,
                           project_name=project_name,
                           wall_clock=wall_clock,
                           record_job=record_job,
                           openmpi_args=openmpi_args,
                           qos=qos)
179
180
181
def submit(script_name, immediate, depends_on=None,
           depends_on_always_run=False):
    """Submit *script_name* via `bsub`; return the integer job id or None.

    Parameters:
        script_name: path to the submission script (fed to bsub's stdin).
        immediate: accepted for interface compatibility; not used here.
        depends_on: job id this submission should wait for, if any.
        depends_on_always_run: wait on 'ended' (run regardless of the
            dependency's exit status) instead of 'done'.
    """
    # Build the command once instead of duplicating the parse logic in
    # three branches (flagged as code duplication).
    if depends_on and depends_on_always_run:
        command = 'bsub -w "ended(%s)" < %s ' % (depends_on, script_name)
    elif depends_on is not None:
        command = 'bsub -w "done(%s)" < %s ' % (depends_on, script_name)
    else:
        command = 'bsub <' + script_name

    job_id = None
    with os.popen(command) as f:
        output = f.readline()
        try:
            # bsub prints: Job <1234> is submitted to queue <...>
            job_id = int(output.split(' ')[1].replace(
                '<', '').replace('>', ''))
        except (IndexError, ValueError):
            print('Job submission failed: ' + output)
    return job_id
210
211
212
def delete(job_id):
    """Kill *job_id* with `bkill`; bkill's output is discarded.

    Accepts int or str ids (str() applied, consistent with the other
    job functions in this module; the original crashed on int ids).
    """
    with os.popen('bkill ' + str(job_id)) as f:
        pass
215
216
217 View Code Duplication
def status():
    """Return a dict mapping job id -> state code from `bjobs -w`.

    'RUN' is reported as 'r'; any other LSF state string is passed
    through unchanged.
    """
    status_dict = {}
    with os.popen('bjobs -w') as f:
        try:
            f.readline()  # skip the column-header row
            for line in f:
                cols = re.sub(' +', ' ', line.strip()).split(' ')
                job_id = int(cols[0])
                state = cols[2]
                status_dict[job_id] = 'r' if state == 'RUN' else state
        except Exception as e:
            # BUGFIX: the original `except e:` referenced an undefined
            # name, raising NameError whenever parsing failed.
            print(e)
    return status_dict
234
235
236
def job_stats(job_id):
    """Return coarse accounting info for *job_id* from `bacct -l`.

    Returns a dict with 'wallclock', 'cpu', 'queue' and 'mem' keys;
    keys populated so far are kept when parsing fails partway.
    """
    stats_dict = {}
    with os.popen('bacct -l ' + str(job_id)) as f:
        try:
            fields = re.sub(' +', ' ', f.readline().strip()).split(' ')
            stats_dict['wallclock'] = fields[0]
            stats_dict['cpu'] = fields[1]
            stats_dict['queue'] = fields[2]
            # Memory columns of this bacct format are unreliable here,
            # so memory is deliberately not parsed.
            stats_dict['mem'] = '-'
        except (IndexError, ValueError):
            print('LSF: Error reading job stats')
    return stats_dict
251
252
253
def job_stats_enhanced(job_id):
    """
    Get full job and step stats for job_id.

    Primary source is `bjobs -o`; when that output cannot be parsed
    (e.g. the job has already left the queue), fall back to scanning
    `bhist -l` for a terminal status. Returned keys include job_id,
    wallclock/cpu (timedelta), queue, status, exit_code, start,
    start_time, end (terminal jobs only) and steps (always empty).
    """
    stats_dict = {}
    with os.popen('bjobs -o "jobid run_time cpu_used  queue slots  stat exit_code start_time estimated_start_time finish_time delimiter=\'|\'" -noheader ' + str(job_id)) as f:
        try:
            cols = f.readline().split('|')
            stats_dict['job_id'] = cols[0]
            # '-' marks "not available" in bjobs output.
            if cols[1] != '-':
                stats_dict['wallclock'] = timedelta(
                    seconds=float(cols[1].split(' ')[0]))
            if cols[2] != '-':
                stats_dict['cpu'] = timedelta(
                    seconds=float(cols[2].split(' ')[0]))
            stats_dict['queue'] = cols[3]
            stats_dict['status'] = cols[5]
            stats_dict['exit_code'] = cols[6]
            stats_dict['start'] = cols[7]
            stats_dict['start_time'] = cols[8]
            if stats_dict['status'] in ['DONE', 'EXIT']:
                stats_dict['end'] = cols[9]
            stats_dict['steps'] = []
        except Exception:
            # Narrowed from a bare `except:`; also renamed the inner
            # file handle, which previously shadowed the outer `f`.
            with os.popen('bhist -l ' + str(job_id)) as hist:
                try:
                    for line in hist.readlines():
                        if "Done successfully" in line:
                            stats_dict['status'] = 'DONE'
                            return stats_dict
                        elif "Completed <exit>" in line:
                            stats_dict['status'] = 'EXIT'
                            return stats_dict
                        else:
                            stats_dict['status'] = 'UNKNOWN'
                except Exception as e:
                    print(e)
                    print('LSF: Error reading job stats')
                    stats_dict['status'] = 'UNKNOWN'
    return stats_dict
297
298
299
def running_stats(job_id):
    """Return wallclock/mem/cpu figures for a running *job_id*.

    Parses the first line of `bjobs -W`. The original invoked the same
    subprocess twice and re-read the identical first line; merged into
    one invocation with the same two independent best-effort parses.
    """
    stats_dict = {}
    with os.popen('bjobs -W ' + str(job_id)) as f:
        fields = re.sub(' +', ' ', f.readline().strip()).split(' ')

    try:
        stats_dict['wallclock'] = fields[0]
    except IndexError:
        pass

    try:
        ntasks = int(fields[2])
        stats_dict['mem'] = float(fields[1]) * ntasks
        stats_dict['cpu'] = float(fields[0]) * ntasks
    except (IndexError, ValueError):
        # Job not running / line too short: leave mem and cpu unset,
        # keeping whatever was parsed above.
        pass

    return stats_dict
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
320