networkx_query.parser   A
last analyzed

Complexity

Total Complexity 17

Size/Duplication

Total Lines 86
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
wmc 17
eloc 48
dl 0
loc 86
rs 10
c 0
b 0
f 0

5 Functions

Rating   Name   Duplication   Size   Complexity  
A _check_item_ast() 0 7 2
C parse() 0 24 9
A compile_ast() 0 7 3
A explain() 0 8 2
A prepare_query() 0 14 1
1
"""Main parser and compile function."""
2
from collections import deque
3
from typing import Dict, List, Optional
4
5
from .definition import (
6
    NETWORKX_OPERATORS_REGISTERY,
7
    Evaluator,
8
    ItemAST,
9
    OperatoryArity,
10
    ParserException,
11
    operator_factory,
12
)
13
from .operator import *  # noqa: F401,F403
14
15
__all__ = ["parse", "explain", "compile_ast", "prepare_query"]
16
17
18
def _check_item_ast(item: ItemAST, stack: deque) -> ItemAST:
19
    (compliant, delta) = item.check_arity()
20
    if not compliant:
21
        raise ParserException(f'Invalid parameters for "{item.op.name}" operator ({delta} #parameters)', stack)
22
    # if not item.check_profile():
23
    #    raise ParserException(f'Invalid type parameters for "{item.op.name}" operator', stack)
24
    return item
25
26
27
def parse(expra: Dict, stack: Optional[deque] = None) -> ItemAST:
28
    """Tranform json query into Item AST."""
29
    result = []
30
    _stack = stack if stack else deque()
31
32
    _stack.append(expra)
33
    for (op, v) in expra.items():
34
        if op not in NETWORKX_OPERATORS_REGISTERY:
35
            raise ParserException(f'Unsupported "{op}" operator', _stack)
36
        operator = NETWORKX_OPERATORS_REGISTERY[op]
37
38
        if operator.combinator:
39
            args = []
40
            # shortcut List declaration as single item
41
            items = v if isinstance(v, List) else [v]
42
            for item in items:
43
                args.append(parse(item, _stack))
44
            result.append(_check_item_ast(ItemAST(op=operator, args=args), _stack))
45
46
        else:
47
            result.append(_check_item_ast(ItemAST(op=operator, args=[v] if not isinstance(v, List) else v), _stack))
48
49
    _stack.pop()
50
    return result[0] if len(result) == 1 else ItemAST(op=NETWORKX_OPERATORS_REGISTERY['and'], args=result)
51
52
53
def explain(ast: ItemAST) -> Dict:
54
    """Convert ast as dict."""
55
    result = {}
56
    if ast.op.combinator:
57
        result[ast.op.name] = list(map(explain, ast.args))
58
    else:
59
        result[ast.op.name] = ast.args
60
    return result
61
62
63
def compile_ast(ast: ItemAST) -> Evaluator:
64
    """Compile AST in an Evaluator function."""
65
    if ast.op.arity == OperatoryArity.UNARY:  # pragma: no cover
66
        return operator_factory(ast.op.function)
67
    if ast.op.combinator:
68
        return operator_factory(ast.op.function, *list(map(compile_ast, ast.args)))
69
    return operator_factory(ast.op.function, *ast.args)
70
71
72
def prepare_query(query: Dict) -> Evaluator:
73
    """Transform expression query as a function.
74
75
    Arguments:
76
        query (Dict): expression query as dictionary
77
78
    Returns:
79
        (Evaluator): evaluator function
80
81
    Exceptions:
82
        (ParserException): if a parse error occurs
83
84
    """
85
    return compile_ast(parse(expra=query))
86