erivlis /
graphinate
| 1 | """ |
||
| 2 | Define functions to create an abstract syntax tree (AST) graph model using the 'graphinate' library. |
||
| 3 | The 'ast_graph_model' function parses the AST of a specified class and creates nodes and edges for the graph model. |
||
| 4 | The nodes represent AST nodes with their type and label, while the edges represent relationships between AST nodes. |
||
| 5 | """ |
||
| 6 | |||
| 7 | import ast |
||
| 8 | import hashlib |
||
| 9 | import inspect |
||
| 10 | import operator |
||
| 11 | import pickle |
||
| 12 | import threading |
||
| 13 | import webbrowser |
||
| 14 | from _ast import AST |
||
| 15 | from collections.abc import Iterable |
||
| 16 | from tempfile import TemporaryDirectory |
||
| 17 | |||
| 18 | import graphinate |
||
| 19 | |||
| 20 | |||
def _ast_nodes(parsed_asts: Iterable[AST]):
    """Yield every AST node reachable from ``parsed_asts`` in depth-first pre-order.

    Each node is yielded before its descendants. ``ast.Load`` instances are
    neither yielded nor descended into.

    Args:
        parsed_asts: The AST nodes to start walking from.

    Yields:
        The visited AST nodes, parents before children.
    """
    for node in parsed_asts:
        # Guard clause: skip ``Load`` context markers entirely.
        if isinstance(node, ast.Load):
            continue

        yield node

        children = ast.iter_child_nodes(node)
        yield from _ast_nodes(children)
||
| 26 | |||
| 27 | |||
def _ast_edge(parsed_ast: AST):
    """Yield a parent-to-child edge for every AST node below ``parsed_ast``.

    Edges are produced in depth-first pre-order: the edge to a child is
    yielded first, followed by all edges inside that child's subtree.
    ``ast.Load`` instances are skipped, so no edge targets one.

    Args:
        parsed_ast: The AST node whose subtree is turned into edges.

    Yields:
        dict: ``{'source': parent, 'target': child, 'type': field_name}``
        where ``field_name`` is the name of the parent's field that holds
        the child (e.g. ``'body'``, ``'value'``, ``'ctx'``).
    """
    # Walk the fields directly instead of asking for the children and then
    # searching every field for each child: the field name is known the
    # moment the child is found, so no repeated list-membership scan is needed.
    for field_name, value in ast.iter_fields(parsed_ast):
        # A field holds either one child node or a list of child nodes;
        # any other value (str, int, None, ...) is filtered out below.
        candidates = value if isinstance(value, list) else [value]
        for child_ast in candidates:
            if not isinstance(child_ast, AST) or isinstance(child_ast, ast.Load):
                continue
            # 'type' is always present, so consumers may index it safely.
            yield {'source': parsed_ast, 'target': child_ast, 'type': field_name}
            yield from _ast_edge(child_ast)
