test_connection   A
last analyzed

Complexity

Total Complexity 25

Size/Duplication

Total Lines 121
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 95
dl 0
loc 121
rs 10
c 0
b 0
f 0
wmc 25

9 Functions

Rating   Name   Duplication   Size   Complexity  
A test_204_empty_stream() 0 14 3
A test_connection_response_timeout() 0 12 4
A test_get_close_conn() 0 8 1
A test_cant_connect() 0 4 2
A test_conn_context_manager() 0 5 2
A test_submit() 0 13 3
A test_resp_queue_removed_from_conn() 0 13 3
A test_stream_done() 0 11 3
A test_server_error() 0 11 4
1
import asyncio
2
import base64
3
import json
4
5
import aiohttp
6
import pytest
7
from aiogremlin import exception
8
from aiohttp import web
9
from gremlin_python.driver import request
10
11
from goblin import driver, provider
12
13
14
@pytest.mark.asyncio
15
async def test_get_close_conn(connection):
16
    ws = connection._transport
17
    assert not ws.closed
18
    assert not connection.closed
19
    await connection.close()
20
    assert connection.closed
21
    assert ws.closed
22
23
24
@pytest.mark.asyncio
25
async def test_conn_context_manager(connection):
26
    async with connection:
27
        assert not connection.closed
28
    assert connection.closed
29
30
31
@pytest.mark.asyncio
32
async def test_submit(connection):
33
    async with connection:
34
        message = request.RequestMessage(
35
            processor='', op='eval', args={
36
                'gremlin': '1 + 1'
37
            })
38
        stream = await connection.write(message)
39
        results = []
40
        async for msg in stream:
41
            results.append(msg)
42
        assert len(results) == 1
43
        assert results[0] == 2
44
45
46
@pytest.mark.asyncio
47
async def test_204_empty_stream(connection, aliases):
48
    resp = False
49
    async with connection:
50
        message = request.RequestMessage(
51
            processor='',
52
            op='eval',
53
            args={
54
                'gremlin': 'g.V().has("unlikely", "even less likely")'
55
            })
56
        stream = await connection.write(message)
57
        async for msg in stream:
58
            resp = True
59
    assert not resp
60
61
62
@pytest.mark.asyncio
63
async def test_server_error(connection):
64
    async with connection:
65
        message = request.RequestMessage(
66
            processor='', op='eval', args={
67
                'gremlin': 'g. V jla;sdf'
68
            })
69
        with pytest.raises(exception.GremlinServerError):
70
            stream = await connection.write(message)
71
            async for msg in stream:
72
                pass
73
74
75
@pytest.mark.asyncio
76
async def test_cant_connect(event_loop, gremlin_server, unused_server_url):
77
    with pytest.raises(Exception):
78
        await gremlin_server.get_connection(unused_server_url, event_loop)
79
80
81
@pytest.mark.asyncio
82
async def test_resp_queue_removed_from_conn(connection):
83
    async with connection:
84
        message = request.RequestMessage(
85
            processor='', op='eval', args={
86
                'gremlin': '1 + 1'
87
            })
88
        stream = await connection.write(message)
89
        async for msg in stream:
90
            pass
91
        await asyncio.sleep(0)
92
        assert stream._response_queue not in list(
93
            connection._result_sets.values())
94
95
96
@pytest.mark.asyncio
97
async def test_stream_done(connection):
98
    async with connection:
99
        message = request.RequestMessage(
100
            processor='', op='eval', args={
101
                'gremlin': '1 + 1'
102
            })
103
        stream = await connection.write(message)
104
        async for msg in stream:
105
            pass
106
        assert stream.done
107
108
109
@pytest.mark.asyncio
110
async def test_connection_response_timeout(connection):
111
    async with connection:
112
        message = request.RequestMessage(
113
            processor='', op='eval', args={
114
                'gremlin': '1 + 1'
115
            })
116
        connection._response_timeout = 0.0000001
117
        with pytest.raises(exception.ResponseTimeoutError):
118
            stream = await connection.write(message)
119
            async for msg in stream:
120
                pass
121
122
123
# @pytest.mark.asyncio
124
# async def test_authenticated_connection(event_loop, unused_tcp_port):
125
#     authentication_request_queue = asyncio.Queue(loop=event_loop)
126
#
127
#     username, password = 'test_username', 'test_password'
128
#
129
#     async def fake_auth(request):
130
#         ws = web.WebSocketResponse()
131
#         await ws.prepare(request)
132
#
133
#         msg = await ws.receive()
134
#         data = json.loads(msg.data.decode()[17:])
135
#         await authentication_request_queue.put(data)
136
#
137
#         auth_resp = {
138
#             "requestId": data["requestId"],
139
#             "status": {"code": 407, "attributes": {}, "message": ""},
140
#             "result": {"data": None, "meta": {}}
141
#         }
142
#         resp_payload = json.dumps(auth_resp)
143
#         ws.send_str(resp_payload)
144
#
145
#         auth_msg = await ws.receive()
146
#         auth_msg_data = json.loads(auth_msg.data.decode()[17:])
147
#         await authentication_request_queue.put(auth_msg_data)
148
#
149
#         return ws
150
#
151
#     aiohttp_app = web.Application(loop=event_loop)
152
#     aiohttp_app.router.add_route('GET', '/gremlin', fake_auth)
153
#     handler = aiohttp_app.make_handler()
154
#     srv = await event_loop.create_server(handler, '0.0.0.0',
155
#                                          unused_tcp_port)
156
#
157
#     async with aiohttp.ClientSession(loop=event_loop) as session:
158
#         url = 'ws://0.0.0.0:{}/gremlin'.format(unused_tcp_port)
159
#         async with session.ws_connect(url) as ws_client:
160
#             connection = driver.Connection(
161
#                 url=url, ws=ws_client, loop=event_loop,
162
#                 client_session=session,
163
#                 username=username, password=password, max_inflight=64,
164
#                 response_timeout=None,
165
#                 message_serializer=driver.GraphSONMessageSerializer,
166
#                 provider=provider.TinkerGraph
167
#             )
168
#             message = request.RequestMessage(
169
#                 processor='', op='eval',
170
#                 args={'gremlin': '1 + 1'})
171
#             task = event_loop.create_task(connection.write(message))
172
#             initial_request = await authentication_request_queue.get()
173
#             auth_request = await authentication_request_queue.get()
174
#             print(auth_request)
175
#             auth_str = auth_request['args']['sasl']
176
#             assert base64.b64decode(auth_str).decode().split(
177
#                 '\x00')[1:] == [username, password]
178
#             assert auth_request['requestId'] == initial_request['requestId']
179
#             resp = await task
180
#             resp.close()
181
#
182
#             await connection.close()
183