Completed
Push — develop ( 622986...fb60cb )
by
unknown
01:40
created

create_submit()   C

Complexity

Conditions 7

Size

Total Lines 66

Duplication

Lines 0
Ratio 0 %

Importance

Changes 35
Bugs 10 Features 1
Metric Value
cc 7
c 35
b 10
f 1
dl 0
loc 66
rs 5.9414

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method: move a cohesive part of the method body into its own, well-named method.

1
2
import os
3
import re
4
import math
5
from string import Template
6
from subprocess import Popen, PIPE, check_output
7
from mycluster import get_data
8
from mycluster import load_template
9
10
"""
11
12
bjobs -u all -q emerald
13
bqueues -l emerald
14
15
"""
16
17
18
def scheduler_type():
    """Return the short identifier of the scheduler this module drives."""
    return 'lsf'
20
21
def name():
    """Return the cluster name reported by the ``lsid`` command.

    The output of ``lsid`` is scanned for the first line that begins with
    ``My cluster name is``; the text after the last space on that line is
    returned with surrounding whitespace removed.

    Returns:
        The cluster name, or ``'undefined'`` when no such line is found.
    """
    marker = 'My cluster name is'
    candidates = (
        row.rsplit(' ', 1)[1].strip()
        for row in check_output(['lsid']).splitlines()
        if row.startswith(marker)
    )
    # Lazily evaluated, so only the first matching line is ever parsed.
    return next(candidates, 'undefined')
28
29
def queues():
    """Return the queue names listed by ``bqueues`` for the current user.

    The command is run through the shell so that the embedded ``whoami``
    substitution is expanded. The first output line is treated as a header
    and skipped; for every following line the text before the first space
    is taken as the queue name.

    Returns:
        A list of queue-name strings, in the order printed by ``bqueues``.
    """
    with os.popen('bqueues -w -u `whoami`') as stream:
        stream.readline()  # discard the column header
        return [row.split(' ')[0].strip() for row in stream]
39
40
def available_tasks(queue_id):
    """Report how many tasks a queue can still accept.

    Runs ``bqueues <queue_id>`` and parses every output line that starts
    with the queue name. After collapsing runs of spaces, field 4 is read
    as the task limit and field 9 as the number of running tasks.

    Args:
        queue_id: Name of the LSF queue to inspect.

    Returns:
        A dict with two keys: ``'available'`` (limit minus running tasks)
        and ``'max tasks'`` (the limit). Both are 0 when no line for the
        queue is found.

    Raises:
        ValueError: If the running-task field is not an integer.
        IndexError: If the matching line has fewer than ten fields.
    """
    max_tasks = 0
    run_tasks = 0
    for line in check_output(['bqueues', queue_id]).splitlines():
        if not line.startswith(queue_id):
            continue
        fields = re.sub(' +', ' ', line).strip().split(' ')
        try:
            max_tasks = int(fields[4])
        except (ValueError, IndexError):
            # Presumably the limit column holds a non-numeric placeholder
            # when the queue is unlimited; keep the previous value then.
            pass
        run_tasks = int(fields[9])

    return {'available': max_tasks - run_tasks, 'max tasks': max_tasks}
62
63
def tasks_per_node(queue_id):
    """Return the task count reported by ``bhosts`` for a queue's host entry.

    ``bqueues -l <queue_id>`` is scanned for lines starting with ``HOSTS:``;
    the last space-separated token of the last such line, with ``/``
    characters removed, is passed to ``bhosts -l``. The integer in field 3
    of the third ``bhosts`` output line (after collapsing repeated spaces)
    is returned.

    Args:
        queue_id: Name of the LSF queue.

    Returns:
        The parsed task count as an int.

    Raises:
        ValueError: If ``bqueues -l`` prints no ``HOSTS:`` line, or the
            parsed field is not an integer.
        IndexError: If the ``bhosts`` output is shorter than expected.
    """
    host_list = None
    for line in check_output(['bqueues', '-l', queue_id]).splitlines():
        if line.startswith('HOSTS:'):
            host_list = line.strip().rsplit(' ', 1)[1].replace('/', '')

    if host_list is None:
        # Fail with a clear message instead of handing None to subprocess.
        raise ValueError('LSF: no HOSTS entry found for queue %s' % queue_id)

    bhosts_output = check_output(['bhosts', '-l', host_list]).splitlines()
    fields = re.sub(' +', ' ', bhosts_output[2]).strip().split(' ')
    return int(fields[3])