||
| 44 | |||
| 45 | |||
def ast_graph_model(code_object=None):
    """
    Create an abstract syntax tree (AST) graph model.

    The source of ``code_object`` is fetched with ``inspect.getsource`` and
    parsed with ``ast.parse``; the resulting AST nodes become graph nodes
    and the parent/child relations become graph edges.

    Args:
        code_object: The object whose source code is modelled. Defaults to
            ``graphinate.builders.D3Builder`` when omitted. Its source must
            be retrievable by ``inspect.getsource`` and must parse on its
            own with ``ast.parse``.

    Returns:
        GraphModel: A graph model representing the AST nodes and their relationships.
    """
    if code_object is None:
        code_object = graphinate.builders.D3Builder

    graph_model = graphinate.model(name=f'AST Graph - {code_object.__qualname__}')

    root_ast_node = ast.parse(inspect.getsource(code_object))

    def node_type(ast_node):
        """Return the AST class name, used as the graph node type."""
        return ast_node.__class__.__name__

    def node_label(ast_node) -> str:
        """Return the class name, plus a 'name'/'id' line when the node declares that field."""
        label = ast_node.__class__.__name__

        for field_name in ('name', 'id'):
            if field_name in ast_node._fields:
                value = getattr(ast_node, field_name)
                label = f"{label}\n{field_name}: {value}"

        return label

    def key(value):
        """Return a 40-hex-character SHAKE-128 digest of the pickled value.

        The digest is an identifier only, not a security measure. Values
        that pickle to the same bytes share a key.
        """
        # noinspection InsecureHash
        return hashlib.shake_128(pickle.dumps(value)).hexdigest(20)

    def endpoint(value, endpoint_name):
        """Return the key of the node stored under ``endpoint_name`` in an edge dict."""
        return key(value[endpoint_name])

    def source(value):
        """Return the key of an edge's source node."""
        return endpoint(value, 'source')

    def target(value):
        """Return the key of an edge's target node."""
        return endpoint(value, 'target')

    @graph_model.node(type_=node_type,
                      key=key,
                      label=node_label,
                      unique=True)
    def ast_node(**kwargs):
        yield from _ast_nodes([root_ast_node])

    @graph_model.edge(type_='edge',
                      source=source,
                      target=target,
                      label=operator.itemgetter('type'))
    def ast_edge(**kwargs):
        yield from _ast_edge(root_ast_node)

    return graph_model
||
| 101 | |||
| 102 | |||
def create_server(port: int, root_directory: str, open_browser: bool = True) -> threading.Thread:
    """Serve ``root_directory`` over HTTP from a background daemon thread.

    Args:
        port: TCP port to bind on all interfaces.
        root_directory: Directory whose files are served.
        open_browser: When true, open ``http://localhost:<port>`` with
            ``webbrowser.open`` after the server thread has been started.

    Returns:
        The started daemon thread that runs the server loop.
    """
    import http.server
    import socketserver

    url = f"http://localhost:{port}"

    class Handler(http.server.SimpleHTTPRequestHandler):
        """Request handler pinned to ``root_directory`` instead of the CWD."""

        def __init__(self, *args, **kwargs):
            super().__init__(*args, directory=root_directory, **kwargs)

    def serve():
        # Binding happens inside the thread, so a bind failure is raised
        # there and not in the caller.
        with socketserver.TCPServer(('', port), Handler) as httpd:
            print("Serving at:", url)
            httpd.serve_forever()

    # Daemon thread: it does not keep the interpreter alive on exit.
    server_thread = threading.Thread(target=serve, daemon=True)
    server_thread.start()

    if open_browser:
        # NOTE: the thread may not have bound the port yet when the browser
        # issues its first request.
        webbrowser.open(url)

    # The signature promises a Thread; hand it back so callers can join it
    # or check that it is still alive.
    return server_thread
||
| 124 | |||
| 125 | |||
if __name__ == '__main__':
    import time

    ast_model = ast_graph_model()
    # schema = graphinate.builders.GraphQLBuilder(ast_model).build()
    # graphinate.graphql.server(schema)

    diagram = graphinate.builders.MermaidBuilder(ast_model).build(with_edge_labels=False)

    html_diagram = graphinate.mermaid.html(diagram)

    # Save the HTML diagram to a file and serve it. Everything stays inside
    # the ``with`` block because the directory is removed when it exits.
    with TemporaryDirectory() as temp_dir:
        # Explicit encoding so the written bytes do not depend on the
        # platform's default locale.
        with open(f"{temp_dir}/index.html", 'w', encoding='utf-8') as f:
            f.write(html_diagram)

        # Serve the HTML diagram
        create_server(port=8077, root_directory=temp_dir, open_browser=True)

        # Keep the main thread alive to allow the server to run. Sleeping
        # instead of spinning keeps the process idle while it waits, and
        # Ctrl-C still interrupts the sleep.
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("Server stopped")
||
| 149 |