1
|
|
|
import importlib |
2
|
|
|
import json |
3
|
|
|
from pathlib import Path |
4
|
|
|
from typing import List, Dict, Type |
5
|
|
|
|
6
|
|
|
import jsonschema |
7
|
|
|
from jsonschema import ValidationError |
8
|
|
|
|
9
|
|
|
from decision_engine.engine import Engine |
10
|
|
|
from decision_engine.rules import Rule |
11
|
|
|
from decision_engine.sources import Source |
12
|
|
|
|
13
|
|
|
|
14
|
|
|
param_types_table = { |
15
|
|
|
'boolean': bool, |
16
|
|
|
'float': float, |
17
|
|
|
'integer': int, |
18
|
|
|
'string': str, |
19
|
|
|
'source': str |
20
|
|
|
} |
21
|
|
|
|
22
|
|
|
|
23
|
|
|
def load_json_file(file: str) -> dict: |
24
|
|
|
with (open(file)) as fp: |
25
|
|
|
contents = json.load(fp) |
26
|
|
|
return contents |
27
|
|
|
|
28
|
|
|
|
29
|
|
|
def validate(definition: dict, schema: dict): |
30
|
|
|
jsonschema.validate(instance=definition, schema=schema) |
31
|
|
|
|
32
|
|
|
|
33
|
|
|
def _check_param_type(param: dict): |
34
|
|
|
param_type = param_types_table[param['type']] |
35
|
|
|
if not isinstance(param['value'], param_type): |
36
|
|
|
msg = f"Parameter declared with type {param['type']}" \ |
37
|
|
|
f"(Python type {param_type}) " \ |
38
|
|
|
f"but value is of type {type(param['value']).__name__}." |
39
|
|
|
raise ValidationError(msg) |
40
|
|
|
|
41
|
|
|
|
42
|
|
|
def _check_source_param_exists(param: dict, sources: dict): |
43
|
|
|
if param['value'] not in sources.keys(): |
44
|
|
|
msg = f'Parameter declared as source but specified source ' \ |
45
|
|
|
f'{param["value"]} has not been parsed yet. ' \ |
46
|
|
|
f'Please rectify your definition file.' |
47
|
|
|
raise ValidationError(msg) |
48
|
|
|
|
49
|
|
|
|
50
|
|
|
def _parse_params(params: dict, parsed_sources: List[Source]) -> list: |
51
|
|
|
result = [] |
52
|
|
|
sources_dict = {source.name: source for source in parsed_sources} |
53
|
|
|
for param in params: |
54
|
|
|
_check_param_type(param) |
55
|
|
|
|
56
|
|
|
if param['type'] == 'source': |
57
|
|
|
_check_source_param_exists(param, sources_dict) |
58
|
|
|
param['value'] = sources_dict[param['value']] |
59
|
|
|
|
60
|
|
|
result.append(param['value']) |
61
|
|
|
|
62
|
|
|
return result |
63
|
|
|
|
64
|
|
|
|
65
|
|
|
def parse_sources(sources: List[dict]) -> List[Source]: |
66
|
|
|
final_sources: List[Source] = [] |
67
|
|
|
module = importlib.import_module('decision_engine.sources') |
68
|
|
|
for source in sources: |
69
|
|
|
class_name = source['class'] |
70
|
|
|
class_ = getattr(module, class_name) |
71
|
|
|
params = _parse_params(source['params'], final_sources) |
72
|
|
|
instance = class_(*params, source['name']) |
73
|
|
|
final_sources.append(instance) |
74
|
|
|
|
75
|
|
|
return final_sources |
76
|
|
|
|
77
|
|
|
|
78
|
|
|
def parse_rules(rules: List[dict], sources: List[Source]) -> List[Rule]: |
79
|
|
|
final_rules = [] |
80
|
|
|
rules_module = importlib.import_module('decision_engine.rules') |
81
|
|
|
comparisons_module = importlib.import_module('decision_engine.comparisons') |
82
|
|
|
for rule in rules: |
83
|
|
|
rules_class = getattr(rules_module, rule['class']) |
84
|
|
|
comparison_class = getattr(comparisons_module, rule['comparison']) |
85
|
|
|
|
86
|
|
|
# Create a new list containing only the sources named in the rule. |
87
|
|
|
rule_sources = [ |
88
|
|
|
source for source in sources |
89
|
|
|
if source.name in rule['sources'] |
90
|
|
|
] |
91
|
|
|
|
92
|
|
|
instance = rules_class(*rule_sources, comparison_class(), rule['name']) |
93
|
|
|
final_rules.append(instance) |
94
|
|
|
|
95
|
|
|
return final_rules |
96
|
|
|
|
97
|
|
|
|
98
|
|
|
def parse_engines(engines: List[dict], rules: List[Rule]) -> List[Engine]: |
99
|
|
|
final_engines = [] |
100
|
|
|
for engine in engines: |
101
|
|
|
# Create a new list containing only the rules named in the engine. |
102
|
|
|
engine_rules = [rule for rule in rules if rule.name in engine['rules']] |
103
|
|
|
|
104
|
|
|
instance = Engine(engine_rules, engine['name']) |
105
|
|
|
final_engines.append(instance) |
106
|
|
|
|
107
|
|
|
return final_engines |
108
|
|
|
|
109
|
|
|
|
110
|
|
|
def parse_json(definition: dict) -> dict: |
111
|
|
|
schema_file = 'schema.json' |
112
|
|
|
schema_path = (Path(__file__).parents[0] / schema_file).absolute() |
113
|
|
|
schema = load_json_file(str(schema_path)) |
114
|
|
|
validate(definition, schema) |
115
|
|
|
|
116
|
|
|
sources = parse_sources(definition['sources']) |
117
|
|
|
rules = parse_rules(definition['rules'], sources) |
118
|
|
|
engines = parse_engines(definition['engines'], rules) |
119
|
|
|
|
120
|
|
|
return {'sources': sources, 'rules': rules, 'engines': engines} |
121
|
|
|
|
122
|
|
|
|
123
|
|
|
def parse_json_file(file: str) -> dict: |
124
|
|
|
definition = load_json_file(file) |
125
|
|
|
return parse_json(definition) |
126
|
|
|
|