Completed
Push — develop ( 68eb6e...337dbe )
by
unknown
01:18
created

status()   A

Complexity

Conditions 4

Size

Total Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 11
Bugs 3 Features 0
Metric Value
cc 4
c 11
b 3
f 0
dl 0
loc 16
rs 9.2
1
import os
2
import re
3
import math
4
from string import Template
5
from mycluster import get_data
6
from mycluster import load_template
7
8
""""
9
SGE notes
10
11
list PARALLEL_ENV: qconf -spl
12
details: qconf -sp $PARALLEL_ENV
13
14
List avail resources: qstat -pe $PARALLEL_ENV -g c
15
16
submit: qsub -pe $PARALLEL_ENV $NUM_SLOTS
17
18
delete: qdel job-id
19
20
checks: qalter -w p job-id
21
        qalter -w v job-id
22
23
        qconf -shgrp_resolved @el6nodes
24
25
list hosts: qhost -q
26
27
Useful sites:
28
https://confluence.rcs.griffith.edu.au/display/v20zCluster/SGE+cheat+sheet
29
http://www.uibk.ac.at/zid/systeme/hpc-systeme/common/tutorials/sge-howto.html
30
http://www-zeuthen.desy.de/dv/documentation/unixguide/chapter18.html
31
http://psg.skinforum.org/lsf.html
32
"""
33
34
def scheduler_type():
    """Return the identifier of the scheduler this module drives."""
    scheduler = 'sge'
    return scheduler
36
37
def name():
    """Return the value of the SGE_CLUSTER_NAME environment variable.

    Returns None when the variable is not set.
    """
    cluster_name = os.environ.get('SGE_CLUSTER_NAME')
    return cluster_name
39
40
def queues():
    """Return the queue ids the current user can use.

    Each id has the form ``'<parallel_env>:<queue_name>'``.  The parallel
    environments come from ``qconf -spl``; for each of them the queues are
    taken from the first column of ``qstat -pe <pe> -U `whoami` -g c``.

    Returns:
        A list of queue id strings (empty when no command produces output).
    """
    # One parallel environment per non-blank line.
    with os.popen('qconf -spl') as f:
        parallel_env_list = [line.strip() for line in f if line.strip()]

    queue_list = []
    for parallel_env in parallel_env_list:
        with os.popen('qstat -pe '+parallel_env+' -U `whoami` -g c') as f:
            f.readline()  # read header
            f.readline()  # read separator
            # split() (rather than split(' ')) so indented rows still yield
            # the queue name; blank rows are dropped.
            queue_names = [line.split()[0] for line in f if line.strip()]

        for queue_name in queue_names:
            # Check if user has permission to use queue: the queue is kept
            # only when qstat prints a row after the two header lines.
            with os.popen('qstat -g c -U `whoami` -q '+queue_name) as f2:
                f2.readline()
                f2.readline()
                if f2.readline():
                    queue_list.append(parallel_env+':'+queue_name)

    return queue_list
70
71
72
def accounts():
    """Return the list of accounts; this backend always reports none."""
    return list()
74
75
76
def available_tasks(queue_id):
    """Return the free and total slot counts reported for a queue.

    Args:
        queue_id: ``'<parallel_env>:<queue_name>'`` string.

    Returns:
        ``{'available': int, 'max tasks': int}`` taken from the 5th and 6th
        whitespace-separated columns of the matching row of
        ``qstat -pe <pe> -U `whoami` -g c``.  Both values are 0 when no row
        matches the queue name.

    Raises:
        IndexError: if ``queue_id`` contains no ``':'``.
        ValueError: if the matching row's slot columns are not integers.
    """
    # split queue id into parallel env and queue
    parallel_env = queue_id.split(':')[0]
    queue_name = queue_id.split(':')[1]

    free_tasks = 0
    max_tasks = 0
    with os.popen('qstat -pe '+parallel_env+' -U `whoami` -g c') as f:
        f.readline()  # read header
        f.readline()  # read separator
        for line in f:
            # split() copes with leading whitespace and tabs, which a
            # single-space split would turn into empty/shifted fields.
            fields = line.split()
            if len(fields) < 6 or fields[0] != queue_name:
                continue
            free_tasks = int(fields[4])
            max_tasks = int(fields[5])

    return {'available': free_tasks, 'max tasks': max_tasks}
