graphinate.modeling.NodeModel.absolute_id()   A
last analyzed

Complexity

Conditions 1

Size

Total Lines 3
Code Lines 3

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
eloc 3
dl 0
loc 3
rs 10
c 0
b 0
f 0
cc 1
nop 1
1
import inspect
2
import itertools
3
from collections import defaultdict, namedtuple
4
from collections.abc import Callable, Iterable, Mapping
5
from dataclasses import dataclass
6
from enum import Enum, auto
7
from functools import lru_cache
8
from types import MappingProxyType
9
from typing import Any, Union
10
11
from .typing import Edge, Element, Extractor, Items, Node, NodeTypeAbsoluteId, UniverseNode
12
13
14
class GraphModelError(Exception):
15
    pass
16
17
18
@lru_cache(maxsize=128)
19
def _get_namedtuple_class(type_name: str, field_names: tuple[str] | str) -> type:
20
    return namedtuple(type_name, field_names)
21
22
23
def element(element_type: str | None, field_names: Iterable[str] | str | None = None) -> Callable[[], Element]:
24
    """Graph Element Supplier Callable
25
26
    Args:
27
        element_type:
28
        field_names:
29
30
    Returns:
31
        Element Supplier Callable
32
    """
33
    if element_type and field_names:
34
        if isinstance(field_names, str):
35
            return _get_namedtuple_class(element_type, field_names)
36
        return _get_namedtuple_class(element_type, tuple(field_names))
37
    return tuple
38
39
40
def extractor(obj: Any, key: Extractor | None = None) -> str | None:
41
    """Extract data item from Element
42
43
    Args:
44
        obj:
45
        key:
46
47
    Returns:
48
        Element data item
49
    """
50
    if key is None:
51
        return obj
52
53
    if callable(key):
54
        return key(obj)
55
56
    if isinstance(obj, Mapping) and isinstance(key, str):
57
        return obj.get(key, key)
58
59
    return key
60
61
62
def elements(iterable: Iterable[Any],
63
             element_type: Extractor | None = None,
64
             **getters: Extractor) -> Iterable[Element]:
65
    """Abstract Generator of Graph elements (nodes or edges)
66
67
    Args:
68
        iterable: source of payload
69
        element_type: Optional[Extractor] source of type of the element. Defaults to Element Type name.
70
        getters: Extractor node field sources
71
72
    Returns:
73
        Iterable of Elements.
74
    """
75
    is_dynamic_type = callable(element_type)
76
    static_create_element = None
77
78
    if not is_dynamic_type:
79
        _type = element_type
80
        # Eagerly validate static types.
81
        # Note: This will raise AttributeError if _type is None (consistent with previous behavior, but happens earlier)
82
        # and ValueError if _type is invalid identifier.
83
        if not _type.isidentifier():
84
            raise ValueError(f"Invalid Type: {_type}. Must be a valid Python identifier.")
85
86
        static_create_element = element(_type, getters.keys())
87
88
    for item in iterable:
89
        if is_dynamic_type:
90
            _type = element_type(item)
91
            if not _type.isidentifier():
92
                raise ValueError(f"Invalid Type: {_type}. Must be a valid Python identifier.")
93
            create_element = element(_type, getters.keys())
94
        else:
95
            create_element = static_create_element
96
97
        kwargs = {k: extractor(item, v) for k, v in getters.items()}
98
        yield create_element(**kwargs)
99
100
101
class Multiplicity(Enum):
102
    ADD = auto()
103
    ALL = auto()
104
    FIRST = auto()
105
    LAST = auto()
106
107
108
@dataclass
109
class NodeModel:
110
    """Represents a Node Model
111
112
    Args:
113
        type: the type of the Node.
114
        parent_type: the type of the node's parent. Defaults to UniverseNode.
115
        parameters: parameters of the Node. Defaults to None.
116
        label: label source. Defaults to None.
117
        uniqueness: is the Node universally unique. Defaults to True.
118
        multiplicity: Multiplicity of the Node. Defaults to ALL.
119
        generator: Nodes generator method. Defaults to None.
120
121
    Properties:
122
        absolute_id: return the NodeModel absolute_id.
123
    """
124
125
    type: str
126
    parent_type: str | UniverseNode | None = UniverseNode
127
    parameters: set[str] | None = None
128
    label: Callable[[Any], str | None] = None
