aiogremlin.driver.resultset   A
last analyzed

Complexity

Total Complexity 21

Size/Duplication

Total Lines 97
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 21
eloc 73
dl 0
loc 97
rs 10
c 0
b 0
f 0

11 Methods

Rating   Name   Duplication   Size   Complexity  
A ResultSet.__anext__() 0 5 2
A ResultSet.aggregate_to() 0 3 1
A ResultSet.__aiter__() 0 2 1
A ResultSet.done() 0 8 1
A ResultSet.one() 0 16 4
A ResultSet.close() 0 3 1
A ResultSet.stream() 0 3 1
A ResultSet.all() 0 5 2
A ResultSet.queue_result() 0 4 2
A ResultSet.__init__() 0 7 1
A ResultSet.request_id() 0 3 1

1 Function

Rating   Name   Duplication   Size   Complexity  
A error_handler() 0 13 3
1
import asyncio
2
import functools
3
4
from aiogremlin import exception
5
6
7
def error_handler(fn):
    """Decorate a coroutine method so its server message is unwrapped.

    The wrapped coroutine is awaited with ``self`` as its only argument.
    A falsy result (e.g. ``None``) is returned unchanged. Otherwise the
    message's ``status_code`` must be 200 or 206, in which case the
    message's ``data`` attribute is returned in place of the message.

    :raises exception.GremlinServerError: for any other status code;
        ``self.close()`` is called before the error is raised
    """
    @functools.wraps(fn)
    async def wrapper(self):
        response = await fn(self)
        if not response:
            # Nothing to unwrap: hand the falsy value straight back.
            return response
        status = response.status_code
        if status in (200, 206):
            return response.data
        # Any other status is treated as a server error; close first so
        # the owner is marked done before the exception propagates.
        self.close()
        raise exception.GremlinServerError(
            status,
            "{0}: {1}".format(status, response.message))
    return wrapper
20
21
22
class ResultSet:
    """Gremlin Server response implemented as an async iterator.

    Messages are pushed in with :meth:`queue_result` and pulled out with
    :meth:`one`, ``async for`` or :meth:`all`. A queued ``None`` marks the
    end of the response.
    """

    def __init__(self, request_id, timeout, loop):
        """
        :param request_id: identifier exposed through :attr:`request_id`
        :param timeout: value passed as ``timeout`` to `asyncio.wait_for`
            when :meth:`one` has to wait for a message
        :param loop: event loop; it is only forwarded to asyncio on
            Python versions that still accept an explicit ``loop``
        """
        loop_kwargs = self._loop_kwargs(loop)
        self._response_queue = asyncio.Queue(**loop_kwargs)
        self._request_id = request_id
        self._loop = loop
        self._timeout = timeout
        self._done = asyncio.Event(**loop_kwargs)
        self._aggregate_to = None

    @staticmethod
    def _loop_kwargs(loop):
        """Return the ``loop`` keyword arguments asyncio still accepts.

        The ``loop`` parameter of `asyncio.Queue`, `asyncio.Event` and
        `asyncio.wait_for` was removed in Python 3.10; passing it there
        raises `TypeError`. Older interpreters keep receiving it so their
        behaviour does not change.
        """
        import sys
        if sys.version_info >= (3, 10):
            return {}
        return {'loop': loop}

    @property
    def request_id(self):
        """Readonly property: the request id given at construction."""
        return self._request_id

    @property
    def stream(self):
        """Readonly property: the underlying `asyncio.Queue` of messages."""
        return self._response_queue

    def queue_result(self, result):
        """Add ``result`` to the response queue.

        A ``None`` result closes the result set; the ``None`` is still
        queued afterwards.
        """
        if result is None:
            self.close()
        self._response_queue.put_nowait(result)

    @property
    def done(self):
        """
        Readonly property.

        :returns: `asyncio.Event` object
        """
        return self._done

    @property
    def aggregate_to(self):
        """Value last assigned to ``aggregate_to`` (``None`` initially)."""
        return self._aggregate_to

    @aggregate_to.setter
    def aggregate_to(self, val):
        self._aggregate_to = val

    def __aiter__(self):
        return self

    async def __anext__(self):
        msg = await self.one()
        # Only ``None`` marks the end of the stream. Falsy payloads such
        # as ``0``, ``False`` or ``''`` are real results and must not
        # terminate iteration.
        if msg is None:
            raise StopAsyncIteration
        return msg

    def close(self):
        """Mark the result set as done and drop the event loop reference."""
        self.done.set()
        self._loop = None

    @error_handler
    async def one(self):
        """Get a single message from the response stream.

        An already queued message is returned immediately. If nothing is
        queued and the result set is done, ``None`` is produced. Otherwise
        this waits on the queue, bounded by the configured timeout.

        The ``error_handler`` decorator post-processes the value returned
        here before the caller sees it.

        :raises exception.ResponseTimeoutError: if the wait times out;
            the result set is closed before the error is raised
        """
        if not self._response_queue.empty():
            msg = self._response_queue.get_nowait()
        elif self.done.is_set():
            msg = None
        else:
            try:
                msg = await asyncio.wait_for(
                    self._response_queue.get(),
                    timeout=self._timeout,
                    **self._loop_kwargs(self._loop))
            except asyncio.TimeoutError:
                self.close()
                raise exception.ResponseTimeoutError('Response timed out')
        return msg

    async def all(self):
        """Iterate to the end of the stream and return the results as a list."""
        results = []
        async for result in self:
            results.append(result)
        return results
97