1 | from __future__ import absolute_import |
||
2 | |||
3 | import math |
||
4 | |||
5 | from sqlalchemy import Text, func, or_ |
||
6 | from sqlalchemy.dialects import mysql, postgresql, sqlite |
||
7 | |||
8 | from datatables.clean_regex import clean_regex |
||
9 | from datatables.search_methods import SEARCH_METHODS |
||
10 | |||
11 | |||
12 | class DataTables: |
||
13 | """Class defining a DataTables object. |
||
14 | |||
15 | :param request: request containing the GET values, specified by the |
||
16 | datatable for filtering, sorting and paging |
||
17 | :type request: pyramid.request |
||
18 | :param query: the query wanted to be seen in the the table |
||
19 | :type query: sqlalchemy.orm.query.Query |
||
20 | :param columns: columns specification for the datatables |
||
21 | :type columns: list |
||
22 | |||
23 | :returns: a DataTables object |
||
24 | """ |
||
25 | |||
26 | def __init__(self, request, query, columns, allow_regex_searches=False): |
||
27 | """Initialize object and run the query.""" |
||
28 | self.params = dict(request) |
||
29 | if 'sEcho' in self.params: |
||
30 | raise ValueError( |
||
31 | 'Legacy datatables not supported, upgrade to >=1.10') |
||
32 | self.query = query |
||
33 | self.columns = columns |
||
34 | self.results = None |
||
35 | self.allow_regex_searches = allow_regex_searches |
||
36 | |||
37 | # total in the table after filtering |
||
38 | self.cardinality_filtered = 0 |
||
39 | |||
40 | # total in the table unfiltered |
||
41 | self.cardinality = 0 |
||
42 | |||
43 | self.yadcf_params = [] |
||
44 | self.filter_expressions = [] |
||
45 | self.error = None |
||
46 | try: |
||
47 | self.run() |
||
48 | except Exception as exc: |
||
0 ignored issues
–
show
|
|||
49 | self.error = str(exc) |
||
50 | |||
51 | def output_result(self): |
||
52 | """Output results in the format needed by DataTables.""" |
||
53 | output = {} |
||
54 | output['draw'] = str(int(self.params.get('draw', 1))) |
||
55 | output['recordsTotal'] = str(self.cardinality) |
||
56 | output['recordsFiltered'] = str(self.cardinality_filtered) |
||
57 | if self.error: |
||
58 | output['error'] = self.error |
||
59 | return output |
||
60 | |||
61 | output['data'] = self.results |
||
62 | for k, v in self.yadcf_params: |
||
63 | output[k] = v |
||
64 | return output |
||
65 | |||
66 | def _query_with_all_filters_except_one(self, query, exclude): |
||
67 | return query.filter(*[ |
||
68 | e for i, e in enumerate(self.filter_expressions) |
||
69 | if e is not None and i is not exclude |
||
70 | ]) |
||
71 | |||
72 | def _set_yadcf_data(self, query): |
||
73 | # determine values for yadcf filters |
||
74 | for i, col in enumerate(self.columns): |
||
75 | if col.search_method in 'yadcf_range_number_slider': |
||
76 | v = query.add_columns( |
||
77 | func.min(col.sqla_expr), func.max(col.sqla_expr)).one() |
||
78 | self.yadcf_params.append(('yadcf_data_{:d}'.format(i), |
||
79 | (math.floor(v[0]), math.ceil(v[1])))) |
||
80 | if col.search_method in [ |
||
81 | 'yadcf_select', 'yadcf_multi_select', 'yadcf_autocomplete' |
||
82 | ]: |
||
83 | filtered = self._query_with_all_filters_except_one( |
||
84 | query=query, exclude=i) |
||
85 | v = filtered.add_columns(col.sqla_expr).distinct().all() |
||
86 | self.yadcf_params.append(('yadcf_data_{:d}'.format(i), |
||
87 | [r[0] for r in v])) |
||
88 | |||
89 | def run(self): |
||
90 | """Launch filtering, sorting and paging to output results.""" |
||
91 | query = self.query |
||
92 | |||
93 | # count before filtering |
||
94 | self.cardinality = query.add_columns(self.columns[0].sqla_expr).count() |
||
95 | |||
96 | self._set_column_filter_expressions() |
||
97 | self._set_global_filter_expression() |
||
98 | self._set_sort_expressions() |
||
99 | self._set_yadcf_data(query) |
||
100 | |||
101 | # apply filters |
||
102 | query = query.filter( |
||
103 | *[e for e in self.filter_expressions if e is not None]) |
||
104 | |||
105 | self.cardinality_filtered = query.add_columns( |
||
106 | self.columns[0].sqla_expr).count() |
||
107 | |||
108 | # apply sorts |
||
109 | query = query.order_by( |
||
110 | *[e for e in self.sort_expressions if e is not None]) |
||
111 | |||
112 | # add paging options |
||
113 | length = int(self.params.get('length')) |
||
114 | if length >= 0: |
||
115 | query = query.limit(length) |
||
116 | elif length == -1: |
||
117 | pass |
||
118 | else: |
||
119 | raise (ValueError( |
||
120 | 'Length should be a positive integer or -1 to disable')) |
||
121 | query = query.offset(int(self.params.get('start'))) |
||
122 | |||
123 | # add columns to query |
||
124 | query = query.add_columns(*[c.sqla_expr for c in self.columns]) |
||
125 | |||
126 | # fetch the result of the queries |
||
127 | column_names = [ |
||
128 | col.mData if col.mData else str(i) |
||
129 | for i, col in enumerate(self.columns) |
||
130 | ] |
||
131 | self.results = [{k: v |
||
132 | for k, v in zip(column_names, row)} |
||
133 | for row in query.all()] |
||
134 | |||
135 | def _set_column_filter_expressions(self): |
||
136 | """Construct the query: filtering. |
||
137 | |||
138 | Add filtering when per column searching is used. |
||
139 | """ |
||
140 | # per columns filters: |
||
141 | for i in range(len(self.columns)): |
||
142 | filter_expr = None |
||
143 | value = self.params.get('columns[{:d}][search][value]'.format(i), |
||
144 | '') |
||
145 | if value: |
||
146 | search_func = SEARCH_METHODS[self.columns[i].search_method] |
||
147 | filter_expr = search_func(self.columns[i].sqla_expr, value) |
||
148 | self.filter_expressions.append(filter_expr) |
||
149 | |||
150 | def _set_global_filter_expression(self): |
||
151 | # global search filter |
||
152 | global_search = self.params.get('search[value]', '') |
||
153 | if global_search is '': |
||
154 | return |
||
155 | |||
156 | if (self.allow_regex_searches |
||
157 | and self.params.get('search[regex]') == 'true'): |
||
158 | op = self._get_regex_operator() |
||
159 | val = clean_regex(global_search) |
||
160 | |||
161 | def filter_for(col): |
||
162 | return col.sqla_expr.op(op)(val) |
||
163 | else: |
||
164 | val = '%' + global_search + '%' |
||
165 | |||
166 | def filter_for(col): |
||
167 | return col.sqla_expr.cast(Text).ilike(val) |
||
168 | |||
169 | global_filter = [ |
||
170 | filter_for(col) for col in self.columns if col.global_search |
||
171 | ] |
||
172 | |||
173 | self.filter_expressions.append(or_(*global_filter)) |
||
174 | |||
175 | def _set_sort_expressions(self): |
||
176 | """Construct the query: sorting. |
||
177 | |||
178 | Add sorting(ORDER BY) on the columns needed to be applied on. |
||
179 | """ |
||
180 | sort_expressions = [] |
||
181 | i = 0 |
||
182 | while self.params.get('order[{:d}][column]'.format(i), False): |
||
183 | column_nr = int(self.params.get('order[{:d}][column]'.format(i))) |
||
184 | column = self.columns[column_nr] |
||
185 | direction = self.params.get('order[{:d}][dir]'.format(i)) |
||
186 | sort_expr = column.sqla_expr |
||
187 | if direction == 'asc': |
||
188 | sort_expr = sort_expr.asc() |
||
189 | elif direction == 'desc': |
||
190 | sort_expr = sort_expr.desc() |
||
191 | else: |
||
192 | raise ValueError( |
||
193 | 'Invalid order direction: {}'.format(direction)) |
||
194 | if column.nulls_order: |
||
195 | if column.nulls_order == 'nullsfirst': |
||
196 | sort_expr = sort_expr.nullsfirst() |
||
197 | elif column.nulls_order == 'nullslast': |
||
198 | sort_expr = sort_expr.nullslast() |
||
199 | else: |
||
200 | raise ValueError( |
||
201 | 'Invalid order direction: {}'.format(direction)) |
||
202 | |||
203 | sort_expressions.append(sort_expr) |
||
204 | i += 1 |
||
205 | self.sort_expressions = sort_expressions |
||
206 | |||
207 | def _get_regex_operator(self): |
||
208 | if isinstance(self.query.session.bind.dialect, postgresql.dialect): |
||
209 | return '~' |
||
210 | elif isinstance(self.query.session.bind.dialect, mysql.dialect): |
||
211 | return 'REGEXP' |
||
212 | elif isinstance(self.query.session.bind.dialect, sqlite.dialect): |
||
213 | return 'REGEXP' |
||
214 | else: |
||
215 | raise NotImplementedError( |
||
216 | 'Regex searches are not implemented for this dialect') |
||
217 |
Generally, you would want to handle very specific errors in the exception handler. This ensure that you do not hide other types of errors which should be fixed.
So, unless you specifically plan to handle any error, consider adding a more specific exception.