1 | from __future__ import absolute_import |
||
0 ignored issues
–
show
|
|||
2 | |||
3 | import math |
||
4 | |||
5 | from sqlalchemy import Text, func, or_ |
||
6 | from sqlalchemy.dialects import mysql, postgresql, sqlite |
||
7 | |||
8 | from datatables.clean_regex import clean_regex |
||
9 | from datatables.search_methods import SEARCH_METHODS |
||
10 | |||
11 | |||
class DataTables:
    """Server-side processing helper for DataTables (>= 1.10) requests.

    The whole pipeline (counting, filtering, sorting, paging) is executed
    once from the constructor; afterwards :meth:`output_result` returns the
    response payload.

    :param request: the request parameters sent by DataTables for
        filtering, sorting and paging; anything accepted by ``dict()``
    :param query: the query whose rows should be shown in the table
    :type query: sqlalchemy.orm.query.Query
    :param columns: column specifications; each one is read for its
        ``sqla_expr``, ``mData``, ``search_method``, ``global_search`` and
        ``nulls_order`` attributes
    :type columns: list
    :param allow_regex_searches: honour ``search[regex] == 'true'`` in the
        global search when True
    :type allow_regex_searches: bool

    :returns: a DataTables object
    """

    def __init__(self, request, query, columns, allow_regex_searches=False):
        """Store the inputs and immediately run the query.

        :raises ValueError: if the parameters contain ``sEcho``, i.e. they
            come from a legacy (pre-1.10) DataTables client.

        Any exception raised while running the query is *not* propagated:
        its message is stored in ``self.error`` and later emitted by
        :meth:`output_result`.
        """
        self.params = dict(request)
        if 'sEcho' in self.params:
            raise ValueError(
                'Legacy datatables not supported, upgrade to >=1.10')
        self.query = query
        self.columns = columns
        self.results = None
        self.allow_regex_searches = allow_regex_searches

        # total in the table after filtering
        self.cardinality_filtered = 0

        # total in the table unfiltered
        self.cardinality = 0

        self.yadcf_params = []
        # One entry per column (None when the column has no active filter),
        # optionally followed by the global search expression.
        self.filter_expressions = []
        self.sort_expressions = []
        self.error = None
        try:
            self.run()
        except Exception as exc:
            # Deliberate best effort: the failure is reported to the client
            # through the 'error' key of the response instead of raising.
            self.error = str(exc)

    def output_result(self):
        """Return the response dict in the format needed by DataTables.

        Always contains ``draw``, ``recordsTotal`` and ``recordsFiltered``
        (as strings). When an error was recorded only ``error`` is added;
        otherwise ``data`` and every collected yadcf entry are added.
        """
        output = {
            'draw': str(int(self.params.get('draw', 1))),
            'recordsTotal': str(self.cardinality),
            'recordsFiltered': str(self.cardinality_filtered),
        }
        if self.error:
            output['error'] = self.error
            return output

        output['data'] = self.results
        for key, value in self.yadcf_params:
            output[key] = value
        return output

    def _query_with_all_filters_except_one(self, query, exclude):
        """Return ``query`` filtered by every active filter but one.

        :param query: the query to filter
        :param exclude: index into ``self.filter_expressions`` of the
            expression to leave out
        """
        # Indices are compared by value: identity comparison of integers
        # is unreliable and fails for larger indices.
        return query.filter(*[
            expr for index, expr in enumerate(self.filter_expressions)
            if expr is not None and index != exclude
        ])

    def _set_yadcf_data(self, query):
        """Collect the ``yadcf_data_<i>`` entries for yadcf filter widgets.

        Range sliders get a ``(floor(min), ceil(max))`` tuple computed on
        ``query``; select / multi-select / autocomplete columns get the
        list of distinct values that remain once all *other* filters are
        applied.
        """
        select_methods = (
            'yadcf_select', 'yadcf_multi_select', 'yadcf_autocomplete')
        for i, col in enumerate(self.columns):
            key = 'yadcf_data_{:d}'.format(i)
            # NOTE(review): this is a substring test, so any search_method
            # that is a substring of the literal matches as well. Kept
            # unchanged because existing callers may rely on it; confirm
            # whether an equality test was intended.
            if col.search_method in 'yadcf_range_number_slider':
                bounds = query.add_columns(
                    func.min(col.sqla_expr), func.max(col.sqla_expr)).one()
                self.yadcf_params.append(
                    (key, (math.floor(bounds[0]), math.ceil(bounds[1]))))
            if col.search_method in select_methods:
                filtered = self._query_with_all_filters_except_one(
                    query=query, exclude=i)
                rows = filtered.add_columns(col.sqla_expr).distinct().all()
                self.yadcf_params.append((key, [row[0] for row in rows]))

    def run(self):
        """Launch filtering, sorting and paging to output results.

        Fills ``cardinality``, ``cardinality_filtered``, ``yadcf_params``
        and ``results`` (a list of dicts keyed by each column's ``mData``,
        or by its position as a string when ``mData`` is falsy).

        :raises ValueError: if ``length`` is below -1.
        """
        query = self.query

        # count before filtering
        self.cardinality = query.add_columns(
            self.columns[0].sqla_expr).count()

        self._set_column_filter_expressions()
        self._set_global_filter_expression()
        self._set_sort_expressions()
        self._set_yadcf_data(query)

        # apply filters
        query = query.filter(
            *[e for e in self.filter_expressions if e is not None])

        self.cardinality_filtered = query.add_columns(
            self.columns[0].sqla_expr).count()

        # apply sorts
        query = query.order_by(
            *[e for e in self.sort_expressions if e is not None])

        # add paging options; a length of -1 disables the limit
        length = int(self.params.get('length'))
        if length >= 0:
            query = query.limit(length)
        elif length != -1:
            raise ValueError(
                'Length should be a positive integer or -1 to disable')
        query = query.offset(int(self.params.get('start')))

        # add columns to query
        query = query.add_columns(*[c.sqla_expr for c in self.columns])

        # fetch the result of the queries
        column_names = [
            col.mData if col.mData else str(i)
            for i, col in enumerate(self.columns)
        ]
        self.results = [dict(zip(column_names, row)) for row in query.all()]

    def _set_column_filter_expressions(self):
        """Construct the query: filtering.

        Appends one entry per column to ``self.filter_expressions``: the
        expression built by the column's search method when a per-column
        search value was sent, ``None`` otherwise.
        """
        for i, col in enumerate(self.columns):
            filter_expr = None
            value = self.params.get(
                'columns[{:d}][search][value]'.format(i), '')
            if value:
                search_func = SEARCH_METHODS[col.search_method]
                filter_expr = search_func(col.sqla_expr, value)
            self.filter_expressions.append(filter_expr)

    def _set_global_filter_expression(self):
        """Append the global search filter, if a global search was sent.

        The filter is the OR of one condition per column flagged with
        ``global_search``: a regex match when regex searches are allowed
        and requested, a case-insensitive ``%value%`` match on the column
        cast to text otherwise. Nothing is appended when the search value
        is empty or when no column takes part in the global search.
        """
        global_search = self.params.get('search[value]', '')
        if not global_search:
            return

        if (self.allow_regex_searches
                and self.params.get('search[regex]') == 'true'):
            regex_op = self._get_regex_operator()
            val = clean_regex(global_search)

            def filter_for(col):
                """Return the regex-match condition for ``col``."""
                return col.sqla_expr.op(regex_op)(val)
        else:
            val = '%' + global_search + '%'

            def filter_for(col):
                """Return the case-insensitive LIKE condition for ``col``."""
                return col.sqla_expr.cast(Text).ilike(val)

        global_filter = [
            filter_for(col) for col in self.columns if col.global_search
        ]
        if not global_filter:
            # Calling or_() without arguments is deprecated in SQLAlchemy;
            # with no searchable column there is nothing to filter on.
            return

        self.filter_expressions.append(or_(*global_filter))

    def _set_sort_expressions(self):
        """Construct the query: sorting.

        Reads ``order[<i>][column]`` / ``order[<i>][dir]`` for i = 0, 1, ...
        until an index is missing (or empty) and stores the resulting
        ORDER BY expressions in ``self.sort_expressions``.

        :raises ValueError: on a direction other than ``asc`` / ``desc`` or
            a column ``nulls_order`` other than ``nullsfirst`` /
            ``nullslast``.
        """
        sort_expressions = []
        i = 0
        while True:
            column_ref = self.params.get('order[{:d}][column]'.format(i))
            # Stop only on a missing/empty value: an integer index 0 is a
            # valid column reference and must not end the loop.
            if column_ref is None or column_ref == '':
                break
            column = self.columns[int(column_ref)]
            direction = self.params.get('order[{:d}][dir]'.format(i))
            sort_expr = column.sqla_expr
            if direction == 'asc':
                sort_expr = sort_expr.asc()
            elif direction == 'desc':
                sort_expr = sort_expr.desc()
            else:
                raise ValueError(
                    'Invalid order direction: {}'.format(direction))
            if column.nulls_order:
                if column.nulls_order == 'nullsfirst':
                    sort_expr = sort_expr.nullsfirst()
                elif column.nulls_order == 'nullslast':
                    sort_expr = sort_expr.nullslast()
                else:
                    raise ValueError(
                        'Invalid nulls order: {}'.format(column.nulls_order))

            sort_expressions.append(sort_expr)
            i += 1
        self.sort_expressions = sort_expressions

    def _get_regex_operator(self):
        """Return the regex-match SQL operator for the query's dialect.

        :raises NotImplementedError: for dialects other than PostgreSQL,
            MySQL and SQLite.
        """
        dialect = self.query.session.bind.dialect
        if isinstance(dialect, postgresql.dialect):
            return '~'
        if isinstance(dialect, (mysql.dialect, sqlite.dialect)):
            return 'REGEXP'
        raise NotImplementedError(
            'Regex searches are not implemented for this dialect')
||
217 |
The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:
If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.