Cluster.config_from_json()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 10
Code Lines 5

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 5
dl 0
loc 10
rs 10
c 0
b 0
f 0
cc 2
nop 2
1
import asyncio
2
import collections
3
import configparser
4
import importlib
5
6
try:
7
    import ujson as json
8
except ImportError:
9
    import json
10
11
import yaml
12
13
from aiogremlin import exception
14
from aiogremlin import driver
15
from gremlin_python.driver import serializer
16
17
18
def my_import(name):
19
    names = name.rsplit('.', maxsplit=1)
20
    if len(names) != 2:
21
        raise exception.ConfigError("not a valid absolute python path to a class: {}".format(name))
22
    module_name, class_name = names
23
    try:
24
        module = importlib.import_module(module_name)
25
    except ImportError:
26
        raise exception.ConfigError(
27
                "Error processing cluster configuration: could not import {}".format(name))
28
    return getattr(module, class_name)
29
30
31
class Cluster:
32
    """
33
    A cluster of Gremlin Server hosts. This object provides the main high
34
    level interface used by the :py:mod:`aiogremlin` module.
35
36
    :param asyncio.BaseEventLoop loop:
37
    :param dict aliases: Optional mapping for aliases. Default is `None`
38
    :param config: Optional cluster configuration passed as kwargs or `dict`
39
    """
40
41
    DEFAULT_CONFIG = {
42
        'scheme': 'ws',
43
        'hosts': ['localhost'],
44
        'port': 8182,
45
        'ssl_certfile': '',
46
        'ssl_keyfile': '',
47
        'ssl_password': '',
48
        'username': '',
49
        'password': '',
50
        'response_timeout': None,
51
        'max_conns': 4,
52
        'min_conns': 1,
53
        'max_times_acquired': 16,
54
        'max_inflight': 64,
55
        'message_serializer': 'gremlin_python.driver.serializer.GraphSONMessageSerializer',
56
        'provider': 'aiogremlin.driver.provider.TinkerGraph'
57
    }
58
59
    def __init__(self, loop, aliases=None, **config):
60
        self._loop = loop
61
        default_config = dict(self.DEFAULT_CONFIG)
62
        default_config.update(config)
63
        self._config = self._process_config_imports(default_config)
64
        self._hosts = collections.deque()
65
        self._hostmap = {}
66
        self._closed = False
67
        if aliases is None:
68
            aliases = {}
69
        self._aliases = aliases
70
71
    @classmethod
72
    async def open(cls, loop, *, aliases=None, configfile=None, **config):
73
        """
74
        **coroutine** Open a cluster, connecting to all available hosts as
75
        specified in configuration.
76
77
        :param asyncio.BaseEventLoop loop:
78
        :param dict aliases: Optional mapping for aliases. Default is `None`
79
        :param str configfile: Optional configuration file in .json or
80
            .yml format
81
        :param config: Optional cluster configuration passed as kwargs or `dict`
82
        """
83
        cluster = cls(loop, aliases=aliases, **config)
84
        if configfile:
85
            cluster.config_from_file(configfile)
86
        await cluster.establish_hosts()
87
        return cluster
88
89
    @property
90
    def hosts(self):
91
        """Read-only property"""
92
        return self._hosts
93
94
    @property
95
    def config(self):
96
        """
97
        Read-only property.
98
99
        :returns: `dict` containing the cluster configuration
100
        """
101
        return self._config
102
103
    async def get_connection(self, hostname=None):
104
        """
105
        **coroutine** Get connection from next available host in a round robin
106
        fashion.
107
108
        :returns: :py:class:`Connection<aiogremlin.driver.connection.Connection>`
109
        """
110
        if not self._hosts:
111
            await self.establish_hosts()
112
        if hostname:
113
            try:
114
                host = self._hostmap[hostname]
115
            except KeyError:
116
                raise exception.ConfigError(
117
                    'Unknown host: {}'.format(hostname))
118
        else:
119
            host = self._hosts.popleft()
120
        conn = await host.get_connection()
121
        self._hosts.append(host)
122
        return conn
123
124
    async def establish_hosts(self):
125
        """
126
        **coroutine** Connect to all hosts as specified in configuration.
127
        """
128
        scheme = self._config['scheme']
129
        hosts = self._config['hosts']
130
        port = self._config['port']
131
        for hostname in hosts:
132
            url = '{}://{}:{}/gremlin'.format(scheme, hostname, port)
133
            host = await driver.GremlinServer.open(
134
                url, self._loop, **dict(self._config))
135
            self._hosts.append(host)
136
            self._hostmap[hostname] = host
137
138
    def config_from_file(self, filename):
139
        """
140
        Load configuration from from file.
141
142
        :param str filename: Path to the configuration file.
143
        """
144
        if filename.endswith('yml') or filename.endswith('yaml'):
145
            self.config_from_yaml(filename)
146
        elif filename.endswith('.json'):
147
            self.config_from_json(filename)
148
        else:
149
            raise exception.ConfigurationError('Unknown config file format')
150
151
    def config_from_yaml(self, filename):
152
        """
153
        Load configuration from from YAML file.
154
155
        :param str filename: Path to the configuration file.
156
        """
157
        with open(filename, 'r') as f:
158
            config = yaml.load(f)
159
        config = self._process_config_imports(config)
160
        self._config.update(config)
161
162
    def config_from_json(self, filename):
163
        """
164
        Load configuration from from JSON file.
165
166
        :param str filename: Path to the configuration file.
167
        """
168
        with open(filename, 'r') as f:
169
            config = json.load(f)
170
        config = self._process_config_imports(config)
171
        self.config.update(config)
172
173
    def _process_config_imports(self, config):
174
        message_serializer = config.get('message_serializer')
175
        provider = config.get('provider')
176
        if isinstance(message_serializer, str):
177
            config['message_serializer'] = my_import(message_serializer)
178
        if isinstance(provider, str):
179
            config['provider'] = my_import(provider)
180
        return config
181
182
    def config_from_module(self, module):
183
        """
184
        Load configuration from Python module.
185
186
        :param str filename: Path to the configuration file.
187
        """
188
        if isinstance(module, str):
189
            module = importlib.import_module(module)
190
        config = dict()
191
        for item in dir(module):
192
            if not item.startswith('_') and item.lower() in self.DEFAULT_CONFIG:
193
                config[item.lower()] = getattr(module, item)
194
        config = self._process_config_imports(config)
195
        self.config.update(config)
196
197
    async def connect(self, hostname=None, aliases=None):
198
        """
199
        **coroutine** Get a connected client. Main API method.
200
201
        :returns: A connected instance of
202
            `Client<aiogremlin.driver.client.Client>`
203
        """
204
        aliases = aliases or self._aliases
205
        if not self._hosts:
206
            await self.establish_hosts()
207
        # if session:
208
        #     host = self._hosts.popleft()
209
        #     client = client.SessionedClient(host, self._loop, session,
210
        #                                     aliases=aliases)
211
        #     self._hosts.append(host)
212
        # else:
213
        client = driver.Client(self, self._loop, hostname=hostname,
214
                               aliases=aliases)
215
        return client
216
217
    async def close(self):
218
        """**coroutine** Close cluster and all connected hosts."""
219
        waiters = []
220
        while self._hosts:
221
            host = self._hosts.popleft()
222
            waiters.append(host.close())
223
        await asyncio.gather(*waiters, loop=self._loop)
224
        self._closed = True
225