aiogremlin.driver.pool   A
last analyzed

Complexity

Total Complexity 31

Size/Duplication

Total Lines 203
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 31
eloc 118
dl 0
loc 203
rs 9.92
c 0
b 0
f 0

17 Methods

Rating   Name   Duplication   Size   Complexity  
A ConnectionPool._notify() 0 3 2
A PooledConnection.release() 0 2 1
A ConnectionPool.close() 0 10 3
A PooledConnection.times_acquired() 0 8 1
C ConnectionPool.acquire() 0 29 9
A ConnectionPool.url() 0 8 1
A PooledConnection.__init__() 0 4 1
A PooledConnection.decrement_acquired() 0 3 1
A PooledConnection.write() 0 12 1
A PooledConnection.close() 0 5 1
A ConnectionPool.init_pool() 0 10 2
A ConnectionPool._get_connection() 0 9 1
A ConnectionPool.release() 0 14 3
A PooledConnection.increment_acquired() 0 3 1
A PooledConnection.release_task() 0 3 1
A PooledConnection.closed() 0 8 1
A ConnectionPool.__init__() 0 18 1
1
import asyncio
2
import collections
3
4
import aiohttp
5
6
from aiogremlin.driver import connection
7
8
9
class PooledConnection:
    """
    Wrapper for :py:class:`Connection<aiogremlin.driver.connection.Connection>`
    that helps manage tomfoolery associated with connection pooling.

    The wrapper keeps a counter of how many times it is currently handed
    out (``times_acquired``) and a back-reference to the pool it belongs to.
    After :py:meth:`close` both the wrapped connection and the pool
    reference are dropped.

    :param aiogremlin.driver.connection.Connection conn:
    :param aiogremlin.driver.pool.ConnectionPool pool:
    """
    def __init__(self, conn, pool):
        self._conn = conn
        self._pool = pool
        self._times_acquired = 0

    @property
    def times_acquired(self):
        """
        Readonly property.

        :returns: int
        """
        return self._times_acquired

    def increment_acquired(self):
        """Increment times acquired attribute by 1"""
        self._times_acquired += 1

    def decrement_acquired(self):
        """Decrement times acquired attribute by 1"""
        self._times_acquired -= 1

    async def write(self, message):
        """
        **coroutine** Send a message through the wrapped connection.

        :param message: request handed unchanged to the underlying
            connection's ``write`` method

        :returns: whatever the underlying connection's ``write`` returns
        """
        return await self._conn.write(message)

    submit = write

    async def release_task(self, resp):
        """
        **coroutine** Wait for ``resp.done`` to be set, then release this
        connection back to its pool.

        :param resp: object exposing an awaitable ``done.wait()``
        """
        await resp.done.wait()
        self.release()

    def release(self):
        """
        Release this connection back to its pool.

        Does nothing when the wrapper has no pool, which is the state left
        behind by :py:meth:`close`; this keeps a late ``release_task`` from
        failing on a connection that was closed in the meantime.
        """
        pool = self._pool
        if pool is None:
            return
        pool.release(self)

    async def close(self):
        """
        **coroutine** Close underlying connection and drop the references
        to it and to the pool. Calling it again is a no-op.
        """
        conn = self._conn
        if conn is None:
            # Already closed through this wrapper.
            return
        await conn.close()
        self._conn = None
        self._pool = None

    @property
    def closed(self):
        """
        Readonly property.

        ``True`` once :py:meth:`close` has completed, otherwise the
        ``closed`` value reported by the underlying connection.

        :returns: bool
        """
        # ``close`` clears ``_conn``; treat that state as closed instead of
        # failing on ``None.closed``.
        return self._conn is None or self._conn.closed