96
97
def tasks_per_node(queue_id):
    """Return the number of tasks that can be placed on one node.

    The value is the smaller of the queue's ``slots`` setting
    (``qconf -sq <queue>``) and the parallel environment's
    ``allocation_rule`` (``qconf -sp <pe>``) when that rule is an integer.

    Args:
        queue_id: ``'<parallel_env>:<queue_name>'`` string.

    Returns:
        int; 0 when no ``slots`` line is found.
    """
    parallel_env = queue_id.split(':')[0]
    queue_name = queue_id.split(':')[1]

    tasks = 0
    with os.popen('qconf -sq '+queue_name) as f:
        for line in f:
            if line.split(' ')[0] == 'slots':
                # first token after the keyword, e.g. '16' in '16,[host=8]'
                tasks = int(re.split(r'\W+', line)[1])

    pe_tasks = tasks
    with os.popen('qconf -sp '+parallel_env) as f:
        for line in f:
            if line.split(' ')[0] == 'allocation_rule':
                try:
                    pe_tasks = int(re.split(r'\W+', line)[1])
                except (ValueError, IndexError):
                    # The allocation rule is not always an integer; in that
                    # case the queue slot count alone decides.
                    pass

    return min(tasks, pe_tasks)
118
119
def min_tasks_per_node(queue_id):
    """Return the minimum number of tasks to request on a node.

    This function is used when requesting non exclusive use
    as the parallel environment might enforce a minimum number
    of tasks.

    Args:
        queue_id: ``'<parallel_env>:<queue_name>'`` string; only the
            parallel environment part is used.

    Returns:
        int: the integer ``allocation_rule`` of the parallel environment
        (from ``qconf -sp <pe>``), but never less than 1.
    """
    parallel_env = queue_id.split(':')[0]

    tasks = 1
    pe_tasks = tasks
    with os.popen('qconf -sp '+parallel_env) as f:
        for line in f:
            if line.split(' ')[0] == 'allocation_rule':
                try:
                    pe_tasks = int(re.split(r'\W+', line)[1])
                except (ValueError, IndexError):
                    # The allocation rule is not always an integer; keep
                    # the default of one task in that case.
                    pass

    return max(tasks, pe_tasks)
140
141
def _qhost_node_config(host_name):
    """Parse ``qhost -q -h <host_name>`` into a node config dict.

    Returns a dict with keys 'max task', 'max thread' and 'max memory', or
    None when the output has no usable host row (no output at all, or the
    host row shows '-' in its fourth column).
    """
    with os.popen('qhost -q -h '+host_name) as f:
        header = f.readline().split()  # read header
        f.readline()  # read separator
        if len(header) > 3 and header[3] == 'LOAD':  # sge <=6.2u4 style
            task_col, thread_col, mem_col = 2, 2, 4
        else:
            # Layout with extra columns before LOAD; presumably
            # NCPU NSOC NCOR NTHR LOAD MEMTOT - confirm against qhost(1).
            task_col, thread_col, mem_col = 4, 5, 7

        for line in f:
            if line.startswith(' '):
                continue  # indented rows are not host rows
            fields = line.split()
            if len(fields) <= mem_col or fields[0] == 'global':
                continue
            if fields[3] == '-':
                return None  # host reports no data in this column
            return {'max task': int(fields[task_col]),
                    'max thread': int(fields[thread_col]),
                    'max memory': fields[mem_col]}
    return None


def node_config(queue_id):
    """Return the hardware configuration of a node serving the queue.

    The queue's ``hostlist`` entry (``qconf -sq <queue>``) is either a host
    group (name starting with '@') or a single host.  For a host group the
    members from ``qconf -shgrp_resolved`` are queried in order and the
    first host with a usable ``qhost`` row wins.

    Args:
        queue_id: ``'<parallel_env>:<queue_name>'`` string; only the queue
            name part is used.

    Returns:
        dict with keys 'max task' (int), 'max thread' (int) and
        'max memory' (str as printed by qhost).  An empty dict when the
        queue has no ``hostlist`` line or no host of the group is usable;
        all-zero values when a single listed host is not usable.
    """
    queue_name = queue_id.split(':')[1]

    host_group = None
    with os.popen('qconf -sq '+queue_name) as f:
        for line in f:
            fields = line.split()
            if len(fields) > 1 and fields[0] == 'hostlist':
                host_group = fields[1]

    if host_group is None:
        return {}

    if host_group[0] == '@':
        # Is a host group: find first node with usable data.
        with os.popen('qconf -shgrp_resolved '+host_group) as f:
            # split() drops the empty names that trailing blanks or
            # newlines would otherwise produce.
            host_names = f.read().split()
        for host_name in host_names:
            config = _qhost_node_config(host_name)
            if config is not None:
                return config
        return {}

    # Is a host
    config = _qhost_node_config(host_group)
    if config is None:
        return {'max task': 0, 'max thread': 0, 'max memory': 0}
    return config
