ActionTreeTestCase.assertEventsEqual()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 7

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
c 1
b 0
f 0
dl 0
loc 7
rs 9.2
cc 4
1
# coding: utf8
2
3
# Copyright 2013-2018 Vincent Jacques <[email protected]>
4
5
from __future__ import division, absolute_import, print_function
6
7
import ctypes
import multiprocessing
import os
import subprocess
import sys
import tempfile
import time
import unittest
14
15
try:
16
    import unittest.mock
17
except ImportError:  # Not unittested: test code for Python 2
18
    import mock
19
    unittest.mock = mock
20
21
from ActionTree import *
22
23
# Handle on the C symbols already loaded in the current process (CDLL(None)).
# TestAction.do_execute uses it to call the C function puts().
libc = ctypes.CDLL(None)
24
25
26
# No multiprocessing.Barrier in Python 2, so we build our own
27
class Barrier:
    """One-shot barrier built on multiprocessing primitives.

    Every caller of wait() blocks until `expected_processes` callers have
    arrived. The barrier is not reset afterwards: once opened, it stays open.
    """

    def __init__(self, expected_processes):
        """Create a barrier that opens when `expected_processes` callers wait on it."""
        self.__expected_processes = expected_processes
        # Shared integer counting the callers that have reached the barrier.
        self.__waiting_processes = multiprocessing.Value("i", 0)
        # Starts at 0 so that acquire() blocks until the last arrival releases it.
        self.__barrier = multiprocessing.Semaphore(0)

    def wait(self):
        """Block until the expected number of callers have called wait()."""
        with self.__waiting_processes.get_lock():
            self.__waiting_processes.value += 1
            # Decide under the lock: reading the counter after releasing the lock
            # would let several late arrivals all see the final value and each
            # release the semaphore.
            is_last_arrival = self.__waiting_processes.value == self.__expected_processes
        if is_last_arrival:
            self.__barrier.release()
        # Each caller takes the token and immediately hands it to the next one,
        # so a single release by the last arrival lets everybody through.
        self.__barrier.acquire()
        self.__barrier.release()
40
41
42
# multiprocessing.Semaphores are not picklable, so we pickle their ids and retrieve them from this dict.
43
# We don't deallocate them so we do leak some resources, but we accept that in unit tests.
44
# Registry mapping id(barrier) -> Barrier: filled by ActionTreeTestCase._barrier, read by TestAction.do_execute.
barriers = {}
45
46
47
class TestAction(Action):
    """Action whose observable behavior is entirely driven by constructor arguments.

    When executed it appends its label to an events file, optionally waits on a
    barrier, optionally writes to stdout/stderr through several channels, and
    finally either raises the configured exception or returns the configured value.
    """

    def __init__(
        self, label,
        exception, return_value, barrier_id,
        events_file, end_event,
        print_on_stdout, print_on_stderr, puts_on_stdout, echo_on_stdout,
        accept_failed_dependencies,
    ):
        super(TestAction, self).__init__(label=label, accept_failed_dependencies=accept_failed_dependencies)
        # Where and how events are recorded.
        self.__events_file = events_file
        self.__end_event = end_event
        self.__barrier_id = barrier_id
        # What to emit on the standard streams.
        self.__print_on_stdout = print_on_stdout
        self.__print_on_stderr = print_on_stderr
        self.__puts_on_stdout = puts_on_stdout
        self.__echo_on_stdout = echo_on_stdout
        # How the execution ends.
        self.__exception = exception
        self.__return_value = return_value

    def __repr__(self):
        return "<TestAction {}>".format(self.label)

    def __log_event(self, event):
        """Append one event line to the events file."""
        with open(self.__events_file, "a") as events:
            events.write("{}\n".format(event))

    def do_execute(self, dependency_statuses):
        """Run the configured side effects, then raise the exception or return the value."""
        for dependency in self.dependencies:
            assert self.accept_failed_dependencies or dependency_statuses[dependency].status == SUCCESSFUL

        # Start event: the label in lower case.
        self.__log_event(str(self.label).lower())

        if self.__barrier_id:
            # Barriers are looked up by id in the module-level registry.
            barriers[self.__barrier_id].wait()

        if self.__end_event:
            # End event: the label in upper case.
            self.__log_event(str(self.label).upper())

        to_print = self.__print_on_stdout
        if isinstance(to_print, str):
            print(to_print)
        elif isinstance(to_print, list):
            # A list holds (text, delay) pairs: print, flush, then sleep for the delay.
            for (text, delay) in to_print:
                print(text)
                sys.stdout.flush()
                time.sleep(delay)

        if self.__print_on_stderr:
            print(self.__print_on_stderr, file=sys.stderr)

        if self.__puts_on_stdout:
            # Output through the C library's puts().
            libc.puts(self.__puts_on_stdout)

        if self.__echo_on_stdout:
            # Output produced by a child process.
            subprocess.check_call(["echo", self.__echo_on_stdout])

        if self.__exception:
            raise self.__exception
        return self.__return_value
