Completed
Push — master ( c3d926...4e38f3 )
by Mathieu
9s
created

test_get_commands_from_sparse_file()   B

Complexity

Conditions 1

Size

Total Lines 27

Duplication

Lines 0
Ratio 0 %

Importance

Changes 2
Bugs 0 Features 0
Metric Value
cc 1
c 2
b 0
f 0
dl 0
loc 27
rs 8.8571
1
import os
2
import re
3
import shutil
4
import time as t
5
from os.path import join as pjoin
6
from StringIO import StringIO
7
8
import tempfile
9
from nose.tools import assert_true, assert_equal
10
from numpy.testing import assert_array_equal
11
12
import smartdispatch
13
from smartdispatch import utils
14
15
16
def test_generate_name_from_command():
17
    date_length = 20
18
19
    command = "command arg1 arg2"
20
    expected = "_".join(command.split())
21
    assert_equal(smartdispatch.generate_name_from_command(command)[date_length:], expected)
22
23
    max_length_arg = 7
24
    long_arg = "veryverylongarg1"
25
    command = "command " + long_arg + " arg2"
26
    expected = command.split()
27
    expected[1] = long_arg[-max_length_arg:]
28
    expected = "_".join(expected)
29
    assert_equal(smartdispatch.generate_name_from_command(command, max_length_arg)[date_length:], expected)
30
31
    max_length = 23
32
    command = "command veryverylongarg1 veryverylongarg1 veryverylongarg1 veryverylongarg1"
33
    expected = command[:max_length].replace(" ", "_")
34
    assert_equal(smartdispatch.generate_name_from_command(command, max_length=max_length + date_length)[date_length:], expected)
35
36
    # Test path arguments in command
37
    command = "command path/number/one path/number/two"
38
    expected = "command_pathnumberone_pathnumbertwo"
39
    assert_equal(smartdispatch.generate_name_from_command(command)[date_length:], expected)
40
41
42
def test_get_commands_from_file():
43
    commands = ["command1 arg1 arg2",
44
                "command2",
45
                "command3 arg1 arg2 arg3 arg4"]
46
    fileobj = StringIO("\n".join(commands))
47
    assert_array_equal(smartdispatch.get_commands_from_file(fileobj), commands)
48
49
    # Test stripping last line if empty
50
    fileobj = StringIO("\n".join(commands) + "\n")
51
    assert_array_equal(smartdispatch.get_commands_from_file(fileobj), commands)
52
53
54
def test_unfold_command():
55
    # Test with one argument
56
    cmd = "ls"
57
    assert_equal(smartdispatch.unfold_command(cmd), ["ls"])
58
59
    cmd = "echo 1"
60
    assert_equal(smartdispatch.unfold_command(cmd), ["echo 1"])
61
62
    # Test two arguments
63
    cmd = "echo [1 2]"
64
    assert_equal(smartdispatch.unfold_command(cmd), ["echo 1", "echo 2"])
65
66
    cmd = "echo test [1 2] yay"
67
    assert_equal(smartdispatch.unfold_command(cmd), ["echo test 1 yay", "echo test 2 yay"])
68
69
    cmd = "echo test[1 2]"
70
    assert_equal(smartdispatch.unfold_command(cmd), ["echo test1", "echo test2"])
71
72
    cmd = "echo test[1 2]yay"
73
    assert_equal(smartdispatch.unfold_command(cmd), ["echo test1yay", "echo test2yay"])
74
75
    # Test multiple folded arguments
76
    cmd = "python my_command.py [0.01 0.000001 0.00000000001] -1 [omicron mu]"
77
    assert_equal(smartdispatch.unfold_command(cmd), ["python my_command.py 0.01 -1 omicron",
78
                                                     "python my_command.py 0.01 -1 mu",
79
                                                     "python my_command.py 0.000001 -1 omicron",
80
                                                     "python my_command.py 0.000001 -1 mu",
81
                                                     "python my_command.py 0.00000000001 -1 omicron",
82
                                                     "python my_command.py 0.00000000001 -1 mu"])
83
84
    # Test multiple folded arguments and not unfoldable brackets
85
    cmd = "python my_command.py [0.01 0.000001 0.00000000001] -1 \[[42 133,666]\] slow [omicron mu]"