225
226
def create_submit(queue_id, **kwargs):
    """Render the SGE submission script for a job on the given queue.

    Args:
        queue_id: ``'<parallel_env>:<queue_name>'`` string.
        **kwargs: optional settings, all read by key:
            num_tasks (default 1, capped at the queue's 'max tasks'),
            tasks_per_node (capped at the queue's tasks per node),
            num_threads_per_task (default: the node's 'max thread'),
            my_name, my_output, my_script, user_email, project_name,
            wall_clock (default '12:00:00'; ':00:00' is appended when the
            value contains no ':'), shared, no_syscribe, openmpi_args, qos.

    Returns:
        The rendered 'sge.jinja' template.

    Raises:
        ZeroDivisionError: if the effective tasks-per-node value is 0.
        KeyError: if the node configuration lacks 'max thread'.
    """
    parallel_env = queue_id.split(':')[0]
    queue_name = queue_id.split(':')[1]

    num_tasks = kwargs.get('num_tasks', 1)

    queue_tpn = tasks_per_node(queue_id)
    tpn = queue_tpn
    if 'tasks_per_node' in kwargs:
        tpn = min(tpn, kwargs['tasks_per_node'])

    nc = node_config(queue_id)
    qc = available_tasks(queue_id)

    num_tasks = min(num_tasks, qc['max tasks'])

    # Never ask for more threads per task than an even share of the node.
    num_threads_per_task = kwargs.get('num_threads_per_task', nc['max thread'])
    threads_share = int(math.ceil(float(nc['max thread'])/float(tpn)))
    num_threads_per_task = min(num_threads_per_task, threads_share)

    my_name = kwargs.get('my_name', "myclusterjob")
    my_output = kwargs.get('my_output', "myclusterjob.out")
    my_script = kwargs.get('my_script', None)

    # my_script defaults to None; a substring test on None would raise
    # TypeError, so only resolve packaged scripts when a name was given.
    if my_script is not None and 'mycluster-' in my_script:
        my_script = get_data(my_script)

    user_email = kwargs.get('user_email', None)
    project_name = kwargs.get('project_name', 'default')

    wall_clock = kwargs.get('wall_clock', '12:00:00')
    if ':' not in wall_clock:
        wall_clock = wall_clock + ':00:00'

    num_nodes = int(math.ceil(float(num_tasks)/float(tpn)))

    # For exclusive node use total number of slots required
    # is number of nodes x number of slots offered by queue
    num_queue_slots = num_nodes*queue_tpn
    if kwargs.get('shared') and num_nodes == 1:  # Assumes fill up rule
        num_queue_slots = num_nodes*max(tpn, min_tasks_per_node(queue_id))

    no_syscribe = kwargs.get('no_syscribe', False)
    record_job = not no_syscribe

    openmpi_args = kwargs.get('openmpi_args', "-bysocket -bind-to-socket")

    qos = kwargs.get('qos', None)

    template = load_template('sge.jinja')

    script_str = template.render(my_name=my_name,
                                 my_script=my_script,
                                 my_output=my_output,
                                 user_email=user_email,
                                 queue_name=queue_name,
                                 parallel_env=parallel_env,
                                 num_queue_slots=num_queue_slots,
                                 num_tasks=num_tasks,
                                 tpn=tpn,
                                 num_threads_per_task=num_threads_per_task,
                                 num_nodes=num_nodes,
                                 project_name=project_name,
                                 wall_clock=wall_clock,
                                 record_job=record_job,
                                 openmpi_args=openmpi_args,
                                 qos=qos)

    return script_str
