SearchEngine.query()   C
last analyzed

Complexity

Conditions 9

Size

Total Lines 29

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 9
c 0
b 0
f 0
dl 0
loc 29
rs 6.6666
1
# -*- coding: utf-8 -*-
2
import json
3
import logging
4
import requests
5
from zope.interface import Interface, implementer
6
from oe_utils.data.data_transfer_objects import ResultDTO
7
8
log = logging.getLogger(__name__)
9
10
11
def load_searchquery_parameters(query_params, settings, user_acls=None):
12
    """
13
    Creates a query for the searchengine based on the provided params.
14
15
    :param query_params: the request params for the search action
16
    :param settings: settings dictionary
17
    :param user_acls: an acces list for role based filtering
18
    :returns: Een :class:`dict` query object for the searchengine
19
    """
20
    q = {
21
        'match_all': {}
22
    }
23
    return q
24
25
26
def default_mapper(result, settings):
27
    if "hits" in result:
28
        result = [r['_source'] for r in result["hits"]["hits"]]
29
        return result
30
    else:
31
        return []
32
33
34
class ISearchEngine(Interface):
35
    def add_to_index(system_token, object_type, object_id, object_data):
36
        """add an object to the index with a specific type"""
37
38
    def remove_from_index(system_token, object_type, object_id):
39
        """remove an object from the index"""
40
41
    def remove_from_index_by_query(system_token, object_field, object_value):
42
        """remove an object from the index by query"""
43
44
    def query(system_token, object_type=None, query=None, sort='', options=None):
45
        """execute a query on the search engine"""
46
47
    def remove_index(system_token):
48
        """remove the index"""
49
50
    def create_index(system_token, data=None):
51
        """create the index"""
52
53
    def add_type_mapping(object_type, object_type_mapping, system_token):
54
        """add the mapping for specific type"""
55
56
57
@implementer(ISearchEngine)
58
class SearchEngine(object):
59
    def __init__(self, baseurl, index_name, version='1'):
60
        self.baseurl = baseurl
61
        self.index_name = index_name
62
        self.version = version
63
        self.content_header = {"Content-Type": "application/json"} if self.version == '6' else {}
64
65
    def add_to_index(self, system_token, object_type, object_id, object_data):
66
        """add an object to the index with a specific type"""
67
        headers = {'OpenAmSSOID': system_token} if system_token else {}
68
        headers.update(self.content_header)
69
        res = requests.put(self.baseurl + '/' + self.index_name + '/' + object_type + '/' + str(object_id),
70
                           object_data, headers=headers)
71
        res.raise_for_status()
72
73
    def remove_from_index(self, system_token, object_type, object_id):
74
        """remove an object from the index"""
75
        headers = {'OpenAmSSOID': system_token} if system_token else {}
76
        res = requests.delete(self.baseurl + '/' + self.index_name + '/' + object_type + '/' + str(object_id),
77
                              headers=headers)
78
        res.raise_for_status()
79
80
    def remove_from_index_by_query(self, system_token, object_field, object_value):
81
        """remove an object from the index by query"""
82
        headers = {'OpenAmSSOID': system_token} if system_token else {}
83
        if self.version == '1':
84
            res = requests.delete(self.baseurl + '/' + self.index_name +
85
                                  '/_query?q=' + object_field + ':' + str(object_value),
86
                                  headers=headers)
87
        else:
88
            headers.update(self.content_header)
89
            delete_query = {"query": {"match": {object_field: object_value}}}
90
            res = requests.post(self.baseurl + '/' + self.index_name + '/_delete_by_query',
91
                                data=json.dumps(delete_query),
92
                                headers=headers)
93
        res.raise_for_status()
94
95
    def query(self, system_token, object_type=None, query_params=None, sort=None, result_range=None,
96
              mapper=default_mapper, load_searchquery_param_func=load_searchquery_parameters,
97
              aggregations=None, settings=None, user_acls=None, min_score=None):
98
        """execute a query on the search engine"""
99
        query = load_searchquery_param_func(query_params, settings, user_acls=user_acls)
100
        if not sort:
101
            sort = ['_score']
102
        params = {}
103
        if result_range:
104
            params['size'] = result_range.get_page_size()
105
            params['from'] = result_range.start
106
        data = {
107
            "query": query,
108
            "sort": sort,
109
        }
110
        if min_score:
111
            data["min_score"] = min_score
112
        if aggregations:
113
            data["aggregations"] = aggregations
114
        headers = {'OpenAmSSOID': system_token} if system_token else {}
115
        headers.update(self.content_header)
116
        search_url = self.baseurl + "/" + self.index_name
117
        # if no object_type assume full index search
118
        search_url += '/' + object_type + '/_search' if object_type else '/_search'
119
        res = requests.post(search_url, data=json.dumps(data), params=params, headers=headers)
120
        res.raise_for_status()
121
        result = json.loads(res.text)
122
        return ResultDTO(mapper(result, settings), result["hits"]["total"] if "hits" in result else 0,
123
                         result["aggregations"] if "aggregations" in result else None)
124
125
    def remove_index(self, system_token):
126
        headers = {'OpenAmSSOID': system_token} if system_token else {}
127
        res = requests.head(self.baseurl + "/" + self.index_name, headers=headers)
128
        if res.status_code < 400:  # otherwise assume index doens't exists
129
            res = requests.delete(self.baseurl + "/" + self.index_name, headers=headers)
130
            res.raise_for_status()
131
132
    def create_index(self, system_token, data):
133
        headers = {'OpenAmSSOID': system_token} if system_token else {}
134
        headers.update(self.content_header)
135
        res = requests.put(self.baseurl + "/" + self.index_name, data=json.dumps(data), headers=headers)
136
        res.raise_for_status()
137
138
    def add_type_mapping(self, object_type, object_type_mapping, system_token):
139
        headers = {'OpenAmSSOID': system_token} if system_token else {}
140
        headers.update(self.content_header)
141
        res = requests.put(self.baseurl + "/" + self.index_name + "/_mapping/" + object_type,
142
                           data=json.dumps(object_type_mapping), headers=headers)
143
        res.raise_for_status()
144