1 | """Async TCP Server tests.""" |
||
2 | |||
3 | import asyncio |
||
4 | import errno |
||
5 | import logging |
||
6 | |||
7 | from kytos.core.atcp_server import (KytosServer, KytosServerProtocol, |
||
8 | exception_handler) |
||
9 | |||
10 | # Using "nettest" TCP port as a way to avoid conflict with a running |
||
11 | # Kytos server on 6653. |
||
12 | TEST_ADDRESS = ('127.0.0.1', 4138) |
||
13 | |||
14 | |||
15 | class TestKytosServer: |
||
16 | """Test if a Kytos Server will go up and receive connections.""" |
||
17 | |||
18 | def setup(self): |
||
19 | """Start new asyncio loop and a test TCP server.""" |
||
20 | # pylint: disable=attribute-defined-outside-init |
||
21 | self.loop = asyncio.new_event_loop() |
||
22 | asyncio.set_event_loop(None) |
||
23 | self.server = KytosServer(TEST_ADDRESS, KytosServerProtocol, |
||
24 | None, 'openflow', loop=self.loop) |
||
25 | self.server.serve_forever() |
||
26 | |||
27 | def test_connection_to_server(self): |
||
28 | """Test if we really can connect to TEST_ADDRESS.""" |
||
29 | @asyncio.coroutine |
||
30 | def wait_and_go(): |
||
31 | """Wait a little for the server to go up, then connect.""" |
||
32 | yield from asyncio.sleep(0.01, loop=self.loop) |
||
33 | # reader, writer = ... |
||
34 | _ = yield from asyncio.open_connection( |
||
35 | *TEST_ADDRESS, loop=self.loop) |
||
36 | |||
37 | self.loop.run_until_complete(wait_and_go()) |
||
38 | |||
39 | def test_exception_handler_oserror(self, caplog): |
||
40 | """Test the TCP Server Exception Handler. |
||
41 | |||
42 | 1. create mock OSError/TimeoutError instances |
||
43 | 2. call exception_handler with them |
||
44 | 3. ensure log is OK |
||
45 | """ |
||
46 | caplog.set_level(logging.INFO) |
||
47 | |||
48 | exception = TimeoutError() |
||
0 ignored issues
–
show
Comprehensibility
Best Practice
introduced
by
Loading history...
|
|||
49 | context = {"exception": exception, |
||
50 | "transport": "unit_tests"} |
||
51 | exception_handler(self.loop, context) |
||
52 | |||
53 | exception2 = OSError(errno.EBADF, "Bad file descriptor") |
||
54 | context2 = {"exception": exception2, |
||
55 | "transport": "unit_tests"} |
||
56 | exception_handler(self.loop, context2) |
||
57 | |||
58 | assert caplog.record_tuples == [ |
||
59 | ("atcp_server", logging.INFO, "Socket timeout: 'unit_tests'"), |
||
60 | ("atcp_server", logging.INFO, "Socket closed: 'unit_tests'"), |
||
61 | ] |
||
62 |