graphinate.engine   A
last analyzed

Complexity

Total Complexity 21

Size/Duplication

Total Lines 132
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 21
eloc 65
dl 0
loc 132
rs 10
c 0
b 0
f 0

12 Methods

Rating   Name   Duplication   Size   Complexity  
A GraphEngine._notify_complete() 0 2 2
A GraphEngine._notify_node() 0 3 2
A GraphEngine._notify_error() 0 3 2
A GraphEngine._notify_edge() 0 3 2
A GraphObserver.on_error() 0 2 1
A GraphObserver.on_edge() 0 2 1
A GraphEngine.__init__() 0 4 1
A GraphObserver.on_node() 0 2 1
A GraphEngine._broadcast() 0 15 4
A GraphEngine.run() 0 22 3
A GraphObserver.on_complete() 0 2 1
A GraphEngine.subscribe() 0 3 1
1
import inspect
2
from dataclasses import dataclass
3
from typing import Protocol
4
5
from .modeling import GraphModel
6
7
# region Events
8
9
@dataclass
class GraphEvent:
    """Base class for all graph execution events.

    Declares no fields of its own; it exists so that every concrete event
    shares a common ancestor.
    """
12
13
14
@dataclass
class NodeEvent(GraphEvent):
    """Emitted when a node is discovered/created."""

    # Identifier of the node this event describes.
    id: str
    # Payload attached to the node (schema is not fixed by this class).
    data: dict
    # Identifier of the parent node; defaults to None when no parent is given.
    parent_id: str | None = None
20
21
22
@dataclass
class EdgeEvent(GraphEvent):
    """Emitted when an edge is discovered/created."""

    # Identifier of the node the edge starts from.
    source_id: str
    # Identifier of the node the edge points to.
    target_id: str
    # Payload attached to the edge (schema is not fixed by this class).
    data: dict
28
29
30
@dataclass
class ErrorEvent(GraphEvent):
    """Emitted when a generator raises an exception."""

    # The exception that was caught.
    error: Exception
    # Extra information about where the error occurred; required, may be empty.
    context: dict
35
36
37
# endregion
38
39
# region Interfaces
40
41
class GraphObserver(Protocol):
    """Interface for components that listen to the GraphEngine.

    This is a structural (duck-typed) interface: any object providing these
    four coroutine methods satisfies it, without inheriting from this class.
    """

    async def on_node(self, event: NodeEvent) -> None:
        """Handle a node event."""
        ...

    async def on_edge(self, event: EdgeEvent) -> None:
        """Handle an edge event."""
        ...

    async def on_error(self, event: ErrorEvent) -> None:
        """Handle an error event."""
        ...

    async def on_complete(self) -> None:
        """Handle the end-of-run notification."""
        ...
55
56
57
# endregion
58
59
# region Engine
60
61
class GraphEngine:
    """The Execution Engine.

    Responsible for iterating over the GraphModel, handling the 'magic'
    (ID generation, dependency injection), and notifying observers.
    """

    def __init__(self, model: GraphModel):
        """Create an idle engine for *model* with no observers subscribed."""
        self.model = model
        self._observers: list[GraphObserver] = []
        self._is_running = False

    def subscribe(self, observer: GraphObserver) -> None:
        """Register an observer to receive events."""
        self._observers.append(observer)

    async def run(self) -> None:
        """Execute the graph generation process.

        Any ``Exception`` raised while generating (including one raised by an
        observer's node handler) is reported to observers through
        ``on_error`` instead of propagating. ``on_complete`` is then sent
        regardless of whether an error occurred.

        Raises:
            RuntimeError: If the engine is already running.
        """
        if self._is_running:
            raise RuntimeError("Engine is already running")

        self._is_running = True

        # The outer try/finally guarantees the running flag is cleared even
        # when an observer raises from on_error or on_complete; otherwise the
        # engine would reject every later run() as "already running".
        try:
            try:
                # TODO: This is where the logic from NetworkxBuilder._populate_nodes moves to.
                # It will need to handle:
                # 1. Root nodes
                # 2. Recursive children
                # 3. Async vs Sync generators (GEP-023)

                # Mock implementation for scaffold
                await self._notify_node("root", {"label": "Root"}, None)

            except Exception as e:
                await self._notify_error(e)
            finally:
                await self._notify_complete()
        finally:
            self._is_running = False

    # region Internal Notification Helpers

    async def _notify_node(self, node_id: str, data: dict, parent_id: str | None) -> None:
        """Build a NodeEvent and send it to every observer's ``on_node``."""
        event = NodeEvent(id=node_id, data=data, parent_id=parent_id)
        await self._broadcast(lambda obs: obs.on_node(event))

    async def _notify_edge(self, source: str, target: str, data: dict) -> None:
        """Build an EdgeEvent and send it to every observer's ``on_edge``."""
        event = EdgeEvent(source_id=source, target_id=target, data=data)
        await self._broadcast(lambda obs: obs.on_edge(event))

    async def _notify_error(self, error: Exception) -> None:
        """Build an ErrorEvent (empty context) and send it to ``on_error``."""
        event = ErrorEvent(error=error, context={})
        await self._broadcast(lambda obs: obs.on_error(event))

    async def _notify_complete(self) -> None:
        """Call ``on_complete`` on every observer."""
        await self._broadcast(lambda obs: obs.on_complete())

    async def _broadcast(self, callback) -> None:
        """Notify all observers.

        Design Decision: Sequential await.
        If one observer is slow, it slows down the engine (Backpressure).
        This is usually desired to prevent OOM.

        Args:
            callback: Called once per observer, in subscription order. If it
                returns an awaitable (an async handler), that awaitable is
                awaited before moving on; any other return value (a
                synchronous handler) is ignored.
        """
        for observer in self._observers:
            result = callback(observer)
            # Async handlers hand back a coroutine; sync handlers have
            # already finished by the time callback() returns.
            if inspect.isawaitable(result):
                await result

    # endregion
134