1
|
|
|
import importlib |
2
|
|
|
import json |
3
|
|
|
from pathlib import Path |
4
|
|
|
from typing import List |
5
|
|
|
|
6
|
|
|
import jsonschema |
7
|
|
|
from jsonschema import ValidationError |
8
|
|
|
|
9
|
|
|
from decision_engine.engine import Engine |
10
|
|
|
from decision_engine.rules import Rule |
11
|
|
|
from decision_engine.sources import Source |
12
|
|
|
|
13
|
|
|
param_types_table = { |
14
|
|
|
'boolean': bool, |
15
|
|
|
'float': float, |
16
|
|
|
'integer': int, |
17
|
|
|
'string': str, |
18
|
|
|
'source': str |
19
|
|
|
} |
20
|
|
|
|
21
|
|
|
|
22
|
|
|
def load_json_file(file: str) -> dict: |
23
|
|
|
with (open(file)) as fp: |
24
|
|
|
contents = json.load(fp) |
25
|
|
|
return contents |
26
|
|
|
|
27
|
|
|
|
28
|
|
|
def validate(definition: dict, schema: dict): |
29
|
|
|
jsonschema.validate(instance=definition, schema=schema) |
30
|
|
|
|
31
|
|
|
|
32
|
|
|
def _check_param_type(param: dict): |
33
|
|
|
param_type = param_types_table[param['type']] |
34
|
|
|
if not isinstance(param['value'], param_type): |
35
|
|
|
msg = f"Parameter declared with type {param['type']}" \ |
36
|
|
|
f"(Python type {param_type}) " \ |
37
|
|
|
f"but value is of type {type(param['value']).__name__}." |
38
|
|
|
raise ValidationError(msg) |
39
|
|
|
|
40
|
|
|
|
41
|
|
|
def _check_source_param_exists(param: dict, sources: dict): |
42
|
|
|
if param['value'] not in sources.keys(): |
43
|
|
|
msg = f'Parameter declared as source but specified source ' \ |
44
|
|
|
f'{param["value"]} has not been parsed yet. ' \ |
45
|
|
|
f'Please rectify your definition file.' |
46
|
|
|
raise ValidationError(msg) |
47
|
|
|
|
48
|
|
|
|
49
|
|
|
def _parse_source_params(params: dict, parsed_sources: List[Source]) -> list: |
50
|
|
|
result = [] |
51
|
|
|
sources_dict = {source.name: source for source in parsed_sources} |
52
|
|
|
for param in params: |
53
|
|
|
_check_param_type(param) |
54
|
|
|
|
55
|
|
|
if param['type'] == 'source': |
56
|
|
|
_check_source_param_exists(param, sources_dict) |
57
|
|
|
param['value'] = sources_dict[param['value']] |
58
|
|
|
|
59
|
|
|
result.append(param['value']) |
60
|
|
|
|
61
|
|
|
return result |
62
|
|
|
|
63
|
|
|
|
64
|
|
|
def parse_sources(sources: List[dict]) -> List[Source]: |
65
|
|
|
final_sources: List[Source] = [] |
66
|
|
|
module = importlib.import_module('decision_engine.sources') |
67
|
|
|
for source in sources: |
68
|
|
|
class_name = source['class'] |
69
|
|
|
class_ = getattr(module, class_name) |
70
|
|
|
params = _parse_source_params(source['params'], final_sources) |
71
|
|
|
instance = class_(*params, source['name']) |
72
|
|
|
final_sources.append(instance) |
73
|
|
|
|
74
|
|
|
return final_sources |
75
|
|
|
|
76
|
|
|
|
77
|
|
|
def _check_rule_param_exists(param: dict, rules: dict): |
78
|
|
|
if param['value'] not in rules.keys(): |
79
|
|
|
msg = f'Parameter declared as rule but specified rule ' \ |
80
|
|
|
f'{param["value"]} has not been parsed yet. ' \ |
81
|
|
|
f'Please rectify your definition file.' |
82
|
|
|
raise ValidationError(msg) |
83
|
|
|
|
84
|
|
|
|
85
|
|
|
def _parse_rule_params(params: dict, parsed_rules: List[Rule], |
86
|
|
|
sources: List[Source]) -> list: |
87
|
|
|
result = [] |
88
|
|
|
rules_dict = {rule.name: rule for rule in parsed_rules} |
89
|
|
|
sources_dict = {source.name: source for source in sources} |
90
|
|
|
for param in params: |
91
|
|
|
if param['type'] == 'source': |
92
|
|
|
_check_source_param_exists(param, sources_dict) |
93
|
|
|
param['value'] = sources_dict[param['value']] |
94
|
|
|
elif param['type'] == 'rule': |
95
|
|
|
_check_rule_param_exists(param, rules_dict) |
96
|
|
|
param['value'] = rules_dict[param['value']] |
97
|
|
|
|
98
|
|
|
result.append(param['value']) |
99
|
|
|
|
100
|
|
|
return result |
101
|
|
|
|
102
|
|
|
|
103
|
|
|
def parse_rules(rules: List[dict], sources: List[Source]) -> List[Rule]: |
104
|
|
|
final_rules: List[Rule] = [] |
105
|
|
|
rules_module = importlib.import_module('decision_engine.rules') |
106
|
|
|
comparisons_module = importlib.import_module('decision_engine.comparisons') |
107
|
|
|
for rule in rules: |
108
|
|
|
rules_class = getattr(rules_module, rule['class']) |
109
|
|
|
comparison_class = getattr(comparisons_module, rule['comparison']) |
110
|
|
|
|
111
|
|
|
params = _parse_rule_params(rule['params'], final_rules, sources) |
112
|
|
|
|
113
|
|
|
# Stupid little hack to allow using a starred expression below. |
114
|
|
|
# Boolean rules expect a list of Rules, not Sources, so for those |
115
|
|
|
# cases, we first nest the parameters in another list, so that we can |
116
|
|
|
# still use *params but still end up with a list. |
117
|
|
|
if 'Boolean' in rule['class']: |
118
|
|
|
params = [params] |
119
|
|
|
else: |
120
|
|
|
params.append(comparison_class()) |
121
|
|
|
|
122
|
|
|
instance = rules_class(*params, rule['name']) |
123
|
|
|
final_rules.append(instance) |
124
|
|
|
|
125
|
|
|
return final_rules |
126
|
|
|
|
127
|
|
|
|
128
|
|
|
def parse_engines(engines: List[dict], rules: List[Rule]) -> List[Engine]: |
129
|
|
|
final_engines = [] |
130
|
|
|
for engine in engines: |
131
|
|
|
# Create a new list containing only the rules named in the engine. |
132
|
|
|
engine_rules = [rule for rule in rules if rule.name in engine['rules']] |
133
|
|
|
|
134
|
|
|
instance = Engine(engine_rules, engine['name']) |
135
|
|
|
final_engines.append(instance) |
136
|
|
|
|
137
|
|
|
return final_engines |
138
|
|
|
|
139
|
|
|
|
140
|
|
|
def parse_json(definition: dict) -> dict: |
141
|
|
|
schema_file = 'schema.json' |
142
|
|
|
schema_path = (Path(__file__).parents[0] / schema_file).absolute() |
143
|
|
|
schema = load_json_file(str(schema_path)) |
144
|
|
|
validate(definition, schema) |
145
|
|
|
|
146
|
|
|
sources = parse_sources(definition['sources']) |
147
|
|
|
rules = parse_rules(definition['rules'], sources) |
148
|
|
|
engines = parse_engines(definition['engines'], rules) |
149
|
|
|
|
150
|
|
|
return {'sources': sources, 'rules': rules, 'engines': engines} |
151
|
|
|
|
152
|
|
|
|
153
|
|
|
def parse_json_file(file: str) -> dict: |
154
|
|
|
definition = load_json_file(file) |
155
|
|
|
return parse_json(definition) |
156
|
|
|
|