aiogremlin.driver.connection   A
last analyzed

Complexity

Total Complexity 18

Size/Duplication

Total Lines 167
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 18
eloc 95
dl 0
loc 167
rs 10
c 0
b 0
f 0

11 Methods

Rating   Name   Duplication   Size   Complexity  
A Connection.closed() 0 8 1
A Connection.url() 0 8 1
A Connection.message_serializer() 0 3 1
A Connection.__aexit__() 0 3 1
A Connection.write() 0 23 3
A Connection._receive() 0 4 2
A Connection._terminate_response() 0 4 1
A Connection.open() 0 40 4
A Connection.__aenter__() 0 2 1
A Connection.__init__() 0 18 2
A Connection.close() 0 5 1
1
import asyncio
2
import logging
3
import uuid
4
5
try:
6
    import ujson as json
7
except ImportError:
8
    import json
9
10
from aiogremlin.driver import provider, resultset
11
from aiogremlin.driver.protocol import GremlinServerWSProtocol
12
from aiogremlin.driver.aiohttp.transport import AiohttpTransport
13
from gremlin_python.driver import serializer
14
15
16
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
17
18
19
class Connection:
    """
    Main class for interacting with the Gremlin Server. Encapsulates a
    websocket connection. Not instantiated directly. Instead use
    :py:meth:`Connection.open<aiogremlin.driver.connection.Connection.open>`.

    :param str url: url for host Gremlin Server
    :param gremlin_python.driver.transport.AbstractBaseTransport transport:
        Transport implementation
    :param gremlin_python.driver.protocol.AbstractBaseProtocol protocol:
        Protocol implementation
    :param asyncio.BaseEventLoop loop:
    :param str username: Username for database auth
    :param str password: Password for database auth
    :param int max_inflight: Maximum number of unprocessed requests at any
        one time on the connection
    :param float response_timeout: Timeout handed to every ``ResultSet``
        created by this connection (may be `None`)
    :param message_serializer: Message serializer class or instance; a class
        is instantiated with no arguments
    :param provider: Graph provider object implementation
    """

    def __init__(self, url, transport, protocol, loop, username, password,
                 max_inflight, response_timeout, message_serializer, provider):
        self._url = url
        self._transport = transport
        self._protocol = protocol
        self._loop = loop
        self._response_timeout = response_timeout
        self._username = username
        self._password = password
        self._closed = False
        # Set by ``open`` so that a reconnect in ``write`` can reuse it.
        self._ssl_context = None
        # Maps request id -> ResultSet for requests still awaiting completion.
        self._result_sets = {}
        try:
            self._semaphore = asyncio.Semaphore(value=max_inflight,
                                                loop=self._loop)
        except TypeError:
            # Python 3.10+ removed the ``loop`` argument from asyncio
            # synchronization primitives; they bind to the running loop.
            self._semaphore = asyncio.Semaphore(value=max_inflight)
        if isinstance(message_serializer, type):
            message_serializer = message_serializer()
        self._message_serializer = message_serializer
        self._provider = provider
        # Started last so that a failure above cannot leave an orphaned
        # background task behind.
        self._receive_task = self._loop.create_task(self._receive())

    @classmethod
    async def open(cls, url, loop, *,
                   protocol=None,
                   transport_factory=None,
                   ssl_context=None,
                   username='',
                   password='',
                   max_inflight=64,
                   response_timeout=None,
                   message_serializer=serializer.GraphSONMessageSerializer,
                   provider=provider.TinkerGraph):
        """
        **coroutine** Open a connection to the Gremlin Server.

        :param str url: url for host Gremlin Server
        :param asyncio.BaseEventLoop loop:
        :param gremlin_python.driver.protocol.AbstractBaseProtocol protocol:
            Protocol implementation
        :param transport_factory: Factory function for transports
        :param ssl.SSLContext ssl_context:
        :param str username: Username for database auth
        :param str password: Password for database auth

        :param int max_inflight: Maximum number of unprocessed requests at any
            one time on the connection
        :param float response_timeout: (optional) `None` by default
        :param message_serializer: Message serializer implementation
        :param provider: Graph provider object implementation

        :returns: :py:class:`Connection<aiogremlin.driver.connection.Connection>`
        """
        if not protocol:
            protocol = GremlinServerWSProtocol(message_serializer)
        if transport_factory:
            transport = transport_factory()
        else:
            transport = AiohttpTransport(loop)
        await transport.connect(url, ssl_context=ssl_context)
        try:
            connection = cls(url, transport, protocol, loop, username,
                             password, max_inflight, response_timeout,
                             message_serializer, provider)
        except Exception:
            # Do not leak the already-connected transport if the connection
            # object cannot be built.
            await transport.close()
            raise
        # Remember the TLS settings so ``write`` can reconnect with them.
        connection._ssl_context = ssl_context
        return connection

    @property
    def message_serializer(self):
        """
        Read-only property.

        :returns: The message serializer instance used by this connection.
        """
        return self._message_serializer

    @property
    def closed(self):
        """
        Read-only property. Check if connection has been closed.

        :returns: `bool`
        """
        return self._closed or self._transport.closed

    @property
    def url(self):
        """
        Read-only property.

        :returns: str The url associated with this connection.
        """
        return self._url

    async def write(self, message):
        """
        **coroutine** Submit a script and bindings to the Gremlin Server.

        Waits for a free in-flight slot, serializes the message under a fresh
        request id, reconnects the transport if it reports itself closed and
        sends the payload. The slot is given back once the returned result
        set signals completion, or immediately if sending fails.

        :param `RequestMessage<gremlin_python.driver.request.RequestMessage>` message:
        :returns: :py:class:`ResultSet<aiogremlin.driver.resultset.ResultSet>`
            object
        """
        await self._semaphore.acquire()
        try:
            request_id = str(uuid.uuid4())
            payload = self._message_serializer.serialize_message(
                request_id, message)
            if self._transport.closed:
                # Reconnect with the same TLS settings used by ``open``.
                await self._transport.connect(
                    self.url, ssl_context=self._ssl_context)
            pending = self._transport.write(payload)
            # Transports may implement ``write`` as a plain function or as a
            # coroutine function.
            if asyncio.iscoroutine(pending):
                await pending
            result_set = resultset.ResultSet(
                request_id, self._response_timeout, self._loop)
        except BaseException:
            # Nothing was registered for this request, so no terminator task
            # will ever return the permit; give it back here. BaseException
            # is caught so that task cancellation is covered as well.
            self._semaphore.release()
            raise
        self._result_sets[request_id] = result_set
        self._loop.create_task(
            self._terminate_response(result_set, request_id))
        return result_set

    submit = write

    async def close(self):
        """**coroutine** Close underlying connection and mark as closed.

        Calling it again after a completed close is a no-op.
        """
        if self._closed:
            return
        self._receive_task.cancel()
        try:
            await self._transport.close()
        finally:
            # The receive task is already cancelled, so the connection is
            # unusable even if closing the transport raised.
            self._closed = True

    async def _terminate_response(self, resp, request_id):
        """Wait for ``resp`` to finish, then unregister it and free its
        in-flight slot."""
        try:
            await resp.done.wait()
        finally:
            # Always hand the permit back, even if the wait is cancelled or
            # the entry has already been removed.
            self._result_sets.pop(request_id, None)
            self._semaphore.release()

    async def _receive(self):
        """Background loop: read from the transport and pass each chunk to
        the protocol together with the pending result sets.

        Runs until the task is cancelled (see ``close``) or a read/dispatch
        call raises.
        """
        while True:
            data = await self._transport.read()
            await self._protocol.data_received(data, self._result_sets)

    async def __aenter__(self):
        """Enter the async context manager; returns the connection itself."""
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Close the connection on context exit and drop the transport
        reference."""
        await self.close()
        self._transport = None
167