301
302
303
def submit(script_name, immediate, depends=None):
    """Submit a script with ``qsub -V -terse`` and return its job id.

    Args:
        script_name: path of the script, appended to the qsub command line.
        immediate: accepted but not used by this function.
        depends: accepted but not used by this function.

    Returns:
        The integer job id read from the first output line, or 0 when that
        line is not an integer (a diagnostic is printed in that case).
    """
    job_id = 0
    with os.popen('qsub -V -terse '+script_name) as f:
        first_line = f.readline()
        try:
            job_id = int(first_line.strip())
        except ValueError:
            print('job id not returned')
            # Include the line that failed to parse: it carries whatever
            # qsub printed instead of the id.
            print((first_line + f.readline()).strip())
    return job_id
315
316
def delete(job_id):
    """Run ``qdel`` for the given job and wait for it to finish.

    Args:
        job_id: job identifier; integers (as produced when job ids are
            parsed with ``int``) and strings are both accepted.
    """
    # str() so that integer ids do not raise TypeError on concatenation.
    with os.popen('qdel '+str(job_id)):
        pass
319
320
def status():
    """Return the state of every job listed by ``qstat``.

    Returns:
        dict mapping integer job id (first column) to the state string
        (fifth column).  Rows that cannot be parsed are reported with
        ``print`` and skipped.
    """
    status_dict = {}
    with os.popen('qstat') as f:
        f.readline()  # read header
        f.readline()  # read separator
        for line in f:
            fields = line.split()
            if not fields:
                continue
            try:
                status_dict[int(fields[0])] = fields[4]
            except (ValueError, IndexError) as e:
                # 'except e:' would itself fail with NameError; bind the
                # exception properly and keep going with the next row.
                print(e)

    return status_dict
336
337
def job_stats(job_id):
    """Return accounting statistics for a finished job from ``qacct -j``.

    Args:
        job_id: job identifier (converted with ``str``).

    Returns:
        dict with keys 'wallclock' and 'cpu' (``datetime.timedelta``, whole
        seconds), 'mem' (string as printed by qacct) and 'queue'
        (``'<granted_pe>:<qname>'``).

    Raises:
        KeyError: if the qacct output lacks one of the required fields
            (ru_wallclock, mem, cpu, granted_pe, qname).
    """
    import datetime

    output = {}
    with os.popen('qacct -j '+str(job_id)) as f:
        f.readline()  # read header
        for line in f:
            new_line = re.sub(' +', ' ', line.strip())
            key, sep, value = new_line.partition(' ')
            # Lines holding only a key are skipped instead of aborting the
            # whole parse.
            if sep:
                output[key] = value

    stats_dict = {}
    # Only the part before any '.' is used, the same way as for 'cpu'.
    stats_dict['wallclock'] = datetime.timedelta(
        seconds=int(output['ru_wallclock'].split('.')[0]))
    stats_dict['mem'] = output['mem']
    stats_dict['cpu'] = datetime.timedelta(
        seconds=int(output['cpu'].split('.')[0]))
    stats_dict['queue'] = output['granted_pe']+':'+output['qname']

    return stats_dict
356
357
def running_stats(job_id):
    """Return resource usage of a running job from ``qstat -j``.

    The ``usage`` line is parsed as comma-separated ``key=value`` pairs and
    the 'cpu' and 'mem' entries are looked up by name, so their position on
    the line does not matter.

    Args:
        job_id: job identifier (converted with ``str``).

    Returns:
        dict with 'wallclock' (always 0), 'mem' and 'cpu' (strings exactly
        as printed by qstat; 'cpu' is not converted to a timedelta).

    Raises:
        KeyError: if no ``usage`` line with 'cpu' and 'mem' is present.
    """
    output = {}
    with os.popen('qstat -j '+str(job_id)) as f:
        f.readline()  # read header
        for line in f:
            new_line = re.sub(' +', ' ', line.strip())
            # 'usage <task>: <key>=<value>, <key>=<value>, ...'
            parts = new_line.split(' ', 2)
            if len(parts) < 3 or parts[0] != 'usage':
                continue
            for item in parts[2].split(','):
                key, sep, value = item.partition('=')
                if sep:
                    output[key.strip()] = value

    stats_dict = {}
    stats_dict['wallclock'] = 0
    stats_dict['mem'] = output['mem']
    stats_dict['cpu'] = output['cpu']

    return stats_dict
377