75
76
def node_config(queue_id):
    """Describe the node configuration behind a queue.

    ``bqueues -l <queue_id>`` is scanned for lines starting with ``HOSTS:``;
    the last space-separated token of the last such line, with ``/``
    characters removed, is passed to ``bhosts -l``. From that output
    (after collapsing repeated spaces) field 3 of the third line is read as
    the task count and field 11 of the seventh line, with ``G`` removed, as
    the memory figure.

    Args:
        queue_id: Name of the LSF queue.

    Returns:
        A dict with the keys ``'max task'`` and ``'max thread'`` (both the
        task count) and ``'max memory'`` (the memory figure as an int).

    Raises:
        ValueError: If ``bqueues -l`` prints no ``HOSTS:`` line, or a
            parsed field is not an integer.
        IndexError: If the ``bhosts`` output is shorter than expected.
    """
    host_list = None
    for line in check_output(['bqueues', '-l', queue_id]).splitlines():
        if line.startswith('HOSTS:'):
            host_list = line.strip().rsplit(' ', 1)[1].replace('/', '')

    if host_list is None:
        # Fail with a clear message instead of handing None to subprocess.
        raise ValueError('LSF: no HOSTS entry found for queue %s' % queue_id)

    bhosts_output = check_output(['bhosts', '-l', host_list]).splitlines()

    task_fields = re.sub(' +', ' ', bhosts_output[2]).strip().split(' ')
    tasks = int(task_fields[3])

    memory_fields = re.sub(' +', ' ', bhosts_output[6]).strip().split(' ')
    memory = int(memory_fields[11].replace('G', ''))

    return {
        'max task': tasks,
        'max thread': tasks,
        'max memory': memory,
    }
96
97
98
def create_submit(queue_id, **kwargs):
    """Render an LSF submission script for the given queue.

    Queue and node limits are looked up with ``tasks_per_node``,
    ``node_config`` and ``available_tasks``; the requested sizes are clamped
    to them and the result is rendered through the ``lsf.jinja`` template.

    Args:
        queue_id: Name of the LSF queue to submit to.
        **kwargs: Optional settings, all read by name:
            num_tasks (default 1), tasks_per_node, num_threads_per_task,
            my_name (default "myclusterjob"),
            my_output (default "myclusterjob.out"),
            my_script (default None), user_email (default None),
            project_name (default 'default'),
            wall_clock (default '12:00'; ':00' is appended when the value
            contains no colon), no_syscribe (default False),
            openmpi_args (default "-bysocket -bind-to-socket"),
            qos (default None).

    Returns:
        The rendered script as returned by ``template.render``.
    """
    queue_name = queue_id

    # Per-node task count: the queue's own value bounds any user request.
    queue_tpn = tasks_per_node(queue_id)
    tpn = queue_tpn
    if 'tasks_per_node' in kwargs:
        tpn = min(tpn, kwargs['tasks_per_node'])

    nc = node_config(queue_id)
    qc = available_tasks(queue_id)

    # A queue limit of 0 is treated as "no limit known" and is not applied.
    num_tasks = kwargs.get('num_tasks', 1)
    if qc['max tasks'] > 0:
        num_tasks = min(num_tasks, qc['max tasks'])

    max_threads = nc['max thread']
    num_threads_per_task = min(
        kwargs.get('num_threads_per_task', max_threads),
        int(math.ceil(float(max_threads) / float(tpn))))

    my_name = kwargs.get('my_name', "myclusterjob")
    my_output = kwargs.get('my_output', "myclusterjob.out")
    my_script = kwargs.get('my_script', None)
    # Guard against the default None, which the membership test cannot handle.
    if my_script is not None and 'mycluster-' in my_script:
        my_script = get_data(my_script)

    user_email = kwargs.get('user_email', None)
    project_name = kwargs.get('project_name', 'default')

    wall_clock = kwargs.get('wall_clock', '12:00')
    if ':' not in wall_clock:
        wall_clock = wall_clock + ':00'

    num_nodes = int(math.ceil(float(num_tasks) / float(tpn)))
    # Slots are counted with the queue's per-node figure, not the
    # possibly smaller user-requested one.
    num_queue_slots = num_nodes * queue_tpn

    record_job = not kwargs.get('no_syscribe', False)
    openmpi_args = kwargs.get('openmpi_args', "-bysocket -bind-to-socket")
    qos = kwargs.get('qos', None)

    template = load_template('lsf.jinja')
    return template.render(my_name=my_name,
                           my_script=my_script,
                           my_output=my_output,
                           user_email=user_email,
                           queue_name=queue_name,
                           num_queue_slots=num_queue_slots,
                           num_tasks=num_tasks,
                           tpn=tpn,
                           num_threads_per_task=num_threads_per_task,
                           num_nodes=num_nodes,
                           project_name=project_name,
                           wall_clock=wall_clock,
                           record_job=record_job,
                           openmpi_args=openmpi_args,
                           qos=qos)
