aiogremlin.driver.connection.Connection.__init__()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 18
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 18
dl 0
loc 18
rs 9.5
c 0
b 0
f 0
cc 2
nop 11

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
import asyncio
2
import logging
3
import uuid
4
5
try:
6
    import ujson as json
7
except ImportError:
8
    import json
9
10
from aiogremlin.driver import provider, resultset
11
from aiogremlin.driver.protocol import GremlinServerWSProtocol
12
from aiogremlin.driver.aiohttp.transport import AiohttpTransport
13
from gremlin_python.driver import serializer
14
15
16
logger = logging.getLogger(__name__)
17
18
19
class Connection:
    """
    Main class for interacting with the Gremlin Server. Encapsulates a
    websocket connection. Not instantiated directly. Instead use
    :py:meth:`Connection.open<aiogremlin.driver.connection.Connection.open>`.

    :param str url: url for host Gremlin Server
    :param gremlin_python.driver.transport.AbstractBaseTransport transport:
        Transport implementation
    :param gremlin_python.driver.protocol.AbstractBaseProtocol protocol:
        Protocol implementation
    :param asyncio.BaseEventLoop loop:
    :param str username: Username for database auth
    :param str password: Password for database auth
    :param int max_inflight: Maximum number of unprocessed requests at any
        one time on the connection
    :param float response_timeout: (optional) `None` by default
    :param message_serializer: Message serializer implementation. If a class
        is given it is instantiated with no arguments; an instance is used
        as is.
    :param provider: Graph provider object implementation
    """

    def __init__(self, url, transport, protocol, loop, username, password,
                 max_inflight, response_timeout, message_serializer, provider):
        self._url = url
        self._transport = transport
        self._protocol = protocol
        self._loop = loop
        self._response_timeout = response_timeout
        self._username = username
        self._password = password
        self._closed = False
        # Maps request id -> result set for requests that are still pending.
        self._result_sets = {}
        # Bounds the number of in-flight requests. The ``loop`` argument was
        # removed from asyncio primitives in Python 3.10 and raises TypeError
        # there, so fall back to the loop-less constructor in that case.
        try:
            self._semaphore = asyncio.Semaphore(value=max_inflight,
                                                loop=self._loop)
        except TypeError:
            self._semaphore = asyncio.Semaphore(value=max_inflight)
        if isinstance(message_serializer, type):
            message_serializer = message_serializer()
        self._message_serializer = message_serializer
        self._provider = provider
        # Start the background reader last so that a failure earlier in
        # __init__ does not leave an orphaned task behind.
        self._receive_task = self._loop.create_task(self._receive())

    @classmethod
    async def open(cls, url, loop, *,
                   protocol=None,
                   transport_factory=None,
                   ssl_context=None,
                   username='',
                   password='',
                   max_inflight=64,
                   response_timeout=None,
                   message_serializer=serializer.GraphSONMessageSerializer,
                   provider=provider.TinkerGraph):
        """
        **coroutine** Open a connection to the Gremlin Server.

        :param str url: url for host Gremlin Server
        :param asyncio.BaseEventLoop loop:
        :param gremlin_python.driver.protocol.AbstractBaseProtocol protocol:
            Protocol implementation
        :param transport_factory: Factory function for transports
        :param ssl.SSLContext ssl_context:
        :param str username: Username for database auth
        :param str password: Password for database auth

        :param int max_inflight: Maximum number of unprocessed requests at any
            one time on the connection
        :param float response_timeout: (optional) `None` by default
        :param message_serializer: Message serializer implementation
        :param provider: Graph provider object implementation

        :returns: :py:class:`Connection<aiogremlin.driver.connection.Connection>`
        """
        if not protocol:
            protocol = GremlinServerWSProtocol(message_serializer)
        if not transport_factory:
            def transport_factory():
                return AiohttpTransport(loop)
        transport = transport_factory()
        await transport.connect(url, ssl_context=ssl_context)
        return cls(url, transport, protocol, loop, username, password,
                   max_inflight, response_timeout, message_serializer,
                   provider)

    @property
    def message_serializer(self):
        """
        Read-only property.

        :returns: The message serializer instance used by this connection.
        """
        return self._message_serializer

    @property
    def closed(self):
        """
        Read-only property. Check if connection has been closed.

        :returns: `bool`
        """
        return self._closed or self._transport.closed

    @property
    def url(self):
        """
        Read-only property.

        :returns: str The url associated with this connection.
        """
        return self._url

    async def write(self, message):
        """
        Submit a script and bindings to the Gremlin Server

        Waits for a free in-flight slot, serializes and sends the message,
        and registers a result set under a freshly generated request id.
        The slot is given back when the result set is done, or immediately
        if sending fails.

        :param `RequestMessage<gremlin_python.driver.request.RequestMessage>` message:
        :returns: :py:class:`ResultSet<aiogremlin.driver.resultset.ResultSet>`
            object
        """
        await self._semaphore.acquire()
        try:
            request_id = str(uuid.uuid4())
            payload = self._message_serializer.serialize_message(
                request_id, message)
            if self._transport.closed:
                # NOTE(review): the reconnect does not pass the ssl_context
                # that was used in ``open`` -- confirm whether that is
                # intended for TLS endpoints.
                await self._transport.connect(self.url)
            # The transport's write may be either a plain call or a
            # coroutine; only await it in the latter case.
            pending = self._transport.write(payload)
            if asyncio.iscoroutine(pending):
                await pending
            result_set = resultset.ResultSet(
                request_id, self._response_timeout, self._loop)
        except BaseException:
            # Nothing was registered for this request, so no
            # _terminate_response task will ever release the slot: do it
            # here (also on cancellation) and re-raise unchanged.
            self._semaphore.release()
            raise
        self._result_sets[request_id] = result_set
        self._loop.create_task(
            self._terminate_response(result_set, request_id))
        return result_set

    submit = write

    async def close(self):
        """**coroutine** Close underlying connection and mark as closed."""
        self._receive_task.cancel()
        try:
            # The transport is set to None by __aexit__; tolerate a second
            # close() after the context manager has exited.
            if self._transport is not None:
                await self._transport.close()
        finally:
            # Mark as closed even if the transport failed to close cleanly.
            self._closed = True

    async def _terminate_response(self, resp, request_id):
        """
        Wait until ``resp`` signals completion, then drop it from the
        pending result sets and release its in-flight slot.
        """
        try:
            await resp.done.wait()
        finally:
            # Always free the slot, even if the wait is cancelled or fails,
            # so the connection cannot run out of in-flight capacity.
            self._result_sets.pop(request_id, None)
            self._semaphore.release()

    async def _receive(self):
        """
        Background loop: read data from the transport and hand it, together
        with the pending result sets, to the protocol. Runs until the task
        is cancelled (see ``close``) or a read/dispatch call raises.
        """
        while True:
            data = await self._transport.read()
            await self._protocol.data_received(data, self._result_sets)

    async def __aenter__(self):
        """Enter the async context manager; returns this connection."""
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Close the connection on exit and drop the transport reference."""
        await self.close()
        self._transport = None
167