Completed · Pull Request — master (#173) · by unknown · 27s

Function: build_argv()   |   Grade: C

Complexity
    Conditions: 7

Size
    Total Lines: 51

Duplication
    Lines: 0
    Ratio: 0 %

Importance
    Changes: 1
    Bugs: 0
    Features: 0

Metric                        Value
cc  (cyclomatic complexity)   7
c   (changes)                 1
b   (bugs)                    0
f   (features)                0
dl  (duplicated lines)        0
loc (lines of code)           51
rs                            5.7838
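For reference, cyclomatic complexity counts one linear path plus one per decision point. In the listing below, build_argv() contains six if statements (the batchName default plus the five optional flags), giving 1 + 6 = 7, which matches the Conditions and cc values above.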

How to fix: Long Method

Small methods make your code easier to understand, especially when combined with a good name. And when a method is small, finding a good name for it is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include:
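The most common of these is Extract Method. As a purely hypothetical illustration (not a change that is part of this pull request), the five optional-flag blocks in build_argv() below could be pulled into one small helper, with the inline comments turning into its name, docstring, and per-branch comments:

def append_optional_flags(argv, commandsFile=None, doNotLaunch=False,
                          autoresume=False, pool=None, sbatchFlags=None):
    """Append the optional smart-dispatch flags to an argv string.

    Hypothetical helper, sketched only to illustrate Extract Method on
    build_argv(); the flag names come from the reviewed code.
    """
    # File containing commands to launch, one per line.
    if commandsFile:
        argv += " --commandsFile " + commandsFile
    # Generate all the files without launching the job.
    if doNotLaunch:
        argv += " --doNotLaunch"
    # Requeue the job when it hits the cluster's maximum walltime.
    if autoresume:
        argv += " --autoresume"
    # Number of workers that will be consuming commands.
    if pool:
        argv += " --pool " + pool
    # Space-separated list of extra SBATCH flags.
    if sbatchFlags:
        argv += " --sbatchFlags=" + sbatchFlags
    return argv

build_argv() would then keep only the required-argument formatting and its single batchName branch and delegate the rest, e.g. return append_optional_flags(argv, commandsFile, doNotLaunch, autoresume, pool, sbatchFlags).replace("\n", " "), leaving a complexity of 2 in build_argv() and 6 in the smaller, well-named helper.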

import datetime
import inspect
import functools
import getpass
import glob
import os
import pdb
import subprocess
import sys
import time
import traceback

WALLTIME = 60  # seconds

command_string = """\
#!/usr/bin/env /bin/bash

######################
# Begin work section #
######################

echo "My SLURM_JOB_ID:" $SLURM_JOB_ID
echo "My SLURM_ARRAY_JOB_ID:" $SLURM_ARRAY_JOB_ID
echo "My SLURM_ARRAY_TASK_ID: " $SLURM_ARRAY_TASK_ID

echo called with option "$1"

export HOME=`getent passwd $USER | cut -d':' -f6`
source ~/.bashrc
export THEANO_FLAGS=...
export PYTHONUNBUFFERED=1
echo Running on $HOSTNAME

if [ -e "paused$1.log" ]
then
    echo "resuming $1"
    touch resumed$1.log
else
    echo "running $1 from scratch"
    touch running$1.log
fi


# Test GPUs
echo "echo CUDA_VISIBLE_DEVICES"
echo $CUDA_VISIBLE_DEVICES
echo

nvidia-smi

# Test CPUs
# How?

# Test resume
if [ ! -e "paused$1.log" ]
then
    touch paused$1.log
    echo "sleeping $1 %(sleep)s seconds"
    sleep %(sleep)ss
fi

echo completed $1
mv paused$1.log completed$1.log
"""


def set_defaults(dictionary, **kwargs):

    for item, value in kwargs.iteritems():
        dictionary.setdefault(item, value)


def strfdelta(tdelta, fmt):
    """
    From https://stackoverflow.com/a/8907269
    """

    d = {}
    d["hours"], rem = divmod(tdelta.seconds, 3600)
    d["hours"] += tdelta.days * 24
    d["minutes"], d["seconds"] = divmod(rem, 60)
    return fmt % d


def infer_verification_name():

    for stack in inspect.stack():
        if stack[3].startswith("verify_"):
            return stack[3]

    raise RuntimeError("Cannot infer verification name:\n %s" %
                       "\n".join(str(t) for t in traceback.format_stack()))


def build_argv(coresPerCommand, gpusPerCommand, walltime, coresPerNode,
               gpusPerNode, batchName=None, commandsFile=None,
               doNotLaunch=False, autoresume=False, pool=None,
               sbatchFlags=None):

    if batchName is None:
        batchName = infer_verification_name()

    argv = """
-vv
--queueName dummy
--batchName %(batchName)s --walltime %(walltime)s
--coresPerCommand %(coresPerCommand)s
--gpusPerCommand %(gpusPerCommand)s
--coresPerNode %(coresPerNode)s
--gpusPerNode %(gpusPerNode)s
    """ % dict(batchName=batchName,
               walltime=strfdelta(
                   datetime.timedelta(seconds=walltime),
                   "%(hours)02d:%(minutes)02d:%(seconds)02d"),
               coresPerCommand=coresPerCommand,
               gpusPerCommand=gpusPerCommand,
               coresPerNode=coresPerNode,
               gpusPerNode=gpusPerNode)

    # File containing commands to launch. Each command must
    # be on a seperate line. (Replaces commandAndOptions)
    if commandsFile:
        argv += " --commandsFile " + commandsFile

    # Generate all the files without launching the job.
    if doNotLaunch:
        argv += " --doNotLaunch"

    # Requeue the job when the running time hits the maximum
    # walltime allowed on the cluster. Assumes that commands
    # are resumable.
    if autoresume:
        argv += " --autoresume"

    # Number of workers that will be consuming commands.
    # Default: Nb commands
    if pool:
        argv += " --pool " + pool

    # ADVANCED USAGE: Allow to pass a space seperated list of SBATCH flags.
    # Ex:--sbatchFlags="--qos=high --ofile.out"
    if sbatchFlags:
        argv += " --sbatchFlags=" + sbatchFlags

    return argv.replace("\n", " ")


def get_squeue():
    command = ("squeue -u %(username)s" %
               dict(username=getpass.getuser()))
    process = subprocess.Popen(
        command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
    stdout, stderr = process.communicate()
    return stdout


def try_to_remove_file(filename_template, expected_number):
    file_names = glob.glob(filename_template)
    try:
        i = 0
        for file_name in file_names:
            i += 1
            os.remove(file_name)
    except OSError as e:
        print str(e)

    if i != expected_number:
        print "Error: Expected %d files, found %d" % (expected_number, i)
    else:
        print "OK: All %d files %s were found:\n%s" % (
            expected_number, filename_template,
            "\n".join(sorted(file_names)))


def minimum_requirement(attribute_name, minimum_value):

    def decorator(method):

        @functools.wraps(method)
        def call(self, *args, **kwargs):

            # Method was called from another verification
            try:
                verification_name = infer_verification_name()
            # Method was called directly
            except RuntimeError:
                verification_name = method.__name__

            if not hasattr(self, attribute_name):
                raise ValueError("Invalid requirement, object %s does not "
                                 "have attribute %s" %
                                 (self.__class__.__name__, attribute_name))

            if getattr(self, attribute_name) >= minimum_value:
                return method(self, *args, **kwargs)
            else:
                print ("%s does not have enough %s: %d."
                       "Skipping %s." %
                       (self.__class__.__name__, attribute_name, minimum_value,
                        verification_name))
                return None

        return call

    return decorator


class VerifySlurmCluster(object):

    WALLTIME = 60
    CORES_PER_NODE = 8
    GPUS_PER_NODE = 0

    def __init__(self, debug=False, no_fork=False):
        self.debug = debug
        self.no_fork = no_fork

    def get_verification_methods(self, filtered_by=None):
        methods = inspect.getmembers(self, predicate=inspect.ismethod)

        def filtering(item):
            key = item[0]

            if not key.startswith("verify_"):
                return False
            elif filtered_by is not None and key not in filtered_by:
                return False

            return True

        return dict(filter(filtering, methods))

    def run_verifications(self, filtered_by=None):
        if filtered_by is not None and len(filtered_by) == 0:
            filtered_by = None

        verification_methods = self.get_verification_methods(filtered_by)
        processes = []
        for verification_name, verification_fct in \
                verification_methods.iteritems():
            print "========%s" % ("=" * len(verification_name))
            print "Running %s" % verification_name
            print "========%s\n\n" % ("=" * len(verification_name))

            if self.debug or self.no_fork:
                verification_fct()
            else:
                # fork the process in a new dir and new stdout, stderr
                verification_dir = os.path.join(
                    os.getcwd(), self.__class__.__name__, verification_name)

                if not os.path.isdir(verification_dir):
                    os.makedirs(verification_dir)

                stdout = open(os.path.join(verification_dir,
                                           "validation.out"), 'w')
                stderr = open(os.path.join(verification_dir,
                                           "validation.err"), 'w')

                popen = subprocess.Popen(
                    "/bin/bash",
                    shell=True,
                    stdin=subprocess.PIPE,
                    stdout=stdout,
                    stderr=stderr)

                popen.stdin.write("cd %s;" % verification_dir)

                script_path = os.path.join(
                    os.getcwd(), inspect.getfile(self.__class__))
                popen.stdin.write(
                    "python %s --no-fork %s;" % (
                        script_path, verification_name))
                print "python %s --no-fork %s;" % (
                    script_path, verification_name)

                processes.append(popen)

        for popen in processes:
            # popen.communicate()
            popen.terminate()

    def run_test(self, argv, command_string, command_arguments=""):
        FILE_NAME = "test.sh"

        with open("test.sh", "w") as text_file:
            text_file.write(command_string)

        command = ("smart-dispatch %s launch bash %s %s" %
                   (argv, FILE_NAME, command_arguments))
        print "running test with command: "
        print command

        process = subprocess.Popen(
            command,
            stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
        stdout, stderr = process.communicate()

        print "\nstdout:"
        print stdout.decode()

        print "\nstderr:"
        print stderr.decode()
        return stdout.split("\n")[-2].strip()

    def validate(self, root_dir, argv, squeue_wait, nb_of_commands=1,
                 resume=False):

        print "\nValidating arguments:"
        print argv

        stdout = get_squeue()
        number_of_process = stdout.count("\n") - 1

        while number_of_process > 0:
            root = os.path.join(root_dir, "commands")
            for file_path in os.listdir(root):
                if file_path.endswith(".sh"):
                    print file_path
                    print open(os.path.join(root, file_path), 'r').read()

            print stdout
            print "Waiting %d seconds" % squeue_wait
            time.sleep(squeue_wait)
            stdout = get_squeue()
            number_of_process = stdout.count("\n") - 1
            print stdout
            print number_of_process

        try_to_remove_file("running*.log", expected_number=nb_of_commands)
        try_to_remove_file("resumed*.log",
                           expected_number=nb_of_commands * int(resume))
        try_to_remove_file("completed*.log", expected_number=nb_of_commands)

        root = os.path.join(root_dir, "logs")
        for file_path in reversed(sorted(os.listdir(root))):
            if file_path.endswith(".err") or file_path.endswith(".out"):
                print file_path
                print open(os.path.join(root, file_path), 'r').read()
                if self.debug:
                    pdb.set_trace()

    def get_arguments(self, **kwargs):

        set_defaults(
            kwargs,
            coresPerCommand=1,
            gpusPerCommand=0,
            walltime=self.WALLTIME,
            coresPerNode=self.CORES_PER_NODE,
            gpusPerNode=self.GPUS_PER_NODE)

        return kwargs

    def base_verification(self, sleep_time=0, command_arguments="",
                          resume=False, squeue_wait=None, nb_of_commands=1,
                          **kwargs):

        if squeue_wait is None and self.debug:
            squeue_wait = sleep_time + 5
        elif squeue_wait is None:
            squeue_wait = self.WALLTIME * 2

        arguments = self.get_arguments(**kwargs)
        argv = build_argv(**arguments)

        root_dir = self.run_test(argv, command_string % dict(sleep=sleep_time),
                                 command_arguments=command_arguments)
        self.validate(root_dir, argv, squeue_wait, nb_of_commands,
                      resume=resume)

    def verify_simple_task(self, **kwargs):
        self.base_verification(**kwargs)

    def verify_simple_task_with_one_gpu(self, **kwargs):
        set_defaults(
            kwargs,
            gpusPerCommand=1,
            gpusPerNode=1)

        self.verify_simple_task(**kwargs)

    @minimum_requirement("GPUS_PER_NODE", 2)
    def verify_simple_task_with_many_gpus(self, **kwargs):

        for gpus_per_command in xrange(2, self.GPUS_PER_NODE + 1):
            arguments = kwargs.copy()
            arguments["gpusPerCommand"] = gpus_per_command

            self.verify_simple_task(**arguments)

    @minimum_requirement("CORES_PER_NODE", 2)
    def verify_many_task(self, **kwargs):
        set_defaults(
            kwargs,
            nb_of_commands=self.CORES_PER_NODE)

        command_arguments = (
            "[%s]" % " ".join(str(i) for i in range(kwargs["nb_of_commands"])))

        set_defaults(
            kwargs,
            command_arguments=command_arguments)

        self.verify_simple_task(**kwargs)

    @minimum_requirement("CORES_PER_NODE", 4)
    def verify_many_task_with_many_cores(self, **kwargs):
        for cores_per_command in xrange(2, self.CORES_PER_NODE):
            if cores_per_command // self.CORES_PER_NODE <= 1:
                break

            arguments = kwargs.copy()
            arguments["cores_per_command"] = cores_per_command
            arguments["nb_of_commands"] = (
                cores_per_command //
                self.CORES_PER_NODE)

            self.many_task(**arguments)

    @minimum_requirement("GPUS_PER_NODE", 2)
    def verify_many_task_with_one_gpu(self, **kwargs):
        set_defaults(
            kwargs,
            nb_of_commands=self.GPUS_PER_NODE,
            gpusPerCommand=1)

        self.verify_many_task(**kwargs)

    @minimum_requirement("GPUS_PER_NODE", 4)
    def verify_many_task_with_many_gpus(self, **kwargs):
        for gpus_per_command in xrange(2, self.GPUS_PER_NODE + 1):
            if gpus_per_command // self.GPUS_PER_NODE <= 1:
                break

            arguments = kwargs.copy()
            arguments["gpusPerCommand"] = gpus_per_command
            arguments["nb_of_commands"] = (
                gpus_per_command //
                self.GPUS_PER_NODE)

            self.verify_many_task_with_one_gpu(**arguments)

    def verify_simple_task_with_autoresume_unneeded(self, **kwargs):
        walltime = 2 * 60
        set_defaults(
            kwargs,
            walltime=walltime,
            resume=False,
            autoresume=True)

        self.verify_simple_task(**kwargs)

    def verify_simple_task_with_autoresume_needed(self, **kwargs):
        walltime = 2 * 60
        set_defaults(
            kwargs,
            sleep_time=walltime,
            walltime=walltime,
            resume=True,
            autoresume=True)

        self.verify_simple_task(**kwargs)

    def verify_many_task_with_autoresume_needed(self, **kwargs):
        walltime = 2 * 60
        set_defaults(
            kwargs,
            sleep_time=walltime,
            walltime=walltime,
            resume=True,
            autoresume=True)

        self.verify_many_task(**kwargs)

    # def verify_pool(self, **kwargs):
    #     pass
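For context on how these verifications are meant to be driven: run_verifications() re-launches this same file as "python <script> --no-fork <verification_name>", so a concrete subclass of VerifySlurmCluster is expected to provide a small command-line entry point. No such entry point appears in the listing above; the following is only a sketch under that assumption, and ExampleClusterVerification, its constants, and the flag handling are illustrative, not part of the reviewed code.

class ExampleClusterVerification(VerifySlurmCluster):
    # Assumed subclass describing one concrete cluster; the values are
    # invented for illustration only.
    WALLTIME = 60
    CORES_PER_NODE = 16
    GPUS_PER_NODE = 2


if __name__ == "__main__":
    # Minimal driver compatible with the "python <script> --no-fork
    # <verification_name>" command emitted by run_verifications().
    # The flag parsing below is an assumption, not the project's CLI.
    no_fork = "--no-fork" in sys.argv
    debug = "--debug" in sys.argv
    names = [arg for arg in sys.argv[1:] if not arg.startswith("-")]
    verifier = ExampleClusterVerification(debug=debug, no_fork=no_fork)
    verifier.run_verifications(filtered_by=names)

With no verification names on the command line, filtered_by becomes an empty list, which run_verifications() treats as "run everything"; passing one or more verify_* names restricts the run to those methods.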