Completed
Push — master ( ab2b1a...c33726 )
by Mathieu
7s
created

TestSmartdispatcher   A

Complexity

Total Complexity 13

Size/Duplication

Total Lines 147
Duplicated Lines 0 %
Metric Value
dl 0
loc 147
rs 10
wmc 13

7 Methods

Rating   Name   Duplication   Size   Complexity  
A test_main_launch_with_pool_of_workers() 0 12 1
A tearDown() 0 3 1
B test_main_resume() 0 48 5
A test_main_launch() 0 12 1
A setUp() 0 21 1
B test_main_resume_only_pending() 0 26 3
A test_launch_using_commands_file() 0 17 1
1
import os
2
import unittest
3
import tempfile
4
import shutil
5
from os.path import join as pjoin
6
7
from subprocess import call
8
9
from nose.tools import assert_true, assert_equal
10
11
12
class TestSmartdispatcher(unittest.TestCase):
13
14
    def setUp(self):
15
        self.testing_dir = tempfile.mkdtemp()
16
        self.logs_dir = os.path.join(self.testing_dir, 'SMART_DISPATCH_LOGS')
17
18
        self.folded_commands = 'echo "[1 2 3 4]" "[6 7 8]" "[9 0]"'
19
        self.commands = ["echo 1 6 9", "echo 1 6 0", "echo 1 7 9", "echo 1 7 0", "echo 1 8 9", "echo 1 8 0",
20
                         "echo 2 6 9", "echo 2 6 0", "echo 2 7 9", "echo 2 7 0", "echo 2 8 9", "echo 2 8 0",
21
                         "echo 3 6 9", "echo 3 6 0", "echo 3 7 9", "echo 3 7 0", "echo 3 8 9", "echo 3 8 0",
22
                         "echo 4 6 9", "echo 4 6 0", "echo 4 7 9", "echo 4 7 0", "echo 4 8 9", "echo 4 8 0"]
23
        self.nb_commands = len(self.commands)
24
25
        self.smart_dispatch_command = 'smart-dispatch -C 1 -q test -t 5:00 -x'
26
        self.launch_command = "{0} launch {1}".format(self.smart_dispatch_command, self.folded_commands)
27
        self.resume_command = "{0} resume {{0}}".format(self.smart_dispatch_command)
28
29
        smart_dispatch_command_with_pool = 'smart-dispatch --pool 10 -C 1 -q test -t 5:00 -x {0}'
30
        self.launch_command_with_pool = smart_dispatch_command_with_pool.format('launch ' + self.folded_commands)
31
        self.nb_workers = 10
32
33
        self._cwd = os.getcwd()
34
        os.chdir(self.testing_dir)
35
36
    def tearDown(self):
37
        os.chdir(self._cwd)
38
        shutil.rmtree(self.testing_dir)
39
40
    def test_main_launch(self):
41
        # Actual test
42
        exit_status = call(self.launch_command, shell=True)
43
44
        # Test validation
45
        assert_equal(exit_status, 0)
46
        assert_true(os.path.isdir(self.logs_dir))
47
        assert_equal(len(os.listdir(self.logs_dir)), 1)
48
49
        batch_uid = os.listdir(self.logs_dir)[0]
50
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
51
        assert_equal(len(os.listdir(path_job_commands)), self.nb_commands + 1)
52
53
    def test_launch_using_commands_file(self):
54
        # Actual test
55
        commands_filename = "commands_to_run.txt"
56
        open(commands_filename, 'w').write("\n".join(self.commands))
57
58
        launch_command = self.smart_dispatch_command + " -f {0} launch".format(commands_filename)
59
        exit_status = call(launch_command, shell=True)
60
61
        # Test validation
62
        assert_equal(exit_status, 0)
63
        assert_true(os.path.isdir(self.logs_dir))
64
        assert_equal(len(os.listdir(self.logs_dir)), 1)
65
66
        batch_uid = os.listdir(self.logs_dir)[0]
67
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
68
        assert_equal(len(os.listdir(path_job_commands)), self.nb_commands + 1)
69
        assert_equal(open(pjoin(path_job_commands, 'commands.txt')).read(), "\n".join(self.commands) + "\n")
70
71
    def test_main_launch_with_pool_of_workers(self):
72
        # Actual test
73
        exit_status = call(self.launch_command_with_pool, shell=True)
74
75
        # Test validation
76
        assert_equal(exit_status, 0)
77
        assert_true(os.path.isdir(self.logs_dir))
78
        assert_equal(len(os.listdir(self.logs_dir)), 1)
79
80
        batch_uid = os.listdir(self.logs_dir)[0]
81
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
82
        assert_equal(len(os.listdir(path_job_commands)), self.nb_workers + 1)
83
84
    def test_main_resume(self):
85
        # Setup
86
        call(self.launch_command, shell=True)
87
        batch_uid = os.listdir(self.logs_dir)[0]
88
89
        # Simulate that some commands are in the running state.
90
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
91
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
92
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
93
        commands = open(pending_commands_file).read().strip().split("\n")
94
        with open(running_commands_file, 'w') as running_commands:
95
            running_commands.write("\n".join(commands[::2]) + "\n")
96
        with open(pending_commands_file, 'w') as pending_commands:
97
            pending_commands.write("\n".join(commands[1::2]) + "\n")
98
99
        # Actual test (should move running commands back to pending).
100
        exit_status = call(self.resume_command.format(batch_uid), shell=True)
101
102
        # Test validation
103
        assert_equal(exit_status, 0)
104
        assert_true(os.path.isdir(self.logs_dir))
105
        assert_equal(len(os.listdir(self.logs_dir)), 1)
106
        assert_equal(len(open(running_commands_file).readlines()), 0)
107
        assert_equal(len(open(pending_commands_file).readlines()), len(commands))
108
109
        # Test when batch_uid is a path instead of a jobname.
110
        # Setup
111
        batch_uid = os.path.join(self.logs_dir, os.listdir(self.logs_dir)[0])
112
113
        # Simulate that some commands are in the running state.
114
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
115
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
116
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
117
        commands = open(pending_commands_file).read().strip().split("\n")
118
        with open(running_commands_file, 'w') as running_commands:
119
            running_commands.write("\n".join(commands[::2]) + "\n")
120
        with open(pending_commands_file, 'w') as pending_commands:
121
            pending_commands.write("\n".join(commands[1::2]) + "\n")
122
123
        # Actual test (should move running commands back to pending).
124
        exit_status = call(self.resume_command.format(batch_uid), shell=True)
125
126
        # Test validation
127
        assert_equal(exit_status, 0)
128
        assert_true(os.path.isdir(self.logs_dir))
129
        assert_equal(len(os.listdir(self.logs_dir)), 1)
130
        assert_equal(len(open(running_commands_file).readlines()), 0)
131
        assert_equal(len(open(pending_commands_file).readlines()), len(commands))
132
133
    def test_main_resume_only_pending(self):
134
        # SetUp
135
        call(self.launch_command, shell=True)
136
        batch_uid = os.listdir(self.logs_dir)[0]
137
138
        # Simulate that some commands are in the running state.
139
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
140
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
141
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
142
        commands = open(pending_commands_file).read().strip().split("\n")
143
        with open(running_commands_file, 'w') as running_commands:
144
            running_commands.write("\n".join(commands[::2]) + "\n")
145
        with open(pending_commands_file, 'w') as pending_commands:
146
            pending_commands.write("\n".join(commands[1::2]) + "\n")
147
148
        # Actual test (should NOT move running commands back to pending).
149
        command_line = self.resume_command.format(batch_uid)
150
        command_line += " --onlyPending"
151
        exit_status = call(command_line, shell=True)
152
153
        # Test validation
154
        assert_equal(exit_status, 0)
155
        assert_true(os.path.isdir(self.logs_dir))
156
        assert_equal(len(os.listdir(self.logs_dir)), 1)
157
        assert_equal(len(open(running_commands_file).readlines()), len(commands[::2]))
158
        assert_equal(len(open(pending_commands_file).readlines()), len(commands[1::2]))
159