86
    assert_equal(smartdispatch.unfold_command(cmd), ["python my_command.py 0.01 -1 [42] slow omicron",
87
                                                     "python my_command.py 0.01 -1 [42] slow mu",
88
                                                     "python my_command.py 0.01 -1 [133,666] slow omicron",
89
                                                     "python my_command.py 0.01 -1 [133,666] slow mu",
90
                                                     "python my_command.py 0.000001 -1 [42] slow omicron",
91
                                                     "python my_command.py 0.000001 -1 [42] slow mu",
92
                                                     "python my_command.py 0.000001 -1 [133,666] slow omicron",
93
                                                     "python my_command.py 0.000001 -1 [133,666] slow mu",
94
                                                     "python my_command.py 0.00000000001 -1 [42] slow omicron",
95
                                                     "python my_command.py 0.00000000001 -1 [42] slow mu",
96
                                                     "python my_command.py 0.00000000001 -1 [133,666] slow omicron",
97
                                                     "python my_command.py 0.00000000001 -1 [133,666] slow mu"])
98
99
    # Test multiple different folded arguments
100
    cmd = "python my_command.py [0.01 0.001] -[1:5] slow"
101
    assert_equal(smartdispatch.unfold_command(cmd), ["python my_command.py 0.01 -1 slow",
102
                                                     "python my_command.py 0.01 -2 slow",
103
                                                     "python my_command.py 0.01 -3 slow",
104
                                                     "python my_command.py 0.01 -4 slow",
105
                                                     "python my_command.py 0.001 -1 slow",
106
                                                     "python my_command.py 0.001 -2 slow",
107
                                                     "python my_command.py 0.001 -3 slow",
108
                                                     "python my_command.py 0.001 -4 slow"])
109
110
    cmd = "python my_command.py -[1:5] slow [0.01 0.001]"
111
    assert_equal(smartdispatch.unfold_command(cmd), ["python my_command.py -1 slow 0.01",
112
                                                     "python my_command.py -1 slow 0.001",
113
                                                     "python my_command.py -2 slow 0.01",
114
                                                     "python my_command.py -2 slow 0.001",
115
                                                     "python my_command.py -3 slow 0.01",
116
                                                     "python my_command.py -3 slow 0.001",
117
                                                     "python my_command.py -4 slow 0.01",
118
                                                     "python my_command.py -4 slow 0.001"])
119
120
121
def test_replace_uid_tag():
122
    command = "command without uid tag"
123
    assert_array_equal(smartdispatch.replace_uid_tag([command]), [command])
124
125
    command = "command with one {UID} tag"
126
    uid = utils.generate_uid_from_string(command)
127
    assert_array_equal(smartdispatch.replace_uid_tag([command]), [command.replace("{UID}", uid)])
128
129
    command = "command with two {UID} tag {UID}"
130
    uid = utils.generate_uid_from_string(command)
131
    assert_array_equal(smartdispatch.replace_uid_tag([command]), [command.replace("{UID}", uid)])
132
133
    commands = ["a command with a {UID} tag"] * 10
134
    uid = utils.generate_uid_from_string(commands[0])
135
    assert_array_equal(smartdispatch.replace_uid_tag(commands), [commands[0].replace("{UID}", uid)] * len(commands))
136
137
138
def test_get_available_queues():
139
    assert_equal(smartdispatch.get_available_queues(cluster_name=None), {})
140
    assert_equal(smartdispatch.get_available_queues(cluster_name="unknown"), {})
141
142
    queues_infos = smartdispatch.get_available_queues(cluster_name="guillimin")
143
    assert_true(len(queues_infos) > 0)
144
145
    queues_infos = smartdispatch.get_available_queues(cluster_name="mammouth")
146
    assert_true(len(queues_infos) > 0)
147
148
149
def test_get_job_folders():
150
    temp_dir = tempfile.mkdtemp()
151
    jobname = "this_is_the_name_of_my_job"
152
    job_folders_paths = smartdispatch.get_job_folders(temp_dir, jobname)
153
    path_job, path_job_logs, path_job_commands = job_folders_paths
154
155
    assert_true(jobname in path_job)
156
    assert_true(os.path.isdir(path_job))
157
    assert_equal(os.path.basename(path_job), jobname)
158
159
    assert_true(jobname in path_job_logs)
160
    assert_true(os.path.isdir(path_job_logs))
161
    assert_true(os.path.isdir(pjoin(path_job_logs, 'worker')))
162
    assert_true(os.path.isdir(pjoin(path_job_logs, 'job')))
