Passed
Push — master ( b99802...4a7861 )
by Beraldo
05:16 queued 02:47
created

tests/helper.py (1 issue)

1
"""Module with some helpers for tests."""
2
import os
3
import sys
4
import threading
5
import time
6
from socket import socket
7
8
from pyof.v0x01.common.header import Header
9
from pyof.v0x01.symmetric.hello import Hello
10
11
from kytos.core import Controller
12
from kytos.core.config import KytosConfig
13
14
__all__ = ('do_handshake', 'new_controller', 'new_client',
15
           'new_handshaked_client')
16
17
18
def do_handshake(client: socket):
19
    """Get a client socket and do the handshake of it.
20
21
    This method receives a client socket that simulates a switch on the
22
    network and does the OpenFlow handshake process with a running controller
23
    on the network.
24
25
    Args:
26
        client (socket): a socket object connected to the controller.
27
28
    Returns:
29
        The client with the handshake process done.
30
31
    """
32
    # -- STEP 1: Send Hello message
33
    client.send(Hello(xid=3).pack())
34
35
    # -- STEP 2: Wait for Hello response
36
    binary_packet = b''
37
    while len(binary_packet) < 8:
38
        binary_packet = client.recv(8)
39
    header = Header()
40
    header.unpack(binary_packet)
41
42
    # -- STEP 3: Wait for features_request message
43
    binary_packet = b''
44
    # len() < 8 here because we just expect a Hello as response
45
    while len(binary_packet) < 8:
46
        binary_packet = client.recv(8)
47
    header = Header()
48
    header.unpack(binary_packet)
49
50
    # -- STEP 4: Send features_reply to the controller
51
    basedir = os.path.dirname(os.path.abspath(__file__))
52
    raw_dir = os.path.join(basedir, 'raw')
53
    message = None
54
    with open(os.path.join(raw_dir, 'features_reply.cap'), 'rb') as file:
55
        message = file.read()
56
    client.send(message)
57
58
    return client
59
60
61
def get_config():
62
    """Exclude unittest args from Config argparser."""
63
    argv_backup = None
64
    # If cli command was like "python -m unittest"
65
    if sys.argv[0].split()[-1] == 'unittest':
66
        argv_backup = sys.argv
67
        sys.argv = sys.argv[:1]
68
    config = KytosConfig()
69
    if argv_backup:
70
        # Recover original argv
71
        sys.argv = argv_backup
72
    return config
73
74
75
def new_controller(options=None):
76
    """Instantiate a Kytos Controller.
77
78
    Args:
79
        options (KytosConfig.options): options generated by KytosConfig
80
81
    Returns:
82
        controller: Running Controler
83
84
    """
85
    if options is None:
86
        options = get_config().options['daemon']
87
    controller = Controller(options)
88
    controller.start()
89
    time.sleep(0.1)
90
    return controller
91
92
93
def new_client(options=None):
94
    """Create and returns a socket client.
95
96
    Args:
97
        options (KytosConfig.options): options generated by KytosConfig
98
99
    Returns:
100
        client (socket): Client connected to the Kytos controller before
101
            handshake
102
103
    """
104
    if options is None:
105
        options = get_config().options['daemon']
106
    client = socket()
107
    client.connect((options.listen, options.port))
108
    return client
109
110
111
def new_handshaked_client(options=None):
112
    """Create and returns a socket client.
113
114
    Args:
115
        options (KytosConfig.options): options generated by KytosConfig
116
117
    Returns:
118
        client (socket): Client connected to the Kytos controller with
119
            handshake done
120
121
    """
122
    if options is None:
123
        options = get_config().options['daemon']
124
    client = new_client(options)
125
    return do_handshake(client)
126
127
128
def test_concurrently(times):
129
    """
130
    Decorator to test concurrently many times.
131
132
    Add this decorator to small pieces of code that you want to test
133
    concurrently to make sure they don't raise exceptions when run at the
134
    same time.
135
136
    E.g., some views that do a SELECT and then a subsequent
137
    INSERT might fail when the INSERT assumes that the data has not changed
138
    since the SELECT.
139
    E.g.:
140
        def test_method(self):
141
        from tests.helpers import test_concurrently
142
        @test_concurrently(10)
143
        def toggle_call_your_method_here():
144
            print('This is a test')
145
146
        toggle_call_your_method_here()
147
    """
148
    def test_concurrently_decorator(test_func):
149
        """Decorator thread execution."""
150
        def wrapper(*args, **kwargs):
151
            """Thread wrapper."""
152
            exceptions = []
153
154
            def call_test_func():
155
                """Call the test method."""
156
                try:
157
                    test_func(*args, **kwargs)
158
                except Exception as _e:
159
                    exceptions.append(_e)
160
                    raise
161
            threads = []
162
            for _ in range(times):
0 ignored issues
show
Comprehensibility Best Practice introduced by
The variable range does not seem to be defined.
Loading history...
163
                threads.append(threading.Thread(target=call_test_func))
164
            for thread in threads:
165
                thread.start()
166
            for thread in threads:
167
                thread.join()
168
            if exceptions:
169
                raise Exception('test_concurrently intercepted '
170
                                '%s exceptions: %s' %
171
                                (len(exceptions), exceptions))
172
        return wrapper
173
    return test_concurrently_decorator
174