aiogremlin.remote.driver_remote_side_effects   A
last analyzed

Complexity

Total Complexity 19

Size/Duplication

Total Lines 88
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 19
eloc 69
dl 0
loc 88
rs 10
c 0
b 0
f 0

7 Methods

Rating   Name   Duplication   Size   Complexity  
A AsyncRemoteTraversalSideEffects._get() 0 7 1
A AsyncRemoteTraversalSideEffects.close() 0 10 2
A AsyncRemoteTraversalSideEffects.__getitem__() 0 5 2
A AsyncRemoteTraversalSideEffects.keys() 0 11 2
A AsyncRemoteTraversalSideEffects.__init__() 0 6 1
A AsyncRemoteTraversalSideEffects.get() 0 10 3
B AsyncRemoteTraversalSideEffects._aggregate_results() 0 26 8
1
from gremlin_python.driver import request
2
from gremlin_python.process import traversal
3
4
5
6
class AsyncRemoteTraversalSideEffects(traversal.TraversalSideEffects):
    """Asynchronous accessor for the side effects of a remote traversal.

    Keys and values are requested through ``client`` and cached on the
    instance. After :meth:`close` has run, no further requests are sent and
    only previously cached data is returned.
    """

    def __init__(self, side_effect, client):
        """Remember the side effect identifier and the client to query.

        :param side_effect: value sent as ``sideEffect`` in every request
        :param client: object exposing ``aliases`` and an awaitable
            ``submit(message)``
        """
        self._side_effect = side_effect
        self._client = client
        self._keys = set()
        self._side_effects = {}
        self._closed = False

    async def __getitem__(self, key):
        """Return the side effect stored under ``key``.

        This is a coroutine, so the subscript expression itself must be
        awaited (``await side_effects[key]``).

        :raises TypeError: if ``key`` is a slice
        """
        if isinstance(key, slice):
            raise TypeError(
                'AsyncRemoteTraversalSideEffects does not support slicing')
        return await self.get(key)

    async def keys(self):
        """Get side effect keys associated with Traversal

        While the instance is open, the key set is refreshed from the server
        on every call; once closed, the last known set is returned.
        """
        if not self._closed:
            message = request.RequestMessage(
                'traversal', 'keys',
                {'sideEffect': self._side_effect,
                 'aliases': self._client.aliases})
            result_set = await self._client.submit(message)
            results = await result_set.all()
            self._keys = set(results)
        return self._keys

    async def get(self, key):
        """Get side effects associated with a specific key

        A truthy cached value is returned directly. A missing *or falsy*
        cached value is fetched again while the instance is open, and yields
        ``None`` once it is closed.
        """
        # Truthiness check: an empty cached result counts as "not cached".
        if not self._side_effects.get(key):
            if self._closed:
                return None
            results = await self._get(key)
            self._side_effects[key] = results
            self._keys.add(key)
        return self._side_effects[key]

    async def _get(self, key):
        """Request the side effect for ``key`` and aggregate the response."""
        message = request.RequestMessage(
            'traversal', 'gather',
            {'sideEffect': self._side_effect, 'sideEffectKey': key,
             'aliases': self._client.aliases})
        result_set = await self._client.submit(message)
        return await self._aggregate_results(result_set)

    async def close(self):
        """Release side effects

        Sends a ``close`` request and marks the instance as closed. Calling
        it again is a no-op that returns ``None``; previously the second
        call raised ``UnboundLocalError`` because ``result_set`` was only
        bound on the first call.

        :returns: the result of ``result_set.one()`` for the close request,
            or ``None`` if the instance was already closed
        """
        if self._closed:
            return None
        # NOTE(review): keys() and _get() send ``self._client.aliases``
        # directly, while this request nests it under 'g' -- confirm which
        # shape the server expects.
        message = request.RequestMessage(
            'traversal', 'close',
            {'sideEffect': self._side_effect,
             'aliases': {'g': self._client.aliases}})
        result_set = await self._client.submit(message)
        # Marked closed only after the request was submitted successfully.
        self._closed = True
        return await result_set.one()

    async def _aggregate_results(self, result_set):
        """Fold the messages of ``result_set`` into a single value.

        The container is chosen from ``result_set.aggregate_to`` once the
        first message has arrived: ``'list'``, ``'set'``, ``'map'`` and
        ``'bulkset'`` map to a list, set, dict and dict respectively, and an
        unrecognised value falls back to a list. With ``'none'`` the message
        itself becomes the result. An empty result set yields ``[]``.
        """
        aggregates = {'list': [], 'set': set(), 'map': {}, 'bulkset': {},
                      'none': None}
        # Bound up front so the name exists on every path through the loop.
        aggregate_to = None
        results = None
        async for msg in result_set:
            # on first message, get the right result data structure
            if results is None:
                aggregate_to = result_set.aggregate_to
                results = aggregates.get(aggregate_to, [])
            # if there is no update to a structure, then the item is the result
            if results is None:
                results = msg
            # updating a map is different than a list or a set
            elif isinstance(results, dict):
                if aggregate_to == "map":
                    results.update(msg)
                else:
                    # non-map dict results are keyed by object with its bulk
                    results[msg.object] = msg.bulk
            elif isinstance(results, set):
                results.update(msg)
            # flat add list to result list
            else:
                results.append(msg)
        if results is None:
            results = []
        return results
88