|
1
|
|
|
import importlib |
|
2
|
|
|
import json |
|
3
|
|
|
from pathlib import Path |
|
4
|
|
|
from typing import List |
|
5
|
|
|
|
|
6
|
|
|
import jsonschema |
|
7
|
|
|
from jsonschema import ValidationError |
|
8
|
|
|
|
|
9
|
|
|
from decision_engine.engine import Engine |
|
10
|
|
|
from decision_engine.rules import Rule |
|
11
|
|
|
from decision_engine.sources import Source |
|
12
|
|
|
|
|
13
|
|
|
# Lookup from the type name declared on a parameter in a definition file to
# the Python type its value is checked against with isinstance().
# A 'source' parameter is checked as a string here; the parsing functions
# later use that string as the name of an already parsed source.
param_types_table = dict(
    boolean=bool,
    float=float,
    integer=int,
    string=str,
    source=str,
)
|
20
|
|
|
|
|
21
|
|
|
|
|
22
|
|
|
def load_json_file(file: str) -> dict:
    """Read a JSON document from disk and return the decoded contents.

    Args:
        file: path of the JSON file to read.

    Returns:
        Whatever ``json.load`` decodes from the file.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the file does not contain valid JSON.
    """
    # Decode explicitly as UTF-8 rather than with the locale's default
    # encoding, so non-ASCII content loads the same on every platform.
    with open(file, encoding='utf-8') as fp:
        return json.load(fp)
|
26
|
|
|
|
|
27
|
|
|
|
|
28
|
|
|
def validate(definition: dict, schema: dict):
    """Check a definition against a JSON schema.

    Thin wrapper around ``jsonschema.validate``: nothing is returned, and
    whatever that call raises propagates to the caller unchanged.

    Args:
        definition: the instance to validate.
        schema: the JSON schema to validate it against.
    """
    jsonschema.validate(definition, schema)
|
30
|
|
|
|
|
31
|
|
|
|
|
32
|
|
|
def _check_param_type(param: dict):
    """Verify that a parameter's value has the Python type it declares.

    Args:
        param: a parameter definition; its 'type' entry is looked up in
            ``param_types_table`` and its 'value' entry is checked against
            the resulting Python type.

    Raises:
        ValidationError: if the value is not of the declared type.
        KeyError: if 'type' or 'value' is missing, or the declared type is
            not a key of ``param_types_table``.
    """
    declared = param['type']
    expected = param_types_table[declared]
    value = param['value']

    # bool is a subclass of int, so isinstance(True, int) is True. A JSON
    # true/false must only be accepted when the declared type maps to bool.
    bool_passed_as_other = isinstance(value, bool) and expected is not bool

    if bool_passed_as_other or not isinstance(value, expected):
        msg = f"Parameter declared with type {declared} " \
              f"(Python type {expected}) " \
              f"but value is of type {type(value).__name__}."
        raise ValidationError(msg)
|
39
|
|
|
|
|
40
|
|
|
|
|
41
|
|
|
def _check_source_param_exists(param: dict, sources: dict): |
|
42
|
|
|
if param['value'] not in sources.keys(): |
|
43
|
|
|
msg = f'Parameter declared as source but specified source ' \ |
|
44
|
|
|
f'{param["value"]} has not been parsed yet. ' \ |
|
45
|
|
|
f'Please rectify your definition file.' |
|
46
|
|
|
raise ValidationError(msg) |
|
47
|
|
|
|
|
48
|
|
|
|
|
49
|
|
|
def _parse_source_params(params: List[dict],
                         parsed_sources: List[Source]) -> list:
    """Turn a source's parameter definitions into constructor arguments.

    Every parameter is type-checked with ``_check_param_type``. A parameter
    of type 'source' is then replaced by the already parsed source whose
    ``name`` equals the parameter's value; any other parameter contributes
    its value unchanged.

    The parameter definitions are only read, never modified, so the same
    definition can be parsed more than once.

    Args:
        params: parameter definitions, each with 'type' and 'value' entries.
        parsed_sources: the sources instantiated so far.

    Returns:
        The resolved values, in the order the parameters were given.

    Raises:
        ValidationError: on a type mismatch or a reference to a source that
            is not in ``parsed_sources``.
    """
    sources_by_name = {source.name: source for source in parsed_sources}

    result = []
    for param in params:
        _check_param_type(param)

        value = param['value']
        if param['type'] == 'source':
            _check_source_param_exists(param, sources_by_name)
            # Resolve into a local instead of writing the object back into
            # the caller's definition.
            value = sources_by_name[value]

        result.append(value)

    return result
|
62
|
|
|
|
|
63
|
|
|
|
|
64
|
|
|
def parse_sources(sources: List[dict]) -> List[Source]:
    """Instantiate every source definition, in the order given.

    Each definition's 'class' is looked up by name in the
    ``decision_engine.sources`` module and called with the parsed 'params'
    followed by the definition's 'name'. The sources built so far are
    passed to ``_parse_source_params`` when parsing each later definition.

    Args:
        sources: source definitions with 'class', 'params' and 'name'.

    Returns:
        The instantiated sources, in definition order.
    """
    sources_module = importlib.import_module('decision_engine.sources')

    built: List[Source] = []
    for spec in sources:
        source_cls = getattr(sources_module, spec['class'])
        args = _parse_source_params(spec['params'], built)
        built.append(source_cls(*args, spec['name']))

    return built