75
76
77
class ConnectionPool:
    """
    A pool of connections to a Gremlin Server host.

    Connections live in one of two deques: ``_available`` (idle) and
    ``_acquired`` (handed out at least once and not yet fully released).

    :param str url: url for host Gremlin Server
    :param asyncio.BaseEventLoop loop:
    :param ssl.SSLContext ssl_context:
    :param str username: Username for database auth
    :param str password: Password for database auth
    :param int max_conns: Maximum number of conns to a host
    :param int min_conns: Minimum number of conns to a host
    :param int max_times_acquired: Maximum number of times a conn can be
        shared by multiple coroutines (clients)
    :param int max_inflight: Maximum number of unprocessed requests at any
        one time on the connection
    :param float response_timeout: (optional) `None` by default
    :param message_serializer: passed through when opening connections
    :param provider: passed through when opening connections
    """

    def __init__(self, url, loop, ssl_context, username, password, max_conns,
                 min_conns, max_times_acquired, max_inflight, response_timeout,
                 message_serializer, provider):
        self._url = url
        self._loop = loop
        self._ssl_context = ssl_context
        self._username = username
        self._password = password
        self._max_conns = max_conns
        self._min_conns = min_conns
        self._max_times_acquired = max_times_acquired
        self._max_inflight = max_inflight
        self._response_timeout = response_timeout
        self._message_serializer = message_serializer
        self._condition = asyncio.Condition(**self._loop_kwargs())
        self._available = collections.deque()
        self._acquired = collections.deque()
        self._provider = provider

    def _loop_kwargs(self):
        """
        Return the ``loop`` keyword for asyncio primitives when supported.

        The ``loop`` parameter of ``asyncio.Condition`` and
        ``asyncio.gather`` was removed in Python 3.10 and passing it raises
        ``TypeError`` there, so it is only supplied on older interpreters.
        """
        import sys
        if sys.version_info < (3, 10):
            return {'loop': self._loop}
        return {}

    @property
    def url(self):
        """
        Readonly property.

        :returns: str
        """
        return self._url

    async def init_pool(self):
        """**coroutine** Open minimum number of connections to host"""
        for _ in range(self._min_conns):
            conn = await self._open_connection()
            self._available.append(conn)

    def release(self, conn):
        """
        Release connection back to pool after use.

        A closed connection is dropped from the acquired set. An open one
        has its acquired counter decremented and is moved to the available
        set once the counter reaches zero. In both cases one waiter blocked
        in :py:meth:`acquire` is notified.

        :param PooledConnection conn:
        """
        if conn.closed:
            self._acquired.remove(conn)
        else:
            conn.decrement_acquired()
            if not conn.times_acquired:
                self._acquired.remove(conn)
                self._available.append(conn)
        # ``release`` is synchronous, so the notification (which needs the
        # condition's lock) is scheduled as a task on the pool's loop.
        self._loop.create_task(self._notify())

    async def _notify(self):
        """**coroutine** Wake one coroutine waiting on the pool condition."""
        async with self._condition:
            self._condition.notify()

    async def acquire(self):
        """
        **coroutine** Acquire a new connection from the pool.

        Tries, in order: an idle open connection; a freshly opened
        connection if fewer than ``max_conns`` are acquired; an already
        acquired connection that has been handed out fewer than
        ``max_times_acquired`` times. If none applies, waits until
        :py:meth:`release` notifies and tries again.

        :returns: :py:class:`PooledConnection`
        """
        async with self._condition:
            while True:
                # Idle connections first; closed ones are discarded.
                while self._available:
                    conn = self._available.popleft()
                    if not conn.closed:
                        conn.increment_acquired()
                        self._acquired.append(conn)
                        return conn
                if len(self._acquired) < self._max_conns:
                    conn = await self._open_connection()
                    conn.increment_acquired()
                    self._acquired.append(conn)
                    return conn
                # At capacity: rotate through the acquired connections and
                # share the first one that is below its sharing limit.
                for _ in range(len(self._acquired)):
                    conn = self._acquired.popleft()
                    self._acquired.append(conn)
                    if conn.times_acquired < self._max_times_acquired:
                        conn.increment_acquired()
                        return conn
                await self._condition.wait()

    async def close(self):
        """**coroutine** Close connection pool."""
        waiters = []
        while self._available:
            conn = self._available.popleft()
            waiters.append(conn.close())
        while self._acquired:
            conn = self._acquired.popleft()
            waiters.append(conn.close())
        if waiters:
            await asyncio.gather(*waiters, **self._loop_kwargs())

    async def _open_connection(self):
        """**coroutine** Open a connection using the pool's own settings."""
        return await self._get_connection(self._username,
                                          self._password,
                                          self._max_inflight,
                                          self._response_timeout,
                                          self._message_serializer,
                                          self._provider)

    async def _get_connection(self, username, password, max_inflight,
                              response_timeout, message_serializer, provider):
        """
        **coroutine** Open a connection to the pool's url and wrap it.

        ``max_inflight`` is accepted but not forwarded to
        ``Connection.open``.

        :returns: :py:class:`PooledConnection`
        """
        conn = await connection.Connection.open(
            self._url, self._loop, ssl_context=self._ssl_context,
            username=username, password=password,
            response_timeout=response_timeout,
            message_serializer=message_serializer, provider=provider)
        conn = PooledConnection(conn, self)
        return conn
203