aiogremlin.process.graph_traversal   A
last analyzed

Complexity

Total Complexity 23

Size/Duplication

Total Lines 94
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 23
eloc 69
dl 0
loc 94
rs 10
c 0
b 0
f 0

10 Methods

Rating   Name   Duplication   Size   Complexity  
A AsyncGraphTraversal.toSet() 0 6 2
A AsyncGraphTraversal.iterate() 0 7 3
A AsyncGraphTraversalSource.__init__() 0 3 1
A AsyncGraphTraversal.nextTraverser() 0 10 3
A AsyncGraphTraversal.__anext__() 0 10 4
A AsyncGraphTraversal.next() 0 15 5
A AsyncGraphTraversalSource.get_graph_traversal_source() 0 4 1
A AsyncGraphTraversal.toList() 0 6 2
A AsyncGraphTraversalSource.withRemote() 0 4 1
A AsyncGraphTraversal.__aiter__() 0 2 1
1
from aiogremlin.process.traversal import AsyncTraversalStrategies
2
from aiogremlin.remote.remote_connection import AsyncRemoteStrategy
3
4
from gremlin_python.process import graph_traversal, traversal
5
6
7
class AsyncGraphTraversal(graph_traversal.GraphTraversal):
    """Graph traversal implementing the async iteration protocol.

    The result-consuming methods are redefined as coroutines so that
    results are awaited rather than fetched synchronously.
    """

    def __aiter__(self):
        """Return ``self``; the traversal acts as its own async iterator."""
        return self

    async def __anext__(self):
        """Return the next result object.

        Traversal strategies are applied on first use, while
        ``self.traversers`` is still ``None``. Each traverser is handed out
        ``bulk`` times before the next one is pulled.

        Whatever ``self.traversers.__anext__()`` raises on exhaustion
        propagates to the caller; the other methods of this class expect
        that to be ``StopAsyncIteration``.
        """
        if self.traversers is None:
            await self.traversal_strategies.apply_strategies(self)
        if self.last_traverser is None:
            self.last_traverser = await self.traversers.__anext__()
        # Local is named ``item`` so the builtin ``object`` is not shadowed.
        item = self.last_traverser.object
        self.last_traverser.bulk = self.last_traverser.bulk - 1
        if self.last_traverser.bulk <= 0:
            # This traverser is used up; fetch a fresh one on the next call.
            self.last_traverser = None
        return item

    async def toList(self):
        """Return results as ``list``."""
        results = []
        async for result in self:
            results.append(result)
        return results

    async def toSet(self):
        """Return results as ``set``."""
        results = set()
        async for result in self:
            results.add(result)
        return results

    async def iterate(self):
        """Consume every remaining traverser and return ``self``."""
        while True:
            try:
                await self.nextTraverser()
            # StopAsyncIteration is a builtin since Python 3.5; analyzers
            # targeting older versions may report it as undefined.
            except StopAsyncIteration:
                return self

    async def nextTraverser(self):
        """Return the next traverser.

        A traverser already held in ``self.last_traverser`` is returned
        first (and the slot is cleared); otherwise the next one is awaited
        from ``self.traversers``.
        """
        if self.traversers is None:
            await self.traversal_strategies.apply_strategies(self)
        if self.last_traverser is None:
            return await self.traversers.__anext__()
        pending = self.last_traverser
        self.last_traverser = None
        return pending

    async def next(self, amount=None):
        """Return the next result, or a list of up to ``amount`` results.

        Args:
            amount: ``None`` (default) to fetch a single result; otherwise
                the maximum number of results to collect.

        Returns:
            With ``amount=None``: the next result object, or ``None`` when
            the traversal is exhausted. Otherwise: a list holding at most
            ``amount`` results (shorter if the traversal runs out, empty
            for ``amount=0``).
        """
        # Compare against None explicitly so that ``amount=0`` yields an
        # empty list instead of being treated as "no amount given".
        if amount is None:
            try:
                return await self.__anext__()
            except StopAsyncIteration:
                return None
        results = []
        for _ in range(amount):
            try:
                result = await self.__anext__()
            except StopAsyncIteration:
                return results
            results.append(result)
        return results
72
73
74
class __(graph_traversal.__):
    """Variant of ``graph_traversal.__`` bound to ``AsyncGraphTraversal``."""

    # Traversal class attribute overridden with the async implementation.
    graph_traversal = AsyncGraphTraversal
77
78
79
class AsyncGraphTraversalSource(graph_traversal.GraphTraversalSource):
    """Traversal source whose traversal class is ``AsyncGraphTraversal``."""

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the parent, then set the traversal class."""
        super().__init__(*args, **kwargs)
        self.graph_traversal = AsyncGraphTraversal

    def withRemote(self, remote_connection):
        """Return a copy of this source with an ``AsyncRemoteStrategy`` added.

        The strategy is built from ``remote_connection`` and registered on
        the copy's ``traversal_strategies``; ``self`` is not the object
        that receives the strategy.
        """
        new_source = self.get_graph_traversal_source()
        remote_strategy = AsyncRemoteStrategy(remote_connection)
        new_source.traversal_strategies.add_strategies([remote_strategy])
        return new_source

    def get_graph_traversal_source(self):
        """Build a new source of the same class from this one's state.

        The new instance receives ``self.graph``, an
        ``AsyncTraversalStrategies`` built from ``self.traversal_strategies``
        and a ``traversal.Bytecode`` built from ``self.bytecode``.
        """
        source_cls = self.__class__
        strategies = AsyncTraversalStrategies(self.traversal_strategies)
        bytecode = traversal.Bytecode(self.bytecode)
        return source_cls(self.graph, strategies, bytecode)
94