tests.helper.get_config()   A
last analyzed

Complexity

Conditions 3

Size

Total Lines 12
Code Lines 9

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 9.3211

Importance

Changes 0
Metric Value
cc 3
eloc 9
nop 0
dl 0
loc 12
ccs 1
cts 9
cp 0.1111
crap 9.3211
rs 9.95
c 0
b 0
f 0
1
"""Module with some helpers for tests."""
2 1
import os
3 1
import sys
4 1
import threading
5 1
import time
6 1
from socket import socket
7
8 1
from pyof.v0x01.common.header import Header
9 1
from pyof.v0x01.symmetric.hello import Hello
10
11 1
from kytos.core import Controller
12 1
from kytos.core.config import KytosConfig
13
14 1
# Names exported by a star-import of this module.
__all__ = (
    'do_handshake',
    'new_controller',
    'new_client',
    'new_handshaked_client',
)
16
17
18 1
def do_handshake(client: socket):
    """Get a client socket and do the handshake of it.

    This method receives a client socket that simulates a switch on the
    network and does the OpenFlow handshake process with a running controller
    on the network: it sends a Hello, reads two 8-byte headers sent back by
    the controller and finally sends a pre-recorded features_reply.

    Args:
        client (socket): a socket object connected to the controller.

    Returns:
        The client with the handshake process done.

    Raises:
        ConnectionError: if the connection is closed before a complete
            header could be read.

    """
    header_size = 8

    def receive_header():
        """Read exactly ``header_size`` bytes and unpack them as a Header."""
        received = b''
        while len(received) < header_size:
            # recv() may legitimately return fewer bytes than requested, so
            # the partial data is kept and only the remainder is requested.
            chunk = client.recv(header_size - len(received))
            if not chunk:
                # An empty read means the peer closed the connection; fail
                # instead of looping forever.
                raise ConnectionError('connection closed by the controller '
                                      'during the handshake')
            received += chunk
        header = Header()
        header.unpack(received)
        return header

    # -- STEP 1: Send Hello message
    client.send(Hello(xid=3).pack())

    # -- STEP 2: Wait for Hello response
    # Only 8 bytes are read because just a Hello is expected as response.
    receive_header()

    # -- STEP 3: Wait for features_request message
    receive_header()

    # -- STEP 4: Send features_reply to the controller
    basedir = os.path.dirname(os.path.abspath(__file__))
    capture_path = os.path.join(basedir, 'raw', 'features_reply.cap')
    with open(capture_path, 'rb') as capture:
        client.send(capture.read())

    return client
59
60
61 1
def get_config():
    """Exclude unittest args from Config argparser.

    When the process was started with a command like "python -m unittest",
    ``sys.argv`` is temporarily reduced to its first item while KytosConfig
    is instantiated (presumably KytosConfig parses ``sys.argv``; verify
    against its implementation) and is always restored afterwards.

    Returns:
        The KytosConfig instance that was created.

    """
    # Guard against an empty argv or an empty/whitespace-only argv[0], which
    # would otherwise raise IndexError on the [-1] lookup.
    command_words = sys.argv[0].split() if sys.argv else []
    started_by_unittest = bool(command_words) and \
        command_words[-1] == 'unittest'

    if not started_by_unittest:
        return KytosConfig()

    argv_backup = sys.argv
    sys.argv = sys.argv[:1]
    try:
        return KytosConfig()
    finally:
        # Recover original argv even when KytosConfig() raises.
        sys.argv = argv_backup
73
74
75 1
def new_controller(options=None):
    """Create a Kytos Controller and start it.

    Args:
        options (KytosConfig.options): options generated by KytosConfig.
            When omitted, the 'daemon' options from get_config() are used.

    Returns:
        controller: the Controller instance, after start() was called.

    """
    daemon_options = options
    if daemon_options is None:
        daemon_options = get_config().options['daemon']

    started = Controller(daemon_options)
    started.start()
    # Short pause after start() before handing the controller to the caller.
    time.sleep(0.1)
    return started
91
92
93 1
def new_client(options=None):
    """Create and return a socket client connected to the controller.

    Args:
        options (KytosConfig.options): options generated by KytosConfig.
            When omitted, the 'daemon' options from get_config() are used.
            Its ``listen`` and ``port`` attributes form the address to
            connect to.

    Returns:
        client (socket): Client connected to the Kytos controller before
            handshake

    Raises:
        Exception: whatever the connection attempt raises; the socket is
            closed before the error is propagated.

    """
    if options is None:
        options = get_config().options['daemon']
    client = socket()
    try:
        client.connect((options.listen, options.port))
    except Exception:
        # Do not leak the file descriptor when the connection fails.
        client.close()
        raise
    return client
109
110
111 1
def new_handshaked_client(options=None):
    """Create a socket client and do the handshake with the controller.

    Args:
        options (KytosConfig.options): options generated by KytosConfig.
            It is forwarded to new_client(), which falls back to the
            default configuration when it is None.

    Returns:
        client (socket): Client connected to the Kytos controller with
            handshake done

    Raises:
        Exception: whatever do_handshake() raises; the client socket is
            closed before the error is propagated.

    """
    client = new_client(options)
    try:
        return do_handshake(client)
    except Exception:
        # A half-handshaked socket is useless to the caller; release it.
        client.close()
        raise
126
127
128
# pylint: disable=broad-exception-raised
129 1
def test_concurrently(times):
    """
    Decorator to test concurrently many times.

    Add this decorator to small pieces of code that you want to test
    concurrently to make sure they don't raise exceptions when run at the
    same time.

    E.g., some views that do a SELECT and then a subsequent
    INSERT might fail when the INSERT assumes that the data has not changed
    since the SELECT.

    E.g.:
        def test_method(self):
            from tests.helper import test_concurrently

            @test_concurrently(10)
            def toggle_call_your_method_here():
                print('This is a test')

            toggle_call_your_method_here()

    Args:
        times (int): number of threads that run the decorated function on
            each call.

    Returns:
        A decorator. The decorated function returns None and raises
        Exception when at least one of the threads raised.
    """
    # Local import keeps this helper self-contained.
    import functools  # pylint: disable=import-outside-toplevel

    def test_concurrently_decorator(test_func):
        """Decorator thread execution."""
        @functools.wraps(test_func)
        def wrapper(*args, **kwargs):
            """Thread wrapper."""
            exceptions = []

            def call_test_func():
                """Call the test method."""
                try:
                    test_func(*args, **kwargs)
                except Exception as _e:
                    exceptions.append(_e)
                    # Re-raised so the failure is also reported by the
                    # thread itself, besides the summary raised below.
                    raise

            threads = [threading.Thread(target=call_test_func)
                       for _ in range(times)]
            for thread in threads:
                thread.start()
            for thread in threads:
                thread.join()
            if exceptions:
                raise Exception("test_concurrently intercepted "
                                f"{len(exceptions)} exceptions: {exceptions}")
        return wrapper
    return test_concurrently_decorator


# The "test_" prefix would make test collectors treat this helper as a test
# whenever it is imported at module level; explicitly opt out.
test_concurrently.__test__ = False
174