|
75
|
|
|
|
|
76
|
|
|
|
|
77
|
|
|
def _check_rule_param_exists(param: dict, rules: dict): |
|
78
|
|
|
if param['value'] not in rules.keys(): |
|
79
|
|
|
msg = f'Parameter declared as rule but specified rule ' \ |
|
80
|
|
|
f'{param["value"]} has not been parsed yet. ' \ |
|
81
|
|
|
f'Please rectify your definition file.' |
|
82
|
|
|
raise ValidationError(msg) |
|
83
|
|
|
|
|
84
|
|
|
|
|
85
|
|
|
def _parse_rule_params(params: List[dict], parsed_rules: List[Rule],
                       sources: List[Source]) -> list:
    """Turn a rule's parameter definitions into constructor arguments.

    A parameter of type 'source' is replaced by the source whose ``name``
    equals its value, and a parameter of type 'rule' by the already parsed
    rule of that name. Any other parameter contributes its value unchanged.

    The parameter definitions are only read, never modified, so the same
    definition can be parsed more than once.

    Args:
        params: parameter definitions, each with 'type' and 'value' entries.
        parsed_rules: the rules instantiated so far.
        sources: the available sources.

    Returns:
        The resolved values, in the order the parameters were given.

    Raises:
        ValidationError: if a referenced source or rule is unknown.
    """
    rules_by_name = {rule.name: rule for rule in parsed_rules}
    sources_by_name = {source.name: source for source in sources}

    result = []
    for param in params:
        kind = param['type']
        value = param['value']

        # Resolve references into a local instead of writing the object
        # back into the caller's definition.
        if kind == 'source':
            _check_source_param_exists(param, sources_by_name)
            value = sources_by_name[value]
        elif kind == 'rule':
            _check_rule_param_exists(param, rules_by_name)
            value = rules_by_name[value]

        result.append(value)

    return result
|
101
|
|
|
|
|
102
|
|
|
|
|
103
|
|
|
def parse_rules(rules: List[dict], sources: List[Source]) -> List[Rule]:
    """Instantiate every rule definition, in the order given.

    Each definition's 'class' is looked up in ``decision_engine.rules`` and
    its 'comparison' in ``decision_engine.comparisons``. The rules built so
    far are passed to ``_parse_rule_params`` when parsing each later
    definition.

    Args:
        rules: rule definitions with 'class', 'comparison', 'params' and
            'name' entries.
        sources: the already instantiated sources.

    Returns:
        The instantiated rules, in definition order.
    """
    rule_classes = importlib.import_module('decision_engine.rules')
    comparison_classes = importlib.import_module('decision_engine.comparisons')

    parsed: List[Rule] = []
    for spec in rules:
        rule_cls = getattr(rule_classes, spec['class'])
        comparison_cls = getattr(comparison_classes, spec['comparison'])

        args = _parse_rule_params(spec['params'], parsed, sources)

        if 'Boolean' in spec['class']:
            # Per the original author's note, Boolean rules take a single
            # list of rules. Nesting the arguments one level lets the
            # starred call below pass that list as one positional argument.
            args = [args]
        else:
            # Every other rule receives a comparison instance after its
            # own parameters.
            args.append(comparison_cls())

        parsed.append(rule_cls(*args, spec['name']))

    return parsed
|
126
|
|
|
|
|
127
|
|
|
|
|
128
|
|
|
def parse_engines(engines: List[dict], rules: List[Rule]) -> List[Engine]:
    """Build one Engine per engine definition.

    Args:
        engines: engine definitions; each is read for its 'name' and for
            'rules', the names of the rules the engine should contain.
        rules: the already instantiated rules to pick from.

    Returns:
        The Engine instances, in definition order. Each engine receives its
        rules in the order they appear in ``rules``.

    Raises:
        ValidationError: if an engine names a rule that is not in ``rules``.
    """
    known_names = {rule.name for rule in rules}

    final_engines = []
    for engine in engines:
        # A name that matches no parsed rule used to be dropped silently,
        # producing an engine with fewer rules than its definition asks
        # for. Report it, as the source and rule reference checks do.
        unknown = [name for name in engine['rules']
                   if name not in known_names]
        if unknown:
            listed = ', '.join(str(name) for name in unknown)
            msg = f'Engine {engine["name"]} refers to rule(s) {listed} ' \
                  f'which have not been parsed. ' \
                  f'Please rectify your definition file.'
            raise ValidationError(msg)

        # Keep only the rules named in the engine, in parsed-rule order.
        engine_rules = [rule for rule in rules if rule.name in engine['rules']]
        final_engines.append(Engine(engine_rules, engine['name']))

    return final_engines
|
138
|
|
|
|
|
139
|
|
|
|
|
140
|
|
|
def parse_json(definition: dict) -> dict:
    """Validate a definition and build the objects it describes.

    The definition is validated against the ``schema.json`` file located in
    the same directory as this module. Sources are parsed first, then rules
    (which are given the sources), then engines (which are given the rules).

    Args:
        definition: decoded definition with 'sources', 'rules' and
            'engines' entries.

    Returns:
        A dict with keys 'sources', 'rules' and 'engines' holding the
        instantiated objects.
    """
    module_dir = Path(__file__).parent
    schema_location = module_dir.joinpath('schema.json').absolute()
    validate(definition, load_json_file(str(schema_location)))

    parsed = {}
    parsed['sources'] = parse_sources(definition['sources'])
    parsed['rules'] = parse_rules(definition['rules'], parsed['sources'])
    parsed['engines'] = parse_engines(definition['engines'], parsed['rules'])
    return parsed
|
151
|
|
|
|
|
152
|
|
|
|
|
153
|
|
|
def parse_json_file(file: str) -> dict:
    """Load a definition from a JSON file and parse it.

    Args:
        file: path of the JSON definition file.

    Returns:
        The result of ``parse_json`` for the file's decoded contents.
    """
    return parse_json(load_json_file(file))
|
156
|
|
|
|