Passed
Push — main ( 01dfe3...c0672b )
by Eran
01:40
created

graphinate.modeling.extractor()   A

Complexity

Conditions 5

Size

Total Lines 20
Code Lines 8

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 5
eloc 8
nop 2
dl 0
loc 20
rs 9.3333
c 0
b 0
f 0
1
import inspect
2
import itertools
3
from collections import defaultdict, namedtuple
4
from collections.abc import Callable, Iterable, Mapping
5
from dataclasses import dataclass
6
from functools import lru_cache
7
from types import MappingProxyType
8
from typing import Any, Union
9
10
from .enums import Multiplicity
11
from .typing import Edge, Element, Extractor, Items, Node, NodeTypeAbsoluteId, UniverseNode
12
13
14
class GraphModelError(Exception):
    """Error raised when a GraphModel registration breaks the modeling rules.

    It is raised, for example, when a node generator function declares
    arguments that do not follow the '<node_type>_id' naming convention.
    """
16
17
18
@lru_cache(maxsize=128)
def _get_namedtuple_element_class(type_name: str, field_names: tuple[str] | str) -> type[Element]:
    """Build (and memoize) the namedtuple class used for one kind of element.

    Both arguments must be hashable because they form the cache key; the
    same (type_name, field_names) pair therefore always yields the very same
    class object while it stays in the cache.

    Args:
        type_name: name given to the generated namedtuple class.
        field_names: field names, as a tuple of names or a single string
                     accepted by ``collections.namedtuple``.

    Returns:
        The namedtuple class for the requested type name and fields.
    """
    element_class = namedtuple(type_name, field_names)
    return element_class
21
22
23
def element(element_type: str | None, field_names: Iterable[str] | str | None = None) -> Callable[[], Element]:
    """Graph Element Supplier Callable

    Args:
        element_type: name of the element type. When falsy, plain ``tuple`` is returned.
        field_names: names of the element fields, either an iterable of names or a
                     single string as accepted by ``collections.namedtuple``.
                     When ``None`` or empty, plain ``tuple`` is returned.

    Returns:
        Element Supplier Callable: a namedtuple class when both a type name and
        field names are given, otherwise the builtin ``tuple``.
    """

    # The declared default must be usable: without field names there is nothing
    # to build a namedtuple from (previously this crashed in ``tuple(None)``).
    if field_names is None:
        return tuple

    # Normalise to a hashable tuple so the cached factory can use it as a key.
    if not isinstance(field_names, str):
        field_names = tuple(field_names)

    if not (element_type and field_names):
        return tuple

    return _get_namedtuple_element_class(element_type, field_names)
38
39
40
def extractor(obj: Any, key: Extractor | None = None) -> str | None:
    """Resolve a single data item out of an element payload.

    The ``key`` is interpreted by the first rule that matches:

    1. ``None``: the payload itself is returned.
    2. a callable: the result of ``key(obj)`` is returned.
    3. a string while the payload is a Mapping: ``obj[key]`` is returned when
       present, otherwise the key string itself.
    4. anything else: ``key`` is returned unchanged (it acts as a constant).

    Args:
        obj: the element payload.
        key: how to obtain the data item from the payload.

    Returns:
        The extracted data item.
    """
    if key is None:
        return obj

    if callable(key):
        return key(obj)

    is_mapping_lookup = isinstance(key, str) and isinstance(obj, Mapping)
    if is_mapping_lookup:
        # A missing key falls back to the key string itself.
        return obj.get(key, key)

    return key