129
    uniqueness: bool = True
130
    multiplicity: Multiplicity = Multiplicity.ALL
131
    generator: Callable[[], Iterable[Node]] | None = None
132
133
    @property
134
    def absolute_id(self) -> NodeTypeAbsoluteId:
135
        return self.parent_type, self.type
136
137
138
class GraphModel:
139
    """A Graph Model
140
141
    Used to declaratively register Edge and/or Node data supplier functions by using
142
    decorators.
143
144
    Args:
145
        name: the archetype name for Graphs generated based on the GraphModel.
146
    """
147
148
    def __init__(self, name: str):
149
        self.name: str = name
150
        self._node_models: dict[NodeTypeAbsoluteId, list[NodeModel]] = defaultdict(list)
151
        self._node_children: dict[str, list[NodeModel]] = defaultdict(list)
152
        self._edge_generators: dict[str, list[Callable[[], Iterable[Edge]]]] = defaultdict(list)
153
        self._networkx_graph = None
154
155
    def __add__(self, other: 'GraphModel') -> 'GraphModel':
156
        graph_model = GraphModel(name=f"{self.name} + {other.name}")
157
        for m in (self, other):
158
            for k, v in m._node_models.items():
159
                graph_model._node_models[k].extend(v)
160
161
            for k, v in m._node_children.items():
162
                graph_model._node_children[k].extend(v)
163
164
            for k, v in m._edge_generators.items():
165
                graph_model._edge_generators[k].extend(v)
166
167
        return graph_model
168
169
    @property
170
    def node_models(self) -> Mapping[NodeTypeAbsoluteId, list[NodeModel]]:
171
        """
172
        Returns:
173
            NodeModel for Node Types. Key values are NodeTypeAbsoluteId.
174
        """
175
        return MappingProxyType(self._node_models)
176
177
    @property
178
    def edge_generators(self) -> Mapping[str, list[Callable[[], Iterable[Edge]]]]:
179
        """
180
        Returns:
181
            Edge generator functions for Edge Types
182
        """
183
        return MappingProxyType(self._edge_generators)
184
185
    @property
186
    def node_types(self) -> set[str]:
187
        """
188
        Returns:
189
            Node Types
190
        """
191
        return {v.type for v in itertools.chain.from_iterable(self._node_models.values())}
192
193
    def node_children_types(self, _type: str = UniverseNode) -> Mapping[str, list[str]]:
194
        """Children Node Types for given input Node Type
195
196
        Args:
197
            _type:  Node Type. Default value is UNIVERSE_NODE.
198
199
        Returns:
200
            List of children Node Types.
201
        """
202
        return MappingProxyType({k: v for k, v in self._node_children.items() if k == _type})
203
204
    @staticmethod
205
    def _validate_type(node_type: str):
206
        if not callable(node_type) and not node_type.isidentifier():
207
            raise ValueError(f"Invalid Type: {node_type}. Must be a valid Python identifier.")
208
209
    def _validate_node_parameters(self, parameters: list[str]):
210
        node_types = self.node_types
211
        if not all(p.endswith('_id') and p == p.lower() and p[:-3] in node_types for p in parameters):
212
            msg = ("Illegal Arguments. Argument should conform to the following rules: "
213
                   "1) lowercase "
214
                   "2) end with '_id' "
215
                   "3) start with value that exists as registered node type")
216
217
            raise GraphModelError(msg)
218
219
    def node(self,
220
             type_: Extractor | None = None,
221
             parent_type: str | None = UniverseNode,
222
             key: Extractor | None = None,
223
             value: Extractor | None = None,
224
             label: Extractor | None = None,
225
             unique: bool = True,
226
             multiplicity: Multiplicity = Multiplicity.ALL) -> Callable[[Items], None]:
