aiogremlin.driver.pool.ConnectionPool.__init__()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 18
Code Lines 18

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 18
dl 0
loc 18
rs 9.5
c 0
b 0
f 0
cc 1
nop 13

How to fix   Many Parameters   

Many Parameters

Methods with many parameters are not only hard to understand, but their parameters also often become inconsistent when you need more, or different data.

There are several approaches to avoid long parameter lists:

1
import asyncio
2
import collections
3
4
import aiohttp
5
6
from aiogremlin.driver import connection
7
8
9
class PooledConnection:
10
    """
11
    Wrapper for :py:class:`Connection<aiogremlin.driver.connection.Connection>`
12
    that helps manage tomfoolery associated with connection pooling.
13
14
    :param aiogremlin.driver.connection.Connection conn:
15
    :param aiogremlin.driver.pool.ConnectionPool pool:
16
    """
17
    def __init__(self, conn, pool):
18
        self._conn = conn
19
        self._pool = pool
20
        self._times_acquired = 0
21
22
    @property
23
    def times_acquired(self):
24
        """
25
        Readonly property.
26
27
        :returns: int
28
        """
29
        return self._times_acquired
30
31
    def increment_acquired(self):
32
        """Increment times acquired attribute by 1"""
33
        self._times_acquired += 1
34
35
    def decrement_acquired(self):
36
        """Decrement times acquired attribute by 1"""
37
        self._times_acquired -= 1
38
39
    async def write(self, message):
40
        """
41
        **coroutine** Submit a script and bindings to the Gremlin Server
42
43
        :param str processor: Gremlin Server processor argument
44
        :param str op: Gremlin Server op argument
45
        :param args: Keyword arguments for Gremlin Server. Depend on processor
46
            and op.
47
48
        :returns: :py:class:`aiohttp.ClientResponse` object
49
        """
50
        return await self._conn.write(message)
51
52
    submit = write
53
54
    async def release_task(self, resp):
55
        await resp.done.wait()
56
        self.release()
57
58
    def release(self):
59
        self._pool.release(self)
60
61
    async def close(self):
62
        """Close underlying connection"""
63
        await self._conn.close()
64
        self._conn = None
65
        self._pool = None
66
67
    @property
68
    def closed(self):
69
        """
70
        Readonly property.
71
72
        :returns: bool
73
        """
74
        return self._conn.closed
75
76
77
class ConnectionPool:
78
    """
79
    A pool of connections to a Gremlin Server host.
80
81
    :param str url: url for host Gremlin Server
82
    :param asyncio.BaseEventLoop loop:
83
    :param ssl.SSLContext ssl_context:
84
    :param str username: Username for database auth
85
    :param str password: Password for database auth
86
    :param float response_timeout: (optional) `None` by default
87
    :param int max_conns: Maximum number of conns to a host
88
    :param int min_connsd: Minimum number of conns to a host
89
    :param int max_times_acquired: Maximum number of times a conn can be
90
        shared by multiple coroutines (clients)
91
    :param int max_inflight: Maximum number of unprocessed requests at any
92
        one time on the connection
93
    """
94
95
    def __init__(self, url, loop, ssl_context, username, password, max_conns,
96
                 min_conns, max_times_acquired, max_inflight, response_timeout,
97
                 message_serializer, provider):
98
        self._url = url
99
        self._loop = loop
100
        self._ssl_context = ssl_context
101
        self._username = username
102
        self._password = password
103
        self._max_conns = max_conns
104
        self._min_conns = min_conns
105
        self._max_times_acquired = max_times_acquired
106
        self._max_inflight = max_inflight
107
        self._response_timeout = response_timeout
108
        self._message_serializer = message_serializer
109
        self._condition = asyncio.Condition(loop=self._loop)
110
        self._available = collections.deque()
111
        self._acquired = collections.deque()
112
        self._provider = provider
113
114
    @property
115
    def url(self):
116
        """
117
        Readonly property.
118
119
        :returns: str
120
        """
121
        return self._url
122
123
    async def init_pool(self):
124
        """**coroutine** Open minumum number of connections to host"""
125
        for i in range(self._min_conns):
126
            conn = await self._get_connection(self._username,
127
                                              self._password,
128
                                              self._max_inflight,
129
                                              self._response_timeout,
130
                                              self._message_serializer,
131
                                              self._provider)
132
            self._available.append(conn)
133
134
    def release(self, conn):
135
        """
136
        Release connection back to pool after use.
137
138
        :param PooledConnection conn:
139
        """
140
        if conn.closed:
141
            self._acquired.remove(conn)
142
        else:
143
            conn.decrement_acquired()
144
            if not conn.times_acquired:
145
                self._acquired.remove(conn)
146
                self._available.append(conn)
147
        self._loop.create_task(self._notify())
148
149
    async def _notify(self):
150
        async with self._condition:
151
            self._condition.notify()
152
153
    async def acquire(self):
154
        """**coroutine** Acquire a new connection from the pool."""
155
        async with self._condition:
156
            while True:
157
                while self._available:
158
                    conn = self._available.popleft()
159
                    if not conn.closed:
160
                        conn.increment_acquired()
161
                        self._acquired.append(conn)
162
                        return conn
163
                if len(self._acquired) < self._max_conns:
164
                    conn = await self._get_connection(self._username, self._password,
165
                                                      self._max_inflight,
166
                                                      self._response_timeout,
167
                                                      self._message_serializer,
168
                                                      self._provider)
169
                    conn.increment_acquired()
170
                    self._acquired.append(conn)
171
                    return conn
172
                else:
173
                    for x in range(len(self._acquired)):
174
                        conn = self._acquired.popleft()
175
                        if conn.times_acquired < self._max_times_acquired:
176
                            conn.increment_acquired()
177
                            self._acquired.append(conn)
178
                            return conn
179
                        self._acquired.append(conn)
180
                    else:
181
                        await self._condition.wait()
182
183
    async def close(self):
184
        """**coroutine** Close connection pool."""
185
        waiters = []
186
        while self._available:
187
            conn = self._available.popleft()
188
            waiters.append(conn.close())
189
        while self._acquired:
190
            conn = self._acquired.popleft()
191
            waiters.append(conn.close())
192
        await asyncio.gather(*waiters, loop=self._loop)
193
194
    async def _get_connection(self, username, password, max_inflight,
195
                              response_timeout, message_serializer, provider):
196
        conn = await connection.Connection.open(
197
            self._url, self._loop, ssl_context=self._ssl_context,
198
            username=username, password=password,
199
            response_timeout=response_timeout,
200
            message_serializer=message_serializer, provider=provider)
201
        conn = PooledConnection(conn, self)
202
        return conn
203