60
61
62
def elements(iterable: Iterable[Any],
             element_type: Extractor | None = None,
             **getters: Extractor) -> Iterable[Element]:
    """Abstract Generator of Graph elements (nodes or edges)

    Every payload item is turned into one element whose fields are named
    after the ``getters`` keywords and filled through ``extractor``.

    Because this is a generator function, nothing (including the validation
    of a static type) runs until the result is first iterated.

    Args:
        iterable: source of payload
        element_type: source of the element type. A callable is invoked with each
                      payload item to obtain that item's type name; any other
                      value is used as the type name of every element.
        getters: Extractor node field sources

    Returns:
        Iterable of Elements.

    Raises:
        ValueError: when a type name is not a valid Python identifier.
        AttributeError: when a non-callable type has no ``isidentifier``
                        method (e.g. ``None``).
    """

    def _factory_for(type_name):
        # The type name is used as a class name, so it must be an identifier.
        if not type_name.isidentifier():
            raise ValueError(f"Invalid Type: {type_name}. Must be a valid Python identifier.")
        return element(type_name, getters.keys())

    def _build(factory, payload):
        fields = {name: extractor(payload, getter) for name, getter in getters.items()}
        return factory(**fields)

    if callable(element_type):
        # Dynamic type: resolved and validated separately for each payload item.
        for payload in iterable:
            yield _build(_factory_for(element_type(payload)), payload)
        return

    # Static type: validated once, before the first payload item is consumed.
    fixed_factory = _factory_for(element_type)
    for payload in iterable:
        yield _build(fixed_factory, payload)
99
100
101
@dataclass
class NodeModel:
    """Represents a Node Model

    Args:
        type: the type of the Node.
        parent_type: the type of the node's parent. Defaults to UniverseNode.
        parameters: parameters of the Node. Defaults to None.
        label: label source. Defaults to None.
        uniqueness: is the Node universally unique. Defaults to True.
        multiplicity: Multiplicity of the Node. Defaults to ALL.
        generator: Nodes generator method. Defaults to None.

    Properties:
        absolute_id: return the NodeModel absolute_id.
    """

    type: str
    parent_type: str | UniverseNode | None = UniverseNode
    parameters: set[str] | None = None
    # Optional: the default is None, so the annotation must allow it.
    label: Callable[[Any], str | None] | None = None
    uniqueness: bool = True
    multiplicity: Multiplicity = Multiplicity.ALL
    # GraphModel.node stores a generator that accepts keyword arguments,
    # hence the open-ended parameter list.
    generator: Callable[..., Iterable[Node]] | None = None

    @property
    def absolute_id(self) -> NodeTypeAbsoluteId:
        """The (parent_type, type) pair identifying this node model."""
        return self.parent_type, self.type
