Completed
Pull Request — master (#166)
by
unknown
28s
created

TestSlurm.test_gres()   A

Complexity

Conditions 1

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 3
Bugs 0 Features 0
Metric Value
cc 1
c 3
b 0
f 0
dl 0
loc 7
rs 9.4285
1
from glob import glob
2
import os
3
import time
4
import unittest
5
6
from subprocess import Popen, PIPE
7
8
# Job-script template using PBS-style ``#PBS`` directives. The single ``{}``
# placeholder receives the directive under test. The shebang must be the very
# first line of the rendered script, so no stray lines may precede it.
pbs_string = """\
#!/usr/bin/env /bin/bash

#PBS -N arrayJob
#PBS -o arrayJob_%A_%a.out
#PBS -l walltime=01:00:00
{}

######################
# Begin work section #
######################

echo "My SLURM_ARRAY_JOB_ID:" $SLURM_ARRAY_JOB_ID
echo "My SLURM_ARRAY_TASK_ID: " $SLURM_ARRAY_TASK_ID
nvidia-smi
"""
24
25
# Job-script template using native ``#SBATCH`` directives. The single ``{}``
# placeholder receives the directive under test. The shebang must be the very
# first line of the rendered script, so no stray lines may precede it.
sbatch_string = """\
#!/usr/bin/env -i /bin/zsh

#SBATCH --job-name=arrayJob
#SBATCH --output=arrayJob_%A_%a.out
#SBATCH --time=01:00:00
{}

######################
# Begin work section #
######################

echo "My SLURM_ARRAY_JOB_ID:" $SLURM_ARRAY_JOB_ID
echo "My SLURM_ARRAY_TASK_ID: " $SLURM_ARRAY_TASK_ID
nvidia-smi
"""
41
42
# Checking which cluster is running the tests first
process = Popen("sacctmgr list cluster", stdout=PIPE, stderr=PIPE, shell=True)
stdout, _ = process.communicate()
stdout = stdout.decode()
# The cluster name is taken from the first space-separated field of the third
# output line (presumably the first data row after the table header -- confirm
# against the sacctmgr output on the target cluster). When sacctmgr is missing
# or prints fewer than three lines, fall back to an empty name instead of
# raising IndexError while the module is being imported.
_rows = stdout.splitlines()
cluster = _rows[2].strip().split(' ')[0] if len(_rows) > 2 else ''
# Tests decorated with skipIf(to_skip, message) are skipped on these clusters.
to_skip = cluster in ['graham', 'cedar']
message = "Test does not run on cluster {}".format(cluster)
49
50
class TestSlurm(unittest.TestCase):
    """Submit generated job scripts with ``sbatch`` and check what ``squeue`` reports.

    Each test renders a script containing one resource directive, submits it,
    and compares the value ``squeue`` prints for the matching output field
    with the expected value.
    """

    def tearDown(self):
        """Delete every ``*.out`` file in the working directory and ``test.pbs``."""
        for file_name in (glob('*.out') + ["test.pbs"]):
            os.remove(file_name)

    def _test_param(self, param_array, command, flag, string=pbs_string, output_array=None):
        """Submit one job per parameter and verify the value reported by ``squeue``.

        Args:
            param_array: values substituted into ``command``, one job each.
            command: directive template with one ``{}`` placeholder,
                e.g. ``"#PBS -l mem={}"``.
            flag: field name passed to ``squeue -O``.
            string: script template with one ``{}`` placeholder that receives
                the rendered directive.
            output_array: expected ``squeue`` values, parallel to
                ``param_array``; defaults to ``param_array`` itself.
        """
        output_array = output_array or param_array
        for param, output in zip(param_array, output_array):
            # Render the selected template exactly once. Wrapping the result in
            # pbs_string a second time would nest a whole script (shebang
            # included) inside another one and ignore the ``string`` argument.
            com = string.format(command.format(param))
            with open("test.pbs", "w") as text_file:
                text_file.write(com)
            process = Popen("sbatch test.pbs", stdout=PIPE, stderr=PIPE, shell=True)
            stdout, _ = process.communicate()
            stdout = stdout.decode()
            self.assertIn("Submitted batch job", stdout)
            # The job id is the last space-separated token of sbatch's output.
            job_id = stdout.split(" ")[-1].strip()

            # Short pause before querying, presumably so the scheduler has
            # registered the job -- TODO confirm the delay is sufficient.
            time.sleep(0.25)
            process = Popen("squeue -u $USER -j {} -O {}".format(job_id, flag), stdout=PIPE, stderr=PIPE, shell=True)
            stdout, _ = process.communicate()
            # Drop the first (header) line and empty lines; every remaining
            # row must equal the expected value.
            job_params = [c.strip() for c in stdout.decode().split("\n")[1:] if c != '']
            # NOTE(review): this passes vacuously when squeue returns no rows.
            self.assertSequenceEqual(job_params, [output] * len(job_params))

    @unittest.skipIf(to_skip, message)
    def test_priority(self):
        """``--qos`` values should be reported unchanged in the ``qos`` field."""
        self._test_param(
            ['high', 'low'],
            "#SBATCH --qos={}",
            "qos",
            pbs_string
        )

    def test_gres(self):
        """``naccelerators=N`` should be reported as ``gpu:N`` in the ``gres`` field."""
        self._test_param(
            ["1", "2"],
            "#PBS -l naccelerators={}",
            "gres",
            pbs_string,
            ["gpu:1", "gpu:2"]
        )

    def test_memory(self):
        """``mem`` values should be reported unchanged in the ``minmemory`` field."""
        self._test_param(
            ["2G", "4G"],
            "#PBS -l mem={}",
            "minmemory",
            pbs_string
        )

    def test_nb_cpus(self):
        """``ncpus`` values should be reported unchanged in the ``mincpus`` field."""
        self._test_param(
            ["2", "3"],
            "#PBS -l ncpus={}",
            "mincpus",
            pbs_string
        )

    @unittest.skipIf(to_skip, message)
    def test_constraint(self):
        """``proc`` values should be reported unchanged in the ``feature`` field."""
        self._test_param(
            ["gpu6gb", "gpu8gb"],
            "#PBS -l proc={}",
            "feature",
            pbs_string
        )
118
119
# Run the test suite when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()
121