aiogremlin.driver.resultset.ResultSet.one()   A
last analyzed

Complexity

Conditions 4

Size

Total Lines 16
Code Lines 14

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 14
dl 0
loc 16
rs 9.7
c 0
b 0
f 0
cc 4
nop 1
1
import asyncio
2
import functools
3
4
from aiogremlin import exception
5
6
7
def error_handler(fn):
    """Wrap a coroutine method so error responses raise and payloads unwrap.

    The wrapped coroutine is awaited with ``self`` as its only argument.

    * A falsy result (e.g. ``None``) is returned untouched.
    * A result whose ``status_code`` is 200 or 206 is replaced by its
      ``data`` attribute.
    * Any other status code closes ``self`` and raises
      ``exception.GremlinServerError`` carrying the status code and a
      ``"<code>: <message>"`` description.

    :param fn: coroutine function taking only ``self``
    :returns: the decorated coroutine function
    """
    @functools.wraps(fn)
    async def wrapper(self):
        response = await fn(self)
        if not response:
            # Nothing to inspect or unwrap; pass the falsy value through.
            return response

        status = response.status_code
        if status in (200, 206):
            return response.data

        # Close before raising so the owner is marked finished even if the
        # caller swallows the exception.
        self.close()
        detail = "{0}: {1}".format(status, response.message)
        raise exception.GremlinServerError(status, detail)
    return wrapper
20
21
22
class ResultSet:
    """Gremlin Server response implemented as an async iterator.

    Messages are pushed in with :meth:`queue_result` and pulled out with
    :meth:`one`, ``async for`` or :meth:`all`. Queuing ``None`` marks the
    end of the stream.
    """

    def __init__(self, request_id, timeout, loop):
        """
        :param request_id: identifier exposed through :attr:`request_id`
        :param timeout: seconds :meth:`one` waits for a message before
            raising; passed straight to ``asyncio.wait_for``
        :param loop: event loop; only forwarded to asyncio primitives on
            Python versions that still accept a ``loop`` argument
        """
        self._request_id = request_id
        self._loop = loop
        self._timeout = timeout
        self._aggregate_to = None
        loop_kwargs = self._loop_kwargs()
        self._response_queue = asyncio.Queue(**loop_kwargs)
        self._done = asyncio.Event(**loop_kwargs)

    def _loop_kwargs(self):
        """Return the ``loop`` keyword for asyncio calls, if still supported.

        The ``loop`` parameter of ``asyncio.Queue``, ``asyncio.Event`` and
        ``asyncio.wait_for`` was removed in Python 3.10; passing it there
        raises ``TypeError``. Older interpreters keep receiving the loop
        exactly as before.
        """
        import sys
        if sys.version_info >= (3, 10):
            return {}
        return {'loop': self._loop}

    @property
    def request_id(self):
        """Readonly property: the request id given at construction."""
        return self._request_id

    @property
    def stream(self):
        """Readonly property: the underlying `asyncio.Queue` of messages."""
        return self._response_queue

    def queue_result(self, result):
        """Add a message to the response queue.

        ``None`` closes the result set first and is still enqueued, so a
        consumer draining the queue receives it as the final item.
        """
        if result is None:
            self.close()
        self._response_queue.put_nowait(result)

    @property
    def done(self):
        """
        Readonly property.

        :returns: `asyncio.Event` object
        """
        return self._done

    @property
    def aggregate_to(self):
        """Value last assigned to ``aggregate_to`` (``None`` initially).

        This class only stores the value; it does not interpret it.
        """
        return self._aggregate_to

    @aggregate_to.setter
    def aggregate_to(self, val):
        self._aggregate_to = val

    def __aiter__(self):
        return self

    async def __anext__(self):
        msg = await self.one()
        # NOTE(review): any falsy payload (not only None) ends iteration
        # here -- confirm that is intended for results such as 0 or ''.
        if not msg:
            # StopAsyncIteration is a builtin (Python 3.5+), no import needed.
            raise StopAsyncIteration
        return msg

    def close(self):
        """Set the ``done`` event and drop the reference to the loop."""
        self.done.set()
        self._loop = None

    @error_handler
    async def one(self):
        """Get a single message from the response stream

        Returns a queued message immediately when one is available, ``None``
        when the queue is empty and the result set is done, and otherwise
        waits up to ``timeout`` seconds for the next message. The
        ``error_handler`` decorator then post-processes the returned value.

        :raises exception.ResponseTimeoutError: if no message arrives in
            time; the result set is closed before raising
        """
        if not self._response_queue.empty():
            msg = self._response_queue.get_nowait()
        elif self.done.is_set():
            msg = None
        else:
            try:
                msg = await asyncio.wait_for(self._response_queue.get(),
                                             timeout=self._timeout,
                                             **self._loop_kwargs())
            except asyncio.TimeoutError:
                self.close()
                raise exception.ResponseTimeoutError('Response timed out')
        return msg

    async def all(self):
        """Iterate the result set to exhaustion and return a list of results."""
        results = []
        async for result in self:
            results.append(result)
        return results
97