1
|
|
|
"""Analyze definition.""" |
2
|
|
|
from inspect import getclosurevars |
3
|
|
|
from typing import Any, List, NamedTuple, Tuple, no_type_check |
4
|
|
|
|
5
|
|
|
from .definition import CallableFunction, NodeMetadata |
6
|
|
|
|
7
|
|
|
|
8
|
|
|
__all__ = ["analyze", "print_analyze", "Node"] |
9
|
|
|
|
10
|
|
|
|
11
|
|
|
class Node(NamedTuple):
    """Aggregate the analyzed definition of a node.

    - name: named operation
    - properties: a list of tuple (name, value) for definition.
    - edges: a list of tuple (name, node list) for definition.
    """

    name: str
    properties: List[Tuple[str, Any]]
    # edges: List[Tuple[str, List['Node']]]
    # https://github.com/python/mypy/issues/731
    edges: List[Tuple[str, List[Any]]]

    def __str__(self) -> str:
        """Return the text that `print_analyze` prints for this node."""
        # Function-scope imports keep this method self-contained.
        from contextlib import redirect_stdout
        from io import StringIO

        # print_analyze writes to stdout and returns None; handing that
        # result back from __str__ makes str(node) raise TypeError, so the
        # printed output is captured and returned instead.
        captured = StringIO()
        with redirect_stdout(captured):
            print_analyze(a_node=self)
        # Drop the newline emitted by the last print call.
        return captured.getvalue().rstrip("\n")
27
|
|
|
|
28
|
|
|
|
29
|
|
|
# pylint: disable=protected-access
@no_type_check  # it's a shortcut for hasattr ...
def analyze(target: CallableFunction) -> Node:
    """Analyze specified target and return a Node representation.

    # Parameters
    - target (CallableFunction): async function to analyze

    # Returns
    (Node) a definition. For a target carrying `__node_metadata`, the
    properties and edges named by the metadata are resolved against the
    closure variables of the target; an edge whose closure variable is
    missing or falsy is reported as `(name, None)`. For any other target,
    every closure variable is reported as a property and edges is empty.

    # Raises
    - RuntimeError: if target has a `__node_metadata` attribute that is
      not a NodeMetadata instance.
    """
    try:
        nonlocals = getclosurevars(target).nonlocals
    except TypeError:
        # getclosurevars only accepts Python functions and methods. Other
        # callables (functools.partial, callable instances, ...) expose no
        # closure to inspect, so they are treated as capturing nothing
        # instead of aborting the whole analysis.
        nonlocals = {}

    def _analyze_property(p):
        """Return a tuple (name, value) or (name, function name) as property."""
        value = nonlocals.get(p)
        if callable(value):
            # Not every callable carries a __name__ (functools.partial for
            # instance): fall back to the object itself rather than raising.
            return p, getattr(value, "__name__", value)
        return p, value

    def _analyze_edges(edge_name):
        """Lookup children node from edge_name closure variable."""
        edge = nonlocals.get(edge_name)
        if not edge:
            # Unknown or empty edge: keep the entry, without children.
            return edge_name, None
        # it could be a collection of node
        if hasattr(edge, "__iter__"):
            return edge_name, [analyze(child) for child in edge]
        # or a single node
        return edge_name, [analyze(edge)]

    if hasattr(target, "__node_metadata"):
        # its a node construct.
        node = target.__node_metadata
        if not isinstance(node, NodeMetadata):
            raise RuntimeError(
                f'attr __node_metadata of {target} is not a NodeMetadata!'
            )
        return Node(
            name=node.name,
            properties=[_analyze_property(p) for p in node.properties],
            # _analyze_edges always yields a tuple, so no filtering is needed.
            edges=[_analyze_edges(e) for e in node.edges],
        )

    # simple function
    return Node(
        name=getattr(target, "__name__", "anonymous").lstrip("_"),
        properties=[_analyze_property(p) for p in nonlocals],
        edges=[],
    )
83
|
|
|
|
84
|
|
|
|
85
|
|
|
def print_analyze(a_node: Node, indent=0, label=None) -> None:
    """Write a text rendering of a Node and of its children to stdout.

    # Parameters
    - a_node (Node): node to render
    - indent: nesting depth, incremented by one for each level of children
    - label: name of the edge leading to this node, if any
    """
    unit = ' '
    prefix = unit * indent + ' '

    # Header line, with the edge label when there is one.
    if not label:
        print(f"{prefix}--> {a_node.name}:")
    else:
        print(f"{prefix}--({label})--> {a_node.name}:")
        # Shift what follows past the label that was just printed.
        prefix = prefix + unit + ' ' * len(label)

    for key, value in a_node.properties:
        print(f"{prefix} {key}: {value}")

    # Children are rendered one level deeper, tagged with their edge name;
    # an edge without children (None or empty) prints nothing.
    for edge_label, children in a_node.edges:
        for child in children or ():
            print_analyze(a_node=child, indent=indent + 1, label=edge_label)
102
|
|
|
|