96
97
98
class ActionTreeTestCase(unittest.TestCase):
    """Base test case providing a per-test events file and helpers around it.

    setUp creates a temporary file whose path is handed to every action built
    with _action; the assertEvents* helpers compare the lines recorded in that
    file with what the test expects. tearDown removes the file.
    """

    def setUp(self):
        # mkstemp returns an open descriptor, but only the path is kept:
        # the file is reopened by name whenever it is read or written.
        (fd, events_file) = tempfile.mkstemp()
        os.close(fd)
        self.__events_file = events_file

    def tearDown(self):
        os.unlink(self.__events_file)

    def _action(
        self, label,
        exception=None, return_value=None, barrier=None,
        end_event=False,
        print_on_stdout=None, print_on_stderr=None, puts_on_stdout=None, echo_on_stdout=None,
        accept_failed_dependencies=False,
        *args, **kwds
    ):
        """Build a TestAction that records its events in this test's events file.

        All arguments are forwarded to the TestAction constructor; `barrier` is
        passed as its `barrier_id` (see _barrier).
        """
        return TestAction(
            label,
            exception, return_value, barrier,
            self.__events_file, end_event,
            print_on_stdout, print_on_stderr, puts_on_stdout, echo_on_stdout,
            accept_failed_dependencies,
            *args, **kwds
        )

    def _barrier(self, n):
        """Create a Barrier for `n` participants, register it, and return its id."""
        barrier = Barrier(n)
        barrier_id = id(barrier)
        # Keeping the barrier in the registry also keeps it alive, so its id stays unique.
        barriers[barrier_id] = barrier
        return barrier_id

    def __read_events(self):
        """Return the lines of the events file, stripped of surrounding whitespace."""
        with open(self.__events_file) as f:
            return [line.strip() for line in f]

    def assertEventsEqual(self, groups):
        """Check the recorded events against space-separated groups of events.

        Each group is a string whose characters are the expected events. Groups
        must appear in the given order, but events inside one group may have
        been recorded in any order. No event may remain after the last group.
        """
        events = self.__read_events()
        for group in groups.split(" "):
            self.assertEqual(sorted(group), sorted(events[:len(group)]))
            events = events[len(group):]
        self.assertEqual(events, [])

    def assertEventsIn(self, expected_events):
        """Check that the recorded list of events is one of `expected_events`."""
        self.assertIn(self.__read_events(), expected_events)
143
144
145
# class TestHooks(Hooks):
146
#     def action_pending(self, time, action):
147
#         print("action_pending", time, action.label)
148
149
#     def action_ready(self, time, action):
150
#         print("action_ready", time, action.label)
151
152
#     def action_canceled(self, time, action):
153
#         print("action_canceled", time, action.label)
154
155
#     def action_started(self, time, action):
156
#         print("action_started", time, action.label)
157
158
#     def action_printed(self, time, action, text):
159
#         print("action_printed", time, action.label, text)
160
161
#     def action_successful(self, time, action, return_value):
162
#         print("action_successful", time, action.label, return_value)
163
164
#     def action_failed(self, time, action, exception):
165
#         print("action_failed", time, action.label, exception)
166