164
165
166
def submit(script_name, immediate, depends=None):
    """Submit a script with ``bsub`` and return the new job id.

    The script is fed to ``bsub`` on standard input via shell redirection.
    The second space-separated token of the first output line, with angle
    brackets removed, is parsed as the job id.

    Args:
        script_name: Path of the submission script.
        immediate: Accepted for interface compatibility; not used here.
        depends: Accepted for interface compatibility; not used here.

    Returns:
        The job id as an int, or None when it could not be parsed (the
        offending output line is printed in that case).
    """
    job_id = None
    with os.popen('bsub <' + script_name) as f:
        reply = f.readline()

    try:
        job_id = int(reply.split(' ')[1].replace('<', '').replace('>', ''))
    except (IndexError, ValueError):
        # Show what bsub actually printed rather than the pipe object.
        print('LSF: unable to read job id from bsub output: %r' % reply)

    return job_id
175
176
def delete(job_id):
    """Kill a job by running ``bkill <job_id>``.

    Args:
        job_id: Identifier of the job; converted with ``str`` so that both
            integer and string ids are accepted.
    """
    with os.popen('bkill ' + str(job_id)):
        pass
179
180
def status():
    """Return the state of each job listed by ``bjobs -w``.

    The first output line is skipped as a header. For every other line,
    after collapsing repeated spaces, field 0 is parsed as the integer job
    id and field 2 is taken as the state.

    Returns:
        A dict mapping job id to state, with the state ``'RUN'`` reported
        as ``'r'``. If a line cannot be parsed the error is printed and the
        entries collected so far are returned.
    """
    status_dict = {}
    with os.popen('bjobs -w') as f:
        try:
            f.readline()  # discard the column header
            for line in f:
                fields = re.sub(' +', ' ', line.strip()).split(' ')
                job_id = int(fields[0])
                state = fields[2]
                status_dict[job_id] = 'r' if state == 'RUN' else state
        except (ValueError, IndexError) as e:
            print(e)

    return status_dict
197
198
def job_stats(job_id):
    """Collect accounting figures for a job from ``bacct -l``.

    Only the first output line is read. After collapsing repeated spaces,
    fields 0, 1 and 2 are stored as ``'wallclock'``, ``'cpu'`` and
    ``'queue'``; ``'mem'`` is always the placeholder ``'-'``.
    NOTE(review): whether the first line of ``bacct -l`` really carries
    these values cannot be confirmed from this file.

    Args:
        job_id: Identifier of the job; converted with ``str``.

    Returns:
        A dict with the keys that could be filled in. If the line has too
        few fields an error message is printed and the dict is partial.
    """
    stats_dict = {}
    with os.popen('bacct -l ' + str(job_id)) as f:
        fields = re.sub(' +', ' ', f.readline().strip()).split(' ')

    try:
        stats_dict['wallclock'] = fields[0]
        stats_dict['cpu'] = fields[1]
        stats_dict['queue'] = fields[2]
        # Memory is not derived from the output; a placeholder is stored.
        stats_dict['mem'] = '-'
    except IndexError:
        print('LSF: Error reading job stats')

    return stats_dict
212
213
def running_stats(job_id):
    """Collect figures for a running job from ``bjobs -W``.

    Only the first output line is read. After collapsing repeated spaces,
    field 0 is stored as ``'wallclock'``. Fields 2, 1 and 0 are then parsed
    as task count, memory and cpu; when that succeeds ``'mem'`` and
    ``'cpu'`` are stored multiplied by the task count. Parsing failures are
    ignored, leaving the corresponding keys unset.
    NOTE(review): the first line of ``bjobs -W`` is presumably a column
    header, in which case the numeric parse never succeeds. Confirm the
    intended line.

    Args:
        job_id: Identifier of the job; converted with ``str``.

    Returns:
        A dict holding whichever of ``'wallclock'``, ``'mem'`` and
        ``'cpu'`` could be filled in.
    """
    stats_dict = {}
    # One invocation is enough: both parses read the same first line.
    with os.popen('bjobs -W ' + str(job_id)) as f:
        fields = re.sub(' +', ' ', f.readline().strip()).split(' ')

    stats_dict['wallclock'] = fields[0]

    try:
        ntasks = int(fields[2])
        stats_dict['mem'] = float(fields[1]) * ntasks
        stats_dict['cpu'] = float(fields[0]) * ntasks
    except (ValueError, IndexError):
        # Best effort: leave the numeric entries out when they do not parse.
        pass

    return stats_dict
234