1 | """Module with some helpers for tests.""" |
||
2 | import os |
||
3 | import sys |
||
4 | import threading |
||
5 | import time |
||
6 | from socket import socket |
||
7 | |||
8 | from pyof.v0x01.common.header import Header |
||
9 | from pyof.v0x01.symmetric.hello import Hello |
||
10 | |||
11 | from kytos.core import Controller |
||
12 | from kytos.core.config import KytosConfig |
||
13 | |||
14 | __all__ = ('do_handshake', 'new_controller', 'new_client', |
||
15 | 'new_handshaked_client') |
||
16 | |||
17 | |||
18 | def do_handshake(client: socket): |
||
19 | """Get a client socket and do the handshake of it. |
||
20 | |||
21 | This method receives a client socket that simulates a switch on the |
||
22 | network and does the OpenFlow handshake process with a running controller |
||
23 | on the network. |
||
24 | |||
25 | Args: |
||
26 | client (socket): a socket object connected to the controller. |
||
27 | |||
28 | Returns: |
||
29 | The client with the handshake process done. |
||
30 | |||
31 | """ |
||
32 | # -- STEP 1: Send Hello message |
||
33 | client.send(Hello(xid=3).pack()) |
||
34 | |||
35 | # -- STEP 2: Wait for Hello response |
||
36 | binary_packet = b'' |
||
37 | while len(binary_packet) < 8: |
||
38 | binary_packet = client.recv(8) |
||
39 | header = Header() |
||
40 | header.unpack(binary_packet) |
||
41 | |||
42 | # -- STEP 3: Wait for features_request message |
||
43 | binary_packet = b'' |
||
44 | # len() < 8 here because we just expect a Hello as response |
||
45 | while len(binary_packet) < 8: |
||
46 | binary_packet = client.recv(8) |
||
47 | header = Header() |
||
48 | header.unpack(binary_packet) |
||
49 | |||
50 | # -- STEP 4: Send features_reply to the controller |
||
51 | basedir = os.path.dirname(os.path.abspath(__file__)) |
||
52 | raw_dir = os.path.join(basedir, 'raw') |
||
53 | message = None |
||
54 | with open(os.path.join(raw_dir, 'features_reply.cap'), 'rb') as file: |
||
55 | message = file.read() |
||
56 | client.send(message) |
||
57 | |||
58 | return client |
||
59 | |||
60 | |||
61 | def get_config(): |
||
62 | """Exclude unittest args from Config argparser.""" |
||
63 | argv_backup = None |
||
64 | # If cli command was like "python -m unittest" |
||
65 | if sys.argv[0].split()[-1] == 'unittest': |
||
66 | argv_backup = sys.argv |
||
67 | sys.argv = sys.argv[:1] |
||
68 | config = KytosConfig() |
||
69 | if argv_backup: |
||
70 | # Recover original argv |
||
71 | sys.argv = argv_backup |
||
72 | return config |
||
73 | |||
74 | |||
75 | def new_controller(options=None): |
||
76 | """Instantiate a Kytos Controller. |
||
77 | |||
78 | Args: |
||
79 | options (KytosConfig.options): options generated by KytosConfig |
||
80 | |||
81 | Returns: |
||
82 | controller: Running Controler |
||
83 | |||
84 | """ |
||
85 | if options is None: |
||
86 | options = get_config().options['daemon'] |
||
87 | controller = Controller(options) |
||
88 | controller.start() |
||
89 | time.sleep(0.1) |
||
90 | return controller |
||
91 | |||
92 | |||
93 | def new_client(options=None): |
||
94 | """Create and returns a socket client. |
||
95 | |||
96 | Args: |
||
97 | options (KytosConfig.options): options generated by KytosConfig |
||
98 | |||
99 | Returns: |
||
100 | client (socket): Client connected to the Kytos controller before |
||
101 | handshake |
||
102 | |||
103 | """ |
||
104 | if options is None: |
||
105 | options = get_config().options['daemon'] |
||
106 | client = socket() |
||
107 | client.connect((options.listen, options.port)) |
||
108 | return client |
||
109 | |||
110 | |||
111 | def new_handshaked_client(options=None): |
||
112 | """Create and returns a socket client. |
||
113 | |||
114 | Args: |
||
115 | options (KytosConfig.options): options generated by KytosConfig |
||
116 | |||
117 | Returns: |
||
118 | client (socket): Client connected to the Kytos controller with |
||
119 | handshake done |
||
120 | |||
121 | """ |
||
122 | if options is None: |
||
123 | options = get_config().options['daemon'] |
||
124 | client = new_client(options) |
||
125 | return do_handshake(client) |
||
126 | |||
127 | |||
128 | def test_concurrently(times): |
||
129 | """ |
||
130 | Decorator to test concurrently many times. |
||
131 | |||
132 | Add this decorator to small pieces of code that you want to test |
||
133 | concurrently to make sure they don't raise exceptions when run at the |
||
134 | same time. |
||
135 | |||
136 | E.g., some views that do a SELECT and then a subsequent |
||
137 | INSERT might fail when the INSERT assumes that the data has not changed |
||
138 | since the SELECT. |
||
139 | E.g.: |
||
140 | def test_method(self): |
||
141 | from tests.helpers import test_concurrently |
||
142 | @test_concurrently(10) |
||
143 | def toggle_call_your_method_here(): |
||
144 | print('This is a test') |
||
145 | |||
146 | toggle_call_your_method_here() |
||
147 | """ |
||
148 | def test_concurrently_decorator(test_func): |
||
149 | """Decorator thread execution.""" |
||
150 | def wrapper(*args, **kwargs): |
||
151 | """Thread wrapper.""" |
||
152 | exceptions = [] |
||
153 | |||
154 | def call_test_func(): |
||
155 | """Call the test method.""" |
||
156 | try: |
||
157 | test_func(*args, **kwargs) |
||
158 | except Exception as _e: |
||
159 | exceptions.append(_e) |
||
160 | raise |
||
161 | threads = [] |
||
162 | for _ in range(times): |
||
0 ignored issues
–
show
Comprehensibility
Best Practice
introduced
by
Loading history...
|
|||
163 | threads.append(threading.Thread(target=call_test_func)) |
||
164 | for thread in threads: |
||
165 | thread.start() |
||
166 | for thread in threads: |
||
167 | thread.join() |
||
168 | if exceptions: |
||
169 | raise Exception('test_concurrently intercepted ' |
||
170 | '%s exceptions: %s' % |
||
171 | (len(exceptions), exceptions)) |
||
172 | return wrapper |
||
173 | return test_concurrently_decorator |
||
174 |