163
    assert_true(os.path.isdir(path_job_logs))
164
    assert_equal(os.path.basename(path_job_logs), "logs")
165
166
    assert_true(jobname in path_job_commands)
167
    assert_true(os.path.isdir(path_job_commands))
168
    assert_equal(os.path.basename(path_job_commands), "commands")
169
170
    # In theory the following should not create new folders.
171
    # Insteead it will return the paths to existing folders.
172
    jobname += "2"
173
    os.rename(path_job, path_job + "2")
174
    job_folders_paths = smartdispatch.get_job_folders(temp_dir, jobname)
175
    path_job, path_job_logs, path_job_commands = job_folders_paths
176
177
    assert_true(jobname in path_job)
178
    assert_true(os.path.isdir(path_job))
179
    assert_equal(os.path.basename(path_job), jobname)
180
181
    assert_true(jobname in path_job_logs)
182
    assert_true(os.path.isdir(path_job_logs))
183
    assert_true(os.path.isdir(pjoin(path_job_logs, 'worker')))
184
    assert_true(os.path.isdir(pjoin(path_job_logs, 'job')))
185
    assert_true(os.path.isdir(path_job_logs))
186
    assert_equal(os.path.basename(path_job_logs), "logs")
187
188
    assert_true(jobname in path_job_commands)
189
    assert_true(os.path.isdir(path_job_commands))
190
    assert_equal(os.path.basename(path_job_commands), "commands")
191
192
    shutil.rmtree(temp_dir)
193
194
195
def test_log_command_line():
196
    temp_dir = tempfile.mkdtemp()
197
    command_line_log_file = pjoin(temp_dir, "command_line.log")
198
199
    command_1 = "echo 1 2 3"
200
    smartdispatch.log_command_line(temp_dir, command_1)
201
    assert_true(os.path.isfile(command_line_log_file))
202
203
    lines = open(command_line_log_file).read().strip().split("\n")
204
    assert_equal(len(lines), 2)  # Datetime, the command line.
205
    assert_true(t.strftime("## %Y-%m-%d %H:%M:") in lines[0])  # Don't check second.
206
    assert_equal(lines[1], command_1)
207
208
    command_2 = "echo \"bob\""  # With quotes.
209
    smartdispatch.log_command_line(temp_dir, command_2)
210
    assert_true(os.path.isfile(command_line_log_file))
211
212
    lines = open(command_line_log_file).read().strip().split("\n")
213
    assert_equal(len(lines), 5)
214
    assert_true(t.strftime("## %Y-%m-%d %H:%M:") in lines[3])  # Don't check second.
215
    assert_equal(lines[4], command_2.replace('"', r'\"'))
216
217
    command_3 = "echo [asd]"  # With square brackets.
218
    smartdispatch.log_command_line(temp_dir, command_3)
219
    assert_true(os.path.isfile(command_line_log_file))
220
221
    lines = open(command_line_log_file).read().strip().split("\n")
222
    assert_equal(len(lines), 8)
223
    assert_true(t.strftime("## %Y-%m-%d %H:%M:") in lines[6])  # Don't check second.
224
    assert_equal(lines[7], re.sub(r'(\[)([^\[\]]*\\ [^\[\]]*)(\])', r'"\1\2\3"', command_3))
225
226
    shutil.rmtree(temp_dir)
227
228
229
def test_get_commands_from_sparse_file():
230
    commands_1 = ["",
231
                  "command1 arg1 arg2",
232
                  "command2",
233
                  "",
234
                  "",
235
                  "command3 arg1 arg2 arg3 arg4",
236
                  ""]
237
238
    commands_2 = [" ",
239
                  "command1 arg1 arg2",
240
                  "command2",
241
                  "\t",
242
                  "",
243
                  "command3 arg1 arg2 arg3 arg4",
244
                  ""]
245
246
    expected_commands = ["command1 arg1 arg2",
247
                         "command2",
248
                         "command3 arg1 arg2 arg3 arg4"]
249
250
    fileobj = StringIO("\n".join(commands_1))
251
    assert_array_equal(smartdispatch.get_commands_from_file(fileobj), expected_commands)
252
253
    # Test if there is some white in the empty line
254
    fileobj = StringIO("\n".join(commands_2))
255
    assert_array_equal(smartdispatch.get_commands_from_file(fileobj), expected_commands)
256