129
130
131
class GraphModel:
    """A Graph Model

    Used to declaratively register Edge and/or Node data supplier functions by using
    decorators.

    Args:
        name: the archetype name for Graphs generated based on the GraphModel.
    """

    def __init__(self, name: str):
        self.name: str = name
        self._node_models: dict[NodeTypeAbsoluteId, list[NodeModel]] = defaultdict(list)
        # Maps a parent node type to the type names registered under it.
        self._node_children: dict[str, list[str]] = defaultdict(list)
        self._edge_generators: dict[str, list[Callable[[], Iterable[Edge]]]] = defaultdict(list)
        self._networkx_graph = None

    def __add__(self, other: 'GraphModel') -> 'GraphModel':
        """Combine two models into a new GraphModel.

        The new model is named '<self.name> + <other.name>' and holds the node
        models, node children and edge generators of both operands (self first).
        Neither operand is modified.

        Args:
            other: the GraphModel to combine with.

        Returns:
            A new GraphModel, or NotImplemented when other is not a GraphModel.
        """
        if not isinstance(other, GraphModel):
            # Let Python try the reflected operation / raise TypeError.
            return NotImplemented

        graph_model = GraphModel(name=f"{self.name} + {other.name}")
        for m in (self, other):
            for k, v in m._node_models.items():
                graph_model._node_models[k].extend(v)

            for k, v in m._node_children.items():
                graph_model._node_children[k].extend(v)

            for k, v in m._edge_generators.items():
                graph_model._edge_generators[k].extend(v)

        return graph_model

    @property
    def node_models(self) -> Mapping[NodeTypeAbsoluteId, list[NodeModel]]:
        """
        Returns:
            NodeModel for Node Types. Key values are NodeTypeAbsoluteId.
        """
        return MappingProxyType(self._node_models)

    @property
    def edge_generators(self) -> Mapping[str, list[Callable[[], Iterable[Edge]]]]:
        """
        Returns:
            Edge generator functions for Edge Types
        """
        return MappingProxyType(self._edge_generators)

    @property
    def node_types(self) -> set[str]:
        """
        Returns:
            Node Types
        """
        return {v.type for v in itertools.chain.from_iterable(self._node_models.values())}

    def node_children_types(self, _type: str = UniverseNode) -> Mapping[str, list[str]]:
        """Children Node Types for given input Node Type

        Args:
            _type:  Node Type. Default value is UNIVERSE_NODE.

        Returns:
            Read-only mapping with a single entry, from the given Node Type to
            the list of its children Node Types. The mapping is empty when
            nothing is registered under the given Node Type.
        """
        children = self._node_children
        # Membership test first: indexing a defaultdict would insert the key.
        return MappingProxyType({_type: children[_type]} if _type in children else {})

    @staticmethod
    def _validate_type(node_type: str):
        """Ensure a static (non-callable) type is a valid Python identifier.

        Callable types are resolved per item later, so they are not checked here.

        Raises:
            ValueError: when the type is not callable and not an identifier.
        """
        if not callable(node_type) and not node_type.isidentifier():
            raise ValueError(f"Invalid Type: {node_type}. Must be a valid Python identifier.")

    def _validate_node_parameters(self, parameters: list[str], node_types: set[str] | None = None):
        """Ensure generator argument names follow the '<node_type>_id' convention.

        Args:
            parameters: argument names of the node generator function.
            node_types: node types the arguments may refer to. Defaults to
                        the node types currently registered in this model.

        Raises:
            GraphModelError: when any argument breaks the convention.
        """
        known_types = self.node_types if node_types is None else node_types
        if not all(p.endswith('_id') and p == p.lower() and p[:-3] in known_types for p in parameters):
            msg = ("Illegal Arguments. Argument should conform to the following rules: "
                   "1) lowercase "
                   "2) end with '_id' "
                   "3) start with value that exists as registered node type")

            raise GraphModelError(msg)

    def node(self,
             type_: Extractor | None = None,
             parent_type: str | None = UniverseNode,
             key: Extractor | None = None,
             value: Extractor | None = None,
             label: Extractor | None = None,
             unique: bool = True,
             multiplicity: Multiplicity = Multiplicity.ALL) -> Callable[[Items], None]:
        """Decorator to Register a Generator of node payloads as a source for Graph Nodes.
        It creates a NodeModel object.

        Args:
            type_: Optional source for the Node Type. Defaults to use Generator function
                   name as the Node Type.
            parent_type: Optional parent Node Type. Defaults to UNIVERSE_NODE

            key: Optional source for Node IDs. Defaults to use the complete Node payload
                 as Node ID.
            value: Optional source for Node value field. Defaults to use the complete
                   Node payload as Node value.
            label: Optional source for Node label field. Stored on the NodeModel as given.
            unique: is the Node universally unique. Defaults to True.
            multiplicity: Multiplicity of the Node. Defaults to ALL.

        Generator Function Signature:
            The decorated generator function may accept arguments to receive context from parent nodes.
            These arguments MUST conform to the following strict naming convention:
            1. The argument name must be lowercase.
            2. The argument name must end with '_id'.
            3. The prefix (before '_id') must match an existing registered Node Type.

            Example:
                If you have a parent node type 'user', your child node generator can accept 'user_id'.

                @model.node(parent_type='user')
                def get_posts(user_id): ...

            Note: Arbitrary arguments (e.g., configuration flags) are currently NOT supported.

        Returns:
            The registering decorator. The decorator itself returns None, so the
            decorated name is bound to None after registration.

        Raises:
            ValueError: (on decoration) when a static type is not a valid identifier.
            GraphModelError: (on decoration) when the generator arguments are illegal.
        """

        def register_node(f: Items):
            node_type = type_ or f.__name__
            self._validate_type(node_type)

            # A callable type is resolved per item; the model is then filed
            # under the generator function name.
            model_type = f.__name__ if callable(node_type) else node_type

            # Validate before touching the model so a rejected generator leaves
            # nothing registered. The type being registered counts as known,
            # which keeps the accepted argument names unchanged.
            parameters = inspect.getfullargspec(f).args
            self._validate_node_parameters(parameters, self.node_types | {model_type})

            def node_generator(**kwargs: Any) -> Iterable[Node]:
                yield from elements(f(**kwargs), node_type, key=key, value=value)

            node_model = NodeModel(type=model_type,
                                   parent_type=parent_type,
                                   parameters=set(parameters),
                                   label=label,
                                   uniqueness=unique,
                                   multiplicity=multiplicity,
                                   generator=node_generator)
            self._node_models[node_model.absolute_id].append(node_model)
            self._node_children[parent_type].append(model_type)

        return register_node

    def edge(self,
             type_: Extractor | None = None,
             source: Extractor = 'source',
             target: Extractor = 'target',
             label: Extractor | None = str,
             value: Extractor | None = None,
             weight: Union[float, Callable[[Any], float]] = 1.0,
             ) -> Callable[[Items], None]:
        """Decorator to Register a generator of edge payloads as a source of Graph Edges.
         It creates an Edge generator function.

        Args:
            type_: Optional source for the Edge Type. Defaults to use Generator function
                   name as the Edge Type.
            source: Source for edge source Node ID.
            target: Source for edge target Node ID.
            label: Source for edge label.
            value: Source for edge value.
            weight: Source for edge weight.

        Returns:
            The registering decorator. The decorator itself returns None, so the
            decorated name is bound to None after registration.

        Raises:
            ValueError: (on decoration) when a static type is not a valid identifier.
        """

        def register_edge(f: Items):
            edge_type = type_ or f.__name__
            self._validate_type(edge_type)

            # A callable type is resolved per item; the generator is then filed
            # under the generator function name.
            model_type = f.__name__ if callable(edge_type) else edge_type

            getters = {
                'source': source,
                'target': target,
                'label': label,
                'type': edge_type,
                'value': value,
                'weight': weight
            }

            def edge_generator(**kwargs: Any) -> Iterable[Edge]:
                yield from elements(f(**kwargs), edge_type, **getters)

            self._edge_generators[model_type].append(edge_generator)

        return register_edge

    def rectify(self, _type: Extractor | None = None,
                parent_type: str | None = UniverseNode,
                key: Extractor | None = None,
                value: Extractor | None = None,
                label: Extractor | None = None):
        """
        Rectify the model.
        Add a default NodeModel in case of having just edge supplier/s and no node supplier/s.
        Does nothing otherwise.

        Args:
            _type: source for the default Node Type. Falls back to 'node' when falsy.
            parent_type: parent Node Type. Falls back to 'node' when falsy.
            key: source for Node IDs.
            value: source for Node value field.
            label: source for Node label field. Falls back to str when falsy.

        Returns:
            None
        """
        if self._edge_generators and not self._node_models:
            @self.node(
                type_=_type or 'node',
                parent_type=parent_type or 'node',
                unique=True,
                key=key,
                value=value,
                label=label or str
            )
            def node():  # pragma: no cover
                # Empty generator: the unreachable yield makes this a generator function.
                return
                yield
357
358
359
def model(name: str):
    """
    Convenience factory for a new, empty graph model.

    Args:
        name: the name given to the created model

    Returns:
        GraphModel
    """
    graph_model = GraphModel(name=name)
    return graph_model
370
371
372
# Names exported by a star-import of this module.
__all__ = ('GraphModel', 'model', 'elements')
373