227
        """Decorator to Register a Generator of node payloads as a source for Graph Nodes.
228
        It creates a NodeModel object.
229
230
        Args:
231
            type_: Optional source for the Node Type. Defaults to use Generator function
232
                   name as the Node Type.
233
            parent_type: Optional parent Node Type. Defaults to UNIVERSE_NODE
234
235
            key: Optional source for Node IDs. Defaults to use the complete Node payload
236
                 as Node ID.
237
            value: Optional source for Node value field. Defaults to use the complete
238
                   Node payload as Node ID.
239
            label: Optional source for Node label field. Defaults to use a 'str'
240
                   representation of the complete Node payload.
241
            unique: is the Node universally unique. Defaults to True.
242
            multiplicity: Multiplicity of the Node. Defaults to ALL.
243
244
        Generator Function Signature:
245
            The decorated generator function may accept arguments to receive context from parent nodes.
246
            These arguments MUST conform to the following strict naming convention:
247
            1. The argument name must be lowercase.
248
            2. The argument name must end with '_id'.
249
            3. The prefix (before '_id') must match an existing registered Node Type.
250
251
            Example:
252
                If you have a parent node type 'user', your child node generator can accept 'user_id'.
253
254
                @model.node(parent_type='user')
255
                def get_posts(user_id): ...
256
257
            Note: Arbitrary arguments (e.g., configuration flags) are currently NOT supported.
258
259
        Returns:
260
            None
261
        """
262
263
        def register_node(f: Items):
264
            node_type = type_ or f.__name__
265
            self._validate_type(node_type)
266
267
            model_type = f.__name__ if callable(node_type) else node_type
268
269
            def node_generator(**kwargs: Any) -> Iterable[Node]:
270
                yield from elements(f(**kwargs), node_type, key=key, value=value)
271
272
            parameters = inspect.getfullargspec(f).args
273
            node_model = NodeModel(type=model_type,
274
                                   parent_type=parent_type,
275
                                   parameters=set(parameters),
276
                                   label=label,
277
                                   uniqueness=unique,
278
                                   multiplicity=multiplicity,
279
                                   generator=node_generator)
280
            self._node_models[node_model.absolute_id].append(node_model)
281
            self._node_children[parent_type].append(model_type)
282
283
            self._validate_node_parameters(parameters)
284
285
        return register_node
286
287
    def edge(self,
288
             type_: Extractor | None = None,
289
             source: Extractor = 'source',
290
             target: Extractor = 'target',
291
             label: Extractor | None = str,
292
             value: Extractor | None = None,
293
             weight: Union[float, Callable[[Any], float]] = 1.0,
294
             ) -> Callable[[Items], None]:
295
        """Decorator to Register a generator of edge payloads as a source of Graph Edges.
296
         It creates an Edge generator function.
297
298
        Args:
299
            type_: Optional source for the Edge Type. Defaults to use Generator function
300
                   name as the Edge Type.
301
            source: Source for edge source Node ID.
302
            target: Source for edge target Node ID.
303
            label: Source for edge label.
304
            value: Source for edge value.
305
            weight: Source for edge weight.
306
307
        Returns:
308
            None.
309
        """
310
311
        def register_edge(f: Items):
312
            edge_type = type_ or f.__name__
313
            self._validate_type(edge_type)
314
315
            model_type = f.__name__ if callable(edge_type) else edge_type
316
317
            getters = {
318
                'source': source,
319
                'target': target,
320
                'label': label,
321
                'type': edge_type,
322
                'value': value,
323
                'weight': weight
324
            }
325
326
            def edge_generator(**kwargs: Any) -> Iterable[Edge]:
327
                yield from elements(f(**kwargs), edge_type, **getters)
328
329
            self._edge_generators[model_type].append(edge_generator)
330
331
        return register_edge
332
333
    def rectify(self, _type: Extractor | None = None,
334
                parent_type: str | None = UniverseNode,
335
                key: Extractor | None = None,
336
                value: Extractor | None = None,
337
                label: Extractor | None = None):
338
        """
339
        Rectify the model.
340
        Add a default NodeModel in case of having just edge supplier/s and no node supplier/s.
341
342
       Args:
343
           _type
344
           parent_type
345
           key
346
           value
347
           label
348
349
       Returns:
350
           None
351
        """
352
        if self._edge_generators and not self._node_models:
353
            @self.node(
354
                type_=_type or 'node',
355
                parent_type=parent_type or 'node',
356
                unique=True,
357
                key=key,
358
                value=value,
359
                label=label or str
360
            )
361
            def node():  # pragma: no cover
362
                return
363
                yield
364
365
366
def model(name: str):
367
    """
368
    Create a graph model
369
370
    Args:
371
        name: model name
372
373
    Returns:
374
        GraphModel
375
    """
376
    return GraphModel(name=name)
377
378
379
__all__ = ('GraphModel', 'Multiplicity', 'model')
380