1
|
|
|
import asyncio |
2
|
|
|
import collections |
3
|
|
|
import configparser |
4
|
|
|
import importlib |
5
|
|
|
|
6
|
|
|
try: |
7
|
|
|
import ujson as json |
8
|
|
|
except ImportError: |
9
|
|
|
import json |
10
|
|
|
|
11
|
|
|
import yaml |
12
|
|
|
|
13
|
|
|
from aiogremlin import exception |
14
|
|
|
from aiogremlin import driver |
15
|
|
|
from gremlin_python.driver import serializer |
16
|
|
|
|
17
|
|
|
|
18
|
|
|
def my_import(name): |
19
|
|
|
names = name.rsplit('.', maxsplit=1) |
20
|
|
|
if len(names) != 2: |
21
|
|
|
raise exception.ConfigError("not a valid absolute python path to a class: {}".format(name)) |
22
|
|
|
module_name, class_name = names |
23
|
|
|
try: |
24
|
|
|
module = importlib.import_module(module_name) |
25
|
|
|
except ImportError: |
26
|
|
|
raise exception.ConfigError( |
27
|
|
|
"Error processing cluster configuration: could not import {}".format(name)) |
28
|
|
|
return getattr(module, class_name) |
29
|
|
|
|
30
|
|
|
|
31
|
|
|
class Cluster: |
32
|
|
|
""" |
33
|
|
|
A cluster of Gremlin Server hosts. This object provides the main high |
34
|
|
|
level interface used by the :py:mod:`aiogremlin` module. |
35
|
|
|
|
36
|
|
|
:param asyncio.BaseEventLoop loop: |
37
|
|
|
:param dict aliases: Optional mapping for aliases. Default is `None` |
38
|
|
|
:param config: Optional cluster configuration passed as kwargs or `dict` |
39
|
|
|
""" |
40
|
|
|
|
41
|
|
|
DEFAULT_CONFIG = { |
42
|
|
|
'scheme': 'ws', |
43
|
|
|
'hosts': ['localhost'], |
44
|
|
|
'port': 8182, |
45
|
|
|
'ssl_certfile': '', |
46
|
|
|
'ssl_keyfile': '', |
47
|
|
|
'ssl_password': '', |
48
|
|
|
'username': '', |
49
|
|
|
'password': '', |
50
|
|
|
'response_timeout': None, |
51
|
|
|
'max_conns': 4, |
52
|
|
|
'min_conns': 1, |
53
|
|
|
'max_times_acquired': 16, |
54
|
|
|
'max_inflight': 64, |
55
|
|
|
'message_serializer': 'gremlin_python.driver.serializer.GraphSONMessageSerializer', |
56
|
|
|
'provider': 'aiogremlin.driver.provider.TinkerGraph' |
57
|
|
|
} |
58
|
|
|
|
59
|
|
|
def __init__(self, loop, aliases=None, **config): |
60
|
|
|
self._loop = loop |
61
|
|
|
default_config = dict(self.DEFAULT_CONFIG) |
62
|
|
|
default_config.update(config) |
63
|
|
|
self._config = self._process_config_imports(default_config) |
64
|
|
|
self._hosts = collections.deque() |
65
|
|
|
self._hostmap = {} |
66
|
|
|
self._closed = False |
67
|
|
|
if aliases is None: |
68
|
|
|
aliases = {} |
69
|
|
|
self._aliases = aliases |
70
|
|
|
|
71
|
|
|
@classmethod |
72
|
|
|
async def open(cls, loop, *, aliases=None, configfile=None, **config): |
73
|
|
|
""" |
74
|
|
|
**coroutine** Open a cluster, connecting to all available hosts as |
75
|
|
|
specified in configuration. |
76
|
|
|
|
77
|
|
|
:param asyncio.BaseEventLoop loop: |
78
|
|
|
:param dict aliases: Optional mapping for aliases. Default is `None` |
79
|
|
|
:param str configfile: Optional configuration file in .json or |
80
|
|
|
.yml format |
81
|
|
|
:param config: Optional cluster configuration passed as kwargs or `dict` |
82
|
|
|
""" |
83
|
|
|
cluster = cls(loop, aliases=aliases, **config) |
84
|
|
|
if configfile: |
85
|
|
|
cluster.config_from_file(configfile) |
86
|
|
|
await cluster.establish_hosts() |
87
|
|
|
return cluster |
88
|
|
|
|
89
|
|
|
@property |
90
|
|
|
def hosts(self): |
91
|
|
|
"""Read-only property""" |
92
|
|
|
return self._hosts |
93
|
|
|
|
94
|
|
|
@property |
95
|
|
|
def config(self): |
96
|
|
|
""" |
97
|
|
|
Read-only property. |
98
|
|
|
|
99
|
|
|
:returns: `dict` containing the cluster configuration |
100
|
|
|
""" |
101
|
|
|
return self._config |
102
|
|
|
|
103
|
|
|
async def get_connection(self, hostname=None): |
104
|
|
|
""" |
105
|
|
|
**coroutine** Get connection from next available host in a round robin |
106
|
|
|
fashion. |
107
|
|
|
|
108
|
|
|
:returns: :py:class:`Connection<aiogremlin.driver.connection.Connection>` |
109
|
|
|
""" |
110
|
|
|
if not self._hosts: |
111
|
|
|
await self.establish_hosts() |
112
|
|
|
if hostname: |
113
|
|
|
try: |
114
|
|
|
host = self._hostmap[hostname] |
115
|
|
|
except KeyError: |
116
|
|
|
raise exception.ConfigError( |
117
|
|
|
'Unknown host: {}'.format(hostname)) |
118
|
|
|
else: |
119
|
|
|
host = self._hosts.popleft() |
120
|
|
|
conn = await host.get_connection() |
121
|
|
|
self._hosts.append(host) |
122
|
|
|
return conn |
123
|
|
|
|
124
|
|
|
async def establish_hosts(self): |
125
|
|
|
""" |
126
|
|
|
**coroutine** Connect to all hosts as specified in configuration. |
127
|
|
|
""" |
128
|
|
|
scheme = self._config['scheme'] |
129
|
|
|
hosts = self._config['hosts'] |
130
|
|
|
port = self._config['port'] |
131
|
|
|
for hostname in hosts: |
132
|
|
|
url = '{}://{}:{}/gremlin'.format(scheme, hostname, port) |
133
|
|
|
host = await driver.GremlinServer.open( |
134
|
|
|
url, self._loop, **dict(self._config)) |
135
|
|
|
self._hosts.append(host) |
136
|
|
|
self._hostmap[hostname] = host |
137
|
|
|
|
138
|
|
|
def config_from_file(self, filename): |
139
|
|
|
""" |
140
|
|
|
Load configuration from from file. |
141
|
|
|
|
142
|
|
|
:param str filename: Path to the configuration file. |
143
|
|
|
""" |
144
|
|
|
if filename.endswith('yml') or filename.endswith('yaml'): |
145
|
|
|
self.config_from_yaml(filename) |
146
|
|
|
elif filename.endswith('.json'): |
147
|
|
|
self.config_from_json(filename) |
148
|
|
|
else: |
149
|
|
|
raise exception.ConfigurationError('Unknown config file format') |
150
|
|
|
|
151
|
|
|
def config_from_yaml(self, filename): |
152
|
|
|
""" |
153
|
|
|
Load configuration from from YAML file. |
154
|
|
|
|
155
|
|
|
:param str filename: Path to the configuration file. |
156
|
|
|
""" |
157
|
|
|
with open(filename, 'r') as f: |
158
|
|
|
config = yaml.load(f) |
159
|
|
|
config = self._process_config_imports(config) |
160
|
|
|
self._config.update(config) |
161
|
|
|
|
162
|
|
|
def config_from_json(self, filename): |
163
|
|
|
""" |
164
|
|
|
Load configuration from from JSON file. |
165
|
|
|
|
166
|
|
|
:param str filename: Path to the configuration file. |
167
|
|
|
""" |
168
|
|
|
with open(filename, 'r') as f: |
169
|
|
|
config = json.load(f) |
170
|
|
|
config = self._process_config_imports(config) |
171
|
|
|
self.config.update(config) |
172
|
|
|
|
173
|
|
|
def _process_config_imports(self, config): |
174
|
|
|
message_serializer = config.get('message_serializer') |
175
|
|
|
provider = config.get('provider') |
176
|
|
|
if isinstance(message_serializer, str): |
177
|
|
|
config['message_serializer'] = my_import(message_serializer) |
178
|
|
|
if isinstance(provider, str): |
179
|
|
|
config['provider'] = my_import(provider) |
180
|
|
|
return config |
181
|
|
|
|
182
|
|
|
def config_from_module(self, module): |
183
|
|
|
""" |
184
|
|
|
Load configuration from Python module. |
185
|
|
|
|
186
|
|
|
:param str filename: Path to the configuration file. |
187
|
|
|
""" |
188
|
|
|
if isinstance(module, str): |
189
|
|
|
module = importlib.import_module(module) |
190
|
|
|
config = dict() |
191
|
|
|
for item in dir(module): |
192
|
|
|
if not item.startswith('_') and item.lower() in self.DEFAULT_CONFIG: |
193
|
|
|
config[item.lower()] = getattr(module, item) |
194
|
|
|
config = self._process_config_imports(config) |
195
|
|
|
self.config.update(config) |
196
|
|
|
|
197
|
|
|
async def connect(self, hostname=None, aliases=None): |
198
|
|
|
""" |
199
|
|
|
**coroutine** Get a connected client. Main API method. |
200
|
|
|
|
201
|
|
|
:returns: A connected instance of |
202
|
|
|
`Client<aiogremlin.driver.client.Client>` |
203
|
|
|
""" |
204
|
|
|
aliases = aliases or self._aliases |
205
|
|
|
if not self._hosts: |
206
|
|
|
await self.establish_hosts() |
207
|
|
|
# if session: |
208
|
|
|
# host = self._hosts.popleft() |
209
|
|
|
# client = client.SessionedClient(host, self._loop, session, |
210
|
|
|
# aliases=aliases) |
211
|
|
|
# self._hosts.append(host) |
212
|
|
|
# else: |
213
|
|
|
client = driver.Client(self, self._loop, hostname=hostname, |
214
|
|
|
aliases=aliases) |
215
|
|
|
return client |
216
|
|
|
|
217
|
|
|
async def close(self): |
218
|
|
|
"""**coroutine** Close cluster and all connected hosts.""" |
219
|
|
|
waiters = [] |
220
|
|
|
while self._hosts: |
221
|
|
|
host = self._hosts.popleft() |
222
|
|
|
waiters.append(host.close()) |
223
|
|
|
await asyncio.gather(*waiters, loop=self._loop) |
224
|
|
|
self._closed = True |
225
|
|
|
|