Completed
Push — master ( 0b9edb...ab2b1a )
by Mathieu
10s
created

TestSmartdispatcher.test_main_resume()   B

Complexity

Conditions 5

Size

Total Lines 48

Duplication

Lines 0
Ratio 0 %
Metric Value
cc 5
dl 0
loc 48
rs 8.2082
1
import os
2
import unittest
3
import tempfile
4
import shutil
5
from os.path import join as pjoin
6
7
from subprocess import call
8
9
from nose.tools import assert_true, assert_equal
10
11
12
class TestSmartdispatcher(unittest.TestCase):
13
14
    def setUp(self):
15
        self.testing_dir = tempfile.mkdtemp()
16
        self.logs_dir = os.path.join(self.testing_dir, 'SMART_DISPATCH_LOGS')
17
18
        self.commands = 'echo "[1 2 3 4]" "[6 7 8]" "[9 0]"'
19
        self.nb_commands = 4*3*2
20
21
        smart_dispatch_command = 'smart-dispatch -C 1 -q test -t 5:00 -x {0}'
22
        self.launch_command = smart_dispatch_command.format('launch ' + self.commands)
23
        self.resume_command = smart_dispatch_command.format('resume {0}')
24
25
        smart_dispatch_command_with_pool = 'smart-dispatch --pool 10 -C 1 -q test -t 5:00 -x {0}'
26
        self.launch_command_with_pool = smart_dispatch_command_with_pool.format('launch ' + self.commands)
27
        self.nb_workers = 10
28
29
        self._cwd = os.getcwd()
30
        os.chdir(self.testing_dir)
31
32
    def tearDown(self):
33
        os.chdir(self._cwd)
34
        shutil.rmtree(self.testing_dir)
35
36
    def test_main_launch(self):
37
        # Actual test
38
        exit_status = call(self.launch_command, shell=True)
39
40
        # Test validation
41
        assert_equal(exit_status, 0)
42
        assert_true(os.path.isdir(self.logs_dir))
43
        assert_equal(len(os.listdir(self.logs_dir)), 1)
44
45
        batch_uid = os.listdir(self.logs_dir)[0]
46
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
47
        assert_equal(len(os.listdir(path_job_commands)), self.nb_commands + 1)
48
49
    def test_main_launch_with_pool_of_workers(self):
50
        # Actual test
51
        exit_status = call(self.launch_command_with_pool, shell=True)
52
53
        # Test validation
54
        assert_equal(exit_status, 0)
55
        assert_true(os.path.isdir(self.logs_dir))
56
        assert_equal(len(os.listdir(self.logs_dir)), 1)
57
58
        batch_uid = os.listdir(self.logs_dir)[0]
59
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
60
        assert_equal(len(os.listdir(path_job_commands)), self.nb_workers + 1)
61
62
    def test_main_resume(self):
63
        # Setup
64
        call(self.launch_command, shell=True)
65
        batch_uid = os.listdir(self.logs_dir)[0]
66
67
        # Simulate that some commands are in the running state.
68
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
69
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
70
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
71
        commands = open(pending_commands_file).read().strip().split("\n")
72
        with open(running_commands_file, 'w') as running_commands:
73
            running_commands.write("\n".join(commands[::2]) + "\n")
74
        with open(pending_commands_file, 'w') as pending_commands:
75
            pending_commands.write("\n".join(commands[1::2]) + "\n")
76
77
        # Actual test (should move running commands back to pending).
78
        exit_status = call(self.resume_command.format(batch_uid), shell=True)
79
80
        # Test validation
81
        assert_equal(exit_status, 0)
82
        assert_true(os.path.isdir(self.logs_dir))
83
        assert_equal(len(os.listdir(self.logs_dir)), 1)
84
        assert_equal(len(open(running_commands_file).readlines()), 0)
85
        assert_equal(len(open(pending_commands_file).readlines()), len(commands))
86
87
        # Test when batch_uid is a path instead of a jobname.
88
        # Setup
89
        batch_uid = os.path.join(self.logs_dir, os.listdir(self.logs_dir)[0])
90
91
        # Simulate that some commands are in the running state.
92
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
93
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
94
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
95
        commands = open(pending_commands_file).read().strip().split("\n")
96
        with open(running_commands_file, 'w') as running_commands:
97
            running_commands.write("\n".join(commands[::2]) + "\n")
98
        with open(pending_commands_file, 'w') as pending_commands:
99
            pending_commands.write("\n".join(commands[1::2]) + "\n")
100
101
        # Actual test (should move running commands back to pending).
102
        exit_status = call(self.resume_command.format(batch_uid), shell=True)
103
104
        # Test validation
105
        assert_equal(exit_status, 0)
106
        assert_true(os.path.isdir(self.logs_dir))
107
        assert_equal(len(os.listdir(self.logs_dir)), 1)
108
        assert_equal(len(open(running_commands_file).readlines()), 0)
109
        assert_equal(len(open(pending_commands_file).readlines()), len(commands))
110
111
    def test_main_resume_only_pending(self):
112
        # SetUp
113
        call(self.launch_command, shell=True)
114
        batch_uid = os.listdir(self.logs_dir)[0]
115
116
        # Simulate that some commands are in the running state.
117
        path_job_commands = os.path.join(self.logs_dir, batch_uid, "commands")
118
        pending_commands_file = pjoin(path_job_commands, "commands.txt")
119
        running_commands_file = pjoin(path_job_commands, "running_commands.txt")
120
        commands = open(pending_commands_file).read().strip().split("\n")
121
        with open(running_commands_file, 'w') as running_commands:
122
            running_commands.write("\n".join(commands[::2]) + "\n")
123
        with open(pending_commands_file, 'w') as pending_commands:
124
            pending_commands.write("\n".join(commands[1::2]) + "\n")
125
126
        # Actual test (should NOT move running commands back to pending).
127
        command_line = self.resume_command.format(batch_uid)
128
        command_line += " --onlyPending"
129
        exit_status = call(command_line, shell=True)
130
131
        # Test validation
132
        assert_equal(exit_status, 0)
133
        assert_true(os.path.isdir(self.logs_dir))
134
        assert_equal(len(os.listdir(self.logs_dir)), 1)
135
        assert_equal(len(open(running_commands_file).readlines()), len(commands[::2]))
136
        assert_equal(len(open(pending_commands_file).readlines()), len(commands[1::2]))
137