Passed
Push — main ( e1e099...73254d )
by Eran
01:42
created

graphinate.engine.GraphObserver.on_error()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 2
dl 0
loc 2
rs 10
c 0
b 0
f 0
cc 1
nop 2
1
import asyncio
2
import inspect
3
from dataclasses import dataclass
4
from typing import Protocol, Any, AsyncIterable, List
5
6
from .modeling import GraphModel
7
8
# region Events
9
10
@dataclass
class GraphEvent:
    """Common ancestor of every event emitted during graph execution.

    Declares no fields of its own; concrete event types subclass it and
    add their payload.
    """
14
15
@dataclass
class NodeEvent(GraphEvent):
    """Event announcing that a node has been discovered or created.

    Attributes:
        id: Identifier of the node.
        data: Dictionary payload carried with the node.
        parent_id: Identifier of the parent node; defaults to ``None``.
    """

    id: str
    data: dict
    parent_id: str | None = None
21
22
@dataclass
class EdgeEvent(GraphEvent):
    """Event announcing that an edge has been discovered or created.

    Attributes:
        source_id: Identifier of the node the edge starts from.
        target_id: Identifier of the node the edge points to.
        data: Dictionary payload carried with the edge.
    """

    source_id: str
    target_id: str
    data: dict
28
29
@dataclass
class ErrorEvent(GraphEvent):
    """Event announcing that an exception was raised during execution.

    Attributes:
        error: The exception instance that was caught.
        context: Dictionary of additional details about the failure.
    """

    error: Exception
    context: dict
34
35
# endregion
36
37
# region Interfaces
38
39
class GraphObserver(Protocol):
    """Structural interface for listeners attached to the GraphEngine.

    Every hook is declared as a coroutine returning ``None``.
    """

    async def on_node(self, event: NodeEvent) -> None:
        """Receive a node event."""

    async def on_edge(self, event: EdgeEvent) -> None:
        """Receive an edge event."""

    async def on_error(self, event: ErrorEvent) -> None:
        """Receive an error event."""

    async def on_complete(self) -> None:
        """Receive the completion signal; it carries no payload."""
53
54
# endregion
55
56
# region Engine
57
58
class GraphEngine:
    """
    The Execution Engine.

    Responsible for iterating over the GraphModel, handling the 'magic'
    (ID generation, dependency injection), and notifying observers.

    Observers are notified one after another, in subscription order.
    """

    def __init__(self, model: GraphModel) -> None:
        """Store the model and start idle with no observers.

        Args:
            model: The graph model this engine executes.
        """
        self.model = model
        self._observers: list[GraphObserver] = []
        # Guards against overlapping run() calls on the same engine.
        self._is_running = False

    def subscribe(self, observer: GraphObserver) -> None:
        """Register an observer to receive events.

        Args:
            observer: Object implementing the GraphObserver hooks.
        """
        self._observers.append(observer)

    async def run(self) -> None:
        """
        Execute the graph generation process.

        Any ``Exception`` raised inside the generation step (including one
        raised by an observer's ``on_node``) is broadcast through
        ``on_error`` instead of being propagated. ``on_complete`` is
        broadcast afterwards whether or not an error occurred.

        Raises:
            RuntimeError: If the engine is already running.
        """
        if self._is_running:
            raise RuntimeError("Engine is already running")

        self._is_running = True

        # The outer try/finally guarantees the running flag is cleared even
        # when an observer's on_error/on_complete raises or the task is
        # cancelled mid-notification; otherwise the engine would be stuck
        # reporting "already running" forever.
        try:
            try:
                # TODO: This is where the logic from NetworkxBuilder._populate_nodes moves to.
                # It will need to handle:
                # 1. Root nodes
                # 2. Recursive children
                # 3. Async vs Sync generators (GEP-023)

                # Mock implementation for scaffold
                await self._notify_node("root", {"label": "Root"}, None)

            except Exception as e:
                await self._notify_error(e)
            finally:
                await self._notify_complete()
        finally:
            self._is_running = False

    # region Internal Notification Helpers

    async def _notify_node(self, node_id: str, data: dict, parent_id: str | None) -> None:
        """Build a NodeEvent and broadcast it to every observer's ``on_node``."""
        event = NodeEvent(id=node_id, data=data, parent_id=parent_id)
        await self._broadcast(lambda obs: obs.on_node(event))

    async def _notify_edge(self, source: str, target: str, data: dict) -> None:
        """Build an EdgeEvent and broadcast it to every observer's ``on_edge``."""
        event = EdgeEvent(source_id=source, target_id=target, data=data)
        await self._broadcast(lambda obs: obs.on_edge(event))

    async def _notify_error(self, error: Exception) -> None:
        """Wrap ``error`` in an ErrorEvent (empty context) and broadcast it."""
        event = ErrorEvent(error=error, context={})
        await self._broadcast(lambda obs: obs.on_error(event))

    async def _notify_complete(self) -> None:
        """Broadcast the completion signal to every observer's ``on_complete``."""
        await self._broadcast(lambda obs: obs.on_complete())

    async def _broadcast(self, callback) -> None:
        """
        Notify all observers.

        Design Decision: Sequential await.
        If one observer is slow, it slows down the engine (Backpressure).
        This is usually desired to prevent OOM.

        Args:
            callback: Callable taking one observer. It may be a plain
                function or a coroutine function; if its result is
                awaitable it is awaited before moving to the next observer.
        """
        for observer in self._observers:
            # Calling a coroutine function yields an awaitable as well, so a
            # single path covers both async and synchronous observer hooks.
            result = callback(observer)
            if inspect.isawaitable(result):
                await result
133
134
    # endregion
135