Passed
Push — main ( 05fcff...04a574 )
by Eran
01:28
created

python_ast.create_server()   A

Complexity

Conditions 3

Size

Total Lines 21
Code Lines 16

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 16
dl 0
loc 21
rs 9.6
c 0
b 0
f 0
cc 3
nop 3
1
"""
2
Define functions to create an abstract syntax tree (AST) graph model using the 'graphinate' library.
3
The 'ast_graph_model' function parses the AST of a specified class and creates nodes and edges for the graph model.
4
The nodes represent AST nodes with their type and label, while the edges represent relationships between AST nodes.
5
"""
6
7
import ast
8
import hashlib
9
import inspect
10
import operator
11
import pickle
12
import threading
13
import webbrowser
14
from _ast import AST
15
from collections.abc import Iterable
16
from tempfile import TemporaryDirectory
17
18
import graphinate
19
20
21
def _ast_nodes(parsed_asts: Iterable[AST]):
22
    for item in parsed_asts:
23
        if not isinstance(item, ast.Load):
24
            yield item
25
            yield from _ast_nodes(ast.iter_child_nodes(item))
26
27
28 View Code Duplication
def _ast_edge(parsed_ast: AST):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
29
    for child_ast in ast.iter_child_nodes(parsed_ast):
30
        if not isinstance(child_ast, ast.Load):
31
            edge = {'source': parsed_ast, 'target': child_ast}
32
            edge_types = (
33
                field_name
0 ignored issues
show
introduced by
The variable field_name does not seem to be defined for all execution paths.
Loading history...
34
                for field_name, value
35
                in ast.iter_fields(parsed_ast)
36
                if child_ast == value
37
                   or (child_ast in value if isinstance(value, list) else False)
38
            )
39
            edge_type = next(edge_types, None)
40
            if edge_type:
41
                edge['type'] = edge_type
42
            yield edge
43
            yield from _ast_edge(child_ast)
44
45
46 View Code Duplication
def ast_graph_model():
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
47
    """
48
    Create an abstract syntax tree (AST) graph model.
49
50
    Returns:
51
        GraphModel: A graph model representing the AST nodes and their relationships.
52
    """
53
54
    graph_model = graphinate.model(name='AST Graph')
55
56
    root_ast_node = ast.parse(inspect.getsource(graphinate.builders.D3Builder))
57
58
    def node_type(ast_node):
59
        return ast_node.__class__.__name__
60
61
    def node_label(ast_node) -> str:
62
        label = ast_node.__class__.__name__
63
64
        for field_name in ('name', 'id'):
65
            if field_name in ast_node._fields:
66
                value = operator.attrgetter(field_name)(ast_node)
67
                label = f"{label}\n{field_name}: {value}"
68
69
        return label
70
71
    def key(value):
72
        # noinspection InsecureHash
73
        return hashlib.shake_128(pickle.dumps(value)).hexdigest(20)
74
75
    def endpoint(value, endpoint_name):
76
        return key(value[endpoint_name])
77
78
    def source(value):
79
        return endpoint(value, 'source')
80
81
    def target(value):
82
        return endpoint(value, 'target')
83
84
    @graph_model.node(type_=node_type,
85
                      key=key,
86
                      label=node_label,
87
                      unique=True)
88
    def ast_node(**kwargs):
89
        yield from _ast_nodes([root_ast_node])
90
91
    @graph_model.edge(type_='edge',
92
                      source=source,
93
                      target=target,
94
                      label=operator.itemgetter('type'))
95
    def ast_edge(**kwargs):
96
        yield from _ast_edge(root_ast_node)
97
98
    return graph_model
99
100
101
def create_server(port: int, root_directory: str, open_browser: bool = True) -> threading.Thread:
102
    import http.server
103
    import socketserver
104
105
    url = f"http://localhost:{port}"
106
107
    class Handler(http.server.SimpleHTTPRequestHandler):
108
        def __init__(self, *args, **kwargs):
109
            super().__init__(*args, directory=root_directory, **kwargs)
110
111
    def serve():
112
        with socketserver.TCPServer(('', port), Handler) as httpd:
113
            print("Serving at:", url)
114
            httpd.serve_forever()
115
116
    server_thread = threading.Thread(target=serve)
117
    server_thread.daemon = True
118
    server_thread.start()
119
120
    if open_browser:
121
        webbrowser.open(url)
122
123
124
if __name__ == '__main__':
125
    ast_model = ast_graph_model()
126
    # schema = graphinate.builders.GraphQLBuilder(ast_model).build()
127
    # graphinate.graphql.server(schema)
128
129
    diagram = graphinate.builders.MermaidBuilder(ast_model).build()
130
131
    html_diagram = graphinate.mermaid.html(diagram)
132
133
    ## Save the HTML diagram to a file and serve it
134
    with TemporaryDirectory() as temp_dir:
135
        with open(f"{temp_dir}/index.html", 'w') as f:
136
            f.write(html_diagram)
137
138
        # Serve the HTML diagram
139
        create_server(port=8077, root_directory=temp_dir, open_browser=True)
140
141
        # Keep the main thread alive to allow the server to run
142
        try:
143
            while True:
144
                pass
145
        except KeyboardInterrupt:
146
            print("Server stopped")
147