1 | from __future__ import absolute_import |
||
0 ignored issues
–
show
|
|||
2 | |||
3 | import math |
||
4 | |||
5 | from sqlalchemy import Text, func, or_ |
||
6 | from sqlalchemy.dialects import mysql, postgresql, sqlite |
||
7 | |||
8 | from datatables.clean_regex import clean_regex |
||
9 | from datatables.search_methods import SEARCH_METHODS |
||
10 | |||
11 | |||
12 | class DataTables: |
||
0 ignored issues
–
show
|
|||
13 | """Class defining a DataTables object. |
||
14 | |||
15 | :param request: request containing the GET values, specified by the |
||
16 | datatable for filtering, sorting and paging |
||
17 | :type request: pyramid.request |
||
18 | :param query: the query wanted to be seen in the the table |
||
19 | :type query: sqlalchemy.orm.query.Query |
||
20 | :param columns: columns specification for the datatables |
||
21 | :type columns: list |
||
22 | |||
23 | :returns: a DataTables object |
||
24 | """ |
||
25 | |||
26 | def __init__(self, request, query, columns, allow_regex_searches=False): |
||
27 | """Initialize object and run the query.""" |
||
28 | self.params = dict(request) |
||
29 | if 'sEcho' in self.params: |
||
30 | raise ValueError( |
||
31 | 'Legacy datatables not supported, upgrade to >=1.10') |
||
32 | self.query = query |
||
33 | self.columns = columns |
||
34 | self.results = None |
||
35 | self.allow_regex_searches = allow_regex_searches |
||
36 | |||
37 | # total in the table after filtering |
||
38 | self.cardinality_filtered = 0 |
||
39 | |||
40 | # total in the table unfiltered |
||
41 | self.cardinality = 0 |
||
42 | |||
43 | self.yadcf_params = [] |
||
44 | self.filter_expressions = [] |
||
45 | self.error = None |
||
46 | try: |
||
47 | self.run() |
||
48 | except Exception as exc: |
||
0 ignored issues
–
show
Catching very general exceptions such as
Exception is usually not recommended.
Generally, you would want to handle very specific errors in the exception handler. This ensure that you do not hide other types of errors which should be fixed. So, unless you specifically plan to handle any error, consider adding a more specific exception.
Loading history...
|
|||
49 | self.error = str(exc) |
||
50 | |||
51 | def output_result(self): |
||
52 | """Output results in the format needed by DataTables.""" |
||
53 | output = {} |
||
54 | output['draw'] = str(int(self.params.get('draw', 1))) |
||
55 | output['recordsTotal'] = str(self.cardinality) |
||
56 | output['recordsFiltered'] = str(self.cardinality_filtered) |
||
57 | if self.error: |
||
58 | output['error'] = self.error |
||
59 | return output |
||
60 | |||
61 | output['data'] = self.results |
||
62 | for k, v in self.yadcf_params: |
||
0 ignored issues
–
show
The name
v does not conform to the variable naming conventions ((([a-z][a-z0-9_]{2,30})|(_[a-z0-9_]*))$ ).
This check looks for invalid names for a range of different identifiers. You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements. If your project includes a Pylint configuration file, the settings contained in that file take precedence. To find out more about Pylint, please refer to their site.
Loading history...
|
|||
63 | output[k] = v |
||
64 | return output |
||
65 | |||
66 | def _query_with_all_filters_except_one(self, query, exclude): |
||
67 | return query.filter(*[ |
||
68 | e for i, e in enumerate(self.filter_expressions) |
||
69 | if e is not None and i is not exclude |
||
70 | ]) |
||
71 | |||
72 | def _set_yadcf_data(self, query): |
||
73 | # determine values for yadcf filters |
||
74 | for i, col in enumerate(self.columns): |
||
75 | if col.search_method in 'yadcf_range_number_slider': |
||
76 | v = query.add_columns( |
||
0 ignored issues
–
show
The name
v does not conform to the variable naming conventions ((([a-z][a-z0-9_]{2,30})|(_[a-z0-9_]*))$ ).
This check looks for invalid names for a range of different identifiers. You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements. If your project includes a Pylint configuration file, the settings contained in that file take precedence. To find out more about Pylint, please refer to their site.
Loading history...
|
|||
77 | func.min(col.sqla_expr), func.max(col.sqla_expr)).one() |
||
78 | self.yadcf_params.append(('yadcf_data_{:d}'.format(i), |
||
79 | (math.floor(v[0]), math.ceil(v[1])))) |
||
80 | if col.search_method in [ |
||
81 | 'yadcf_select', 'yadcf_multi_select', 'yadcf_autocomplete' |
||
82 | ]: |
||
83 | filtered = self._query_with_all_filters_except_one( |
||
84 | query=query, exclude=i) |
||
85 | v = filtered.add_columns(col.sqla_expr).distinct().all() |
||
0 ignored issues
–
show
The name
v does not conform to the variable naming conventions ((([a-z][a-z0-9_]{2,30})|(_[a-z0-9_]*))$ ).
This check looks for invalid names for a range of different identifiers. You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements. If your project includes a Pylint configuration file, the settings contained in that file take precedence. To find out more about Pylint, please refer to their site.
Loading history...
|
|||
86 | self.yadcf_params.append(('yadcf_data_{:d}'.format(i), |
||
87 | [r[0] for r in v])) |
||
88 | |||
89 | def run(self): |
||
90 | """Launch filtering, sorting and paging to output results.""" |
||
91 | query = self.query |
||
92 | |||
93 | # count before filtering |
||
94 | self.cardinality = query.add_columns(self.columns[0].sqla_expr).count() |
||
95 | |||
96 | self._set_column_filter_expressions() |
||
97 | self._set_global_filter_expression() |
||
98 | self._set_sort_expressions() |
||
99 | self._set_yadcf_data(query) |
||
100 | |||
101 | # apply filters |
||
102 | query = query.filter( |
||
103 | *[e for e in self.filter_expressions if e is not None]) |
||
104 | |||
105 | self.cardinality_filtered = query.add_columns( |
||
106 | self.columns[0].sqla_expr).count() |
||
107 | |||
108 | # apply sorts |
||
109 | query = query.order_by( |
||
110 | *[e for e in self.sort_expressions if e is not None]) |
||
111 | |||
112 | # add paging options |
||
113 | length = int(self.params.get('length')) |
||
114 | if length >= 0: |
||
115 | query = query.limit(length) |
||
116 | elif length == -1: |
||
117 | pass |
||
118 | else: |
||
119 | raise (ValueError( |
||
120 | 'Length should be a positive integer or -1 to disable')) |
||
121 | query = query.offset(int(self.params.get('start'))) |
||
122 | |||
123 | # add columns to query |
||
124 | query = query.add_columns(*[c.sqla_expr for c in self.columns]) |
||
125 | |||
126 | # fetch the result of the queries |
||
127 | column_names = [ |
||
128 | col.mData if col.mData else str(i) |
||
129 | for i, col in enumerate(self.columns) |
||
130 | ] |
||
131 | self.results = [{k: v |
||
132 | for k, v in zip(column_names, row)} |
||
133 | for row in query.all()] |
||
134 | |||
135 | def _set_column_filter_expressions(self): |
||
136 | """Construct the query: filtering. |
||
137 | |||
138 | Add filtering when per column searching is used. |
||
139 | """ |
||
140 | # per columns filters: |
||
141 | for i in range(len(self.columns)): |
||
142 | filter_expr = None |
||
143 | value = self.params.get('columns[{:d}][search][value]'.format(i), |
||
144 | '') |
||
145 | if value: |
||
146 | search_func = SEARCH_METHODS[self.columns[i].search_method] |
||
147 | filter_expr = search_func(self.columns[i].sqla_expr, value) |
||
148 | self.filter_expressions.append(filter_expr) |
||
149 | |||
150 | def _set_global_filter_expression(self): |
||
151 | # global search filter |
||
152 | global_search = self.params.get('search[value]', '') |
||
153 | if global_search is '': |
||
0 ignored issues
–
show
|
|||
154 | return |
||
155 | |||
156 | if (self.allow_regex_searches |
||
157 | and self.params.get('search[regex]') == 'true'): |
||
158 | op = self._get_regex_operator() |
||
0 ignored issues
–
show
The name
op does not conform to the variable naming conventions ((([a-z][a-z0-9_]{2,30})|(_[a-z0-9_]*))$ ).
This check looks for invalid names for a range of different identifiers. You can set regular expressions to which the identifiers must conform if the defaults do not match your requirements. If your project includes a Pylint configuration file, the settings contained in that file take precedence. To find out more about Pylint, please refer to their site.
Loading history...
|
|||
159 | val = clean_regex(global_search) |
||
160 | |||
161 | def filter_for(col): |
||
0 ignored issues
–
show
This function should have a docstring.
The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods: class SomeClass:
def some_method(self):
"""Do x and return foo."""
If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.
Loading history...
|
|||
162 | return col.sqla_expr.op(op)(val) |
||
0 ignored issues
–
show
|
|||
163 | else: |
||
164 | val = '%' + global_search + '%' |
||
165 | |||
166 | def filter_for(col): |
||
0 ignored issues
–
show
This function should have a docstring.
The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods: class SomeClass:
def some_method(self):
"""Do x and return foo."""
If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.
Loading history...
|
|||
167 | return col.sqla_expr.cast(Text).ilike(val) |
||
168 | |||
169 | global_filter = [ |
||
170 | filter_for(col) for col in self.columns if col.global_search |
||
171 | ] |
||
172 | |||
173 | self.filter_expressions.append(or_(*global_filter)) |
||
174 | |||
175 | def _set_sort_expressions(self): |
||
176 | """Construct the query: sorting. |
||
177 | |||
178 | Add sorting(ORDER BY) on the columns needed to be applied on. |
||
179 | """ |
||
180 | sort_expressions = [] |
||
181 | i = 0 |
||
182 | while self.params.get('order[{:d}][column]'.format(i), False): |
||
183 | column_nr = int(self.params.get('order[{:d}][column]'.format(i))) |
||
184 | column = self.columns[column_nr] |
||
185 | direction = self.params.get('order[{:d}][dir]'.format(i)) |
||
186 | sort_expr = column.sqla_expr |
||
187 | if direction == 'asc': |
||
188 | sort_expr = sort_expr.asc() |
||
189 | elif direction == 'desc': |
||
190 | sort_expr = sort_expr.desc() |
||
191 | else: |
||
192 | raise ValueError( |
||
193 | 'Invalid order direction: {}'.format(direction)) |
||
194 | if column.nulls_order: |
||
195 | if column.nulls_order == 'nullsfirst': |
||
196 | sort_expr = sort_expr.nullsfirst() |
||
197 | elif column.nulls_order == 'nullslast': |
||
198 | sort_expr = sort_expr.nullslast() |
||
199 | else: |
||
200 | raise ValueError( |
||
201 | 'Invalid order direction: {}'.format(direction)) |
||
202 | |||
203 | sort_expressions.append(sort_expr) |
||
204 | i += 1 |
||
205 | self.sort_expressions = sort_expressions |
||
0 ignored issues
–
show
|
|||
206 | |||
207 | def _get_regex_operator(self): |
||
208 | if isinstance(self.query.session.bind.dialect, postgresql.dialect): |
||
209 | return '~' |
||
210 | elif isinstance(self.query.session.bind.dialect, mysql.dialect): |
||
211 | return 'REGEXP' |
||
212 | elif isinstance(self.query.session.bind.dialect, sqlite.dialect): |
||
213 | return 'REGEXP' |
||
214 | else: |
||
215 | raise NotImplementedError( |
||
216 | 'Regex searches are not implemented for this dialect') |
||
217 |
The coding style of this project requires that you add a docstring to this code element. Below, you find an example for methods:
If you would like to know more about docstrings, we recommend to read PEP-257: Docstring Conventions.