AsyncGraphTraversalSource.__init__()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 3
rs 10
c 0
b 0
f 0
cc 1
nop 3
1
from aiogremlin.process.traversal import AsyncTraversalStrategies
2
from aiogremlin.remote.remote_connection import AsyncRemoteStrategy
3
4
from gremlin_python.process import graph_traversal, traversal
5
6
7
class AsyncGraphTraversal(graph_traversal.GraphTraversal):
    """Implements async iteration protocol and updates relevant methods"""

    def __aiter__(self):
        """Return ``self``; the traversal acts as its own async iterator."""
        return self

    async def __anext__(self):
        """Return the next result object.

        ``apply_strategies`` is awaited first when ``self.traversers`` is
        still ``None``. A traverser is then fetched if none is pending, its
        ``bulk`` is decremented, and the traverser is dropped once the bulk
        reaches zero, so one traverser yields its object ``bulk`` times.

        ``StopAsyncIteration`` coming from the underlying traverser iterator
        is not caught here and propagates to the caller.
        """
        if self.traversers is None:
            await self.traversal_strategies.apply_strategies(self)
        if self.last_traverser is None:
            self.last_traverser = await self.traversers.__anext__()
        # Named ``result`` rather than ``object`` to avoid shadowing the
        # builtin.
        result = self.last_traverser.object
        self.last_traverser.bulk -= 1
        if self.last_traverser.bulk <= 0:
            self.last_traverser = None
        return result

    async def toList(self):
        """Return results as ``list``."""
        results = []
        async for result in self:
            results.append(result)
        return results

    async def toSet(self):
        """Return results as ``set``."""
        results = set()
        async for result in self:
            results.add(result)
        return results

    async def iterate(self):
        """Consume every remaining traverser and return ``self``."""
        # StopAsyncIteration is a builtin exception (Python 3.5+); it marks
        # the end of the traverser stream here.
        try:
            while True:
                await self.nextTraverser()
        except StopAsyncIteration:
            return self

    async def nextTraverser(self):
        """Return next traverser.

        A traverser left pending by ``__anext__`` is handed out (and
        cleared) before a new one is fetched from ``self.traversers``.
        """
        if self.traversers is None:
            await self.traversal_strategies.apply_strategies(self)
        if self.last_traverser is None:
            return await self.traversers.__anext__()
        pending, self.last_traverser = self.last_traverser, None
        return pending

    async def next(self, amount=None):
        """Return the next result, or a list of up to ``amount`` results.

        :param amount: ``None`` (default) to fetch a single result;
            otherwise the maximum number of results to collect.
        :returns: With ``amount`` omitted, one result object, or ``None``
            when the traversal is exhausted. With ``amount`` given, a list
            holding at most ``amount`` results; it is shorter when the
            traversal runs out early and empty for ``amount=0``.
        """
        # Test against None explicitly: ``amount=0`` is a request for an
        # empty list, not for a single item.
        if amount is None:
            try:
                return await self.__anext__()
            except StopAsyncIteration:
                return None
        results = []
        for _ in range(amount):
            try:
                results.append(await self.__anext__())
            except StopAsyncIteration:
                break
        return results
72
73
74
class __(graph_traversal.__):
    """Variant of ``graph_traversal.__`` bound to the async traversal class."""

    # Overrides the traversal class attribute with the async implementation.
    # NOTE(review): presumably the base class consults this attribute when
    # spawning anonymous traversals -- confirm against gremlin_python.
    graph_traversal = AsyncGraphTraversal
77
78
79
class AsyncGraphTraversalSource(graph_traversal.GraphTraversalSource):
    """Traversal source that uses ``AsyncGraphTraversal`` as its traversal class."""

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the base class, then select the async traversal class."""
        super().__init__(*args, **kwargs)
        self.graph_traversal = AsyncGraphTraversal

    def withRemote(self, remote_connection):
        """Return a copy of this source with a remote strategy added.

        The copy comes from ``get_graph_traversal_source``; an
        ``AsyncRemoteStrategy`` wrapping ``remote_connection`` is added to
        the copy's traversal strategies.
        """
        remote_source = self.get_graph_traversal_source()
        remote_strategy = AsyncRemoteStrategy(remote_connection)
        remote_source.traversal_strategies.add_strategies([remote_strategy])
        return remote_source

    def get_graph_traversal_source(self):
        """Build a new source of the same class from this one's state.

        The new source receives the same ``graph``, an
        ``AsyncTraversalStrategies`` constructed from the current strategies,
        and a ``Bytecode`` constructed from the current bytecode.
        """
        strategies = AsyncTraversalStrategies(self.traversal_strategies)
        bytecode = traversal.Bytecode(self.bytecode)
        return self.__class__(self.graph, strategies, bytecode)
94