Issues (109)

datatables/datatables.py (1 issue)

Checks for unused variables.

Unused Code Minor
1
from __future__ import absolute_import
2
3
import math
4
5
from sqlalchemy import Text, func, or_
6
from sqlalchemy.dialects import mysql, postgresql, sqlite
7
8
from datatables.clean_regex import clean_regex
9
from datatables.search_methods import SEARCH_METHODS
10
11
12
class DataTables:
0 ignored issues
show
The variable __class__ seems to be unused.
Loading history...
13
    """Class defining a DataTables object.
14
15
    :param request: request containing the GET values, specified by the
16
        datatable for filtering, sorting and paging
17
    :type request: pyramid.request
18
    :param query: the query wanted to be seen in the the table
19
    :type query: sqlalchemy.orm.query.Query
20
    :param columns: columns specification for the datatables
21
    :type columns: list
22
23
    :returns: a DataTables object
24
    """
25
26
    def __init__(self, request, query, columns, allow_regex_searches=False):
27
        """Initialize object and run the query."""
28
        self.params = dict(request)
29
        if 'sEcho' in self.params:
30
            raise ValueError(
31
                'Legacy datatables not supported, upgrade to >=1.10')
32
        self.query = query
33
        self.columns = columns
34
        self.results = None
35
        self.allow_regex_searches = allow_regex_searches
36
37
        # total in the table after filtering
38
        self.cardinality_filtered = 0
39
40
        # total in the table unfiltered
41
        self.cardinality = 0
42
43
        self.yadcf_params = []
44
        self.filter_expressions = []
45
        self.error = None
46
        try:
47
            self.run()
48
        except Exception as exc:
49
            self.error = str(exc)
50
51
    def output_result(self):
52
        """Output results in the format needed by DataTables."""
53
        output = {}
54
        output['draw'] = str(int(self.params.get('draw', 1)))
55
        output['recordsTotal'] = str(self.cardinality)
56
        output['recordsFiltered'] = str(self.cardinality_filtered)
57
        if self.error:
58
            output['error'] = self.error
59
            return output
60
61
        output['data'] = self.results
62
        for k, v in self.yadcf_params:
63
            output[k] = v
64
        return output
65
66
    def _query_with_all_filters_except_one(self, query, exclude):
67
        return query.filter(*[
68
            e for i, e in enumerate(self.filter_expressions)
69
            if e is not None and i is not exclude
70
        ])
71
72
    def _set_yadcf_data(self, query):
73
        # determine values for yadcf filters
74
        for i, col in enumerate(self.columns):
75
            if col.search_method in 'yadcf_range_number_slider':
76
                v = query.add_columns(
77
                    func.min(col.sqla_expr), func.max(col.sqla_expr)).one()
78
                self.yadcf_params.append(('yadcf_data_{:d}'.format(i),
79
                                          (math.floor(v[0]), math.ceil(v[1]))))
80
            if col.search_method in [
81
                    'yadcf_select', 'yadcf_multi_select', 'yadcf_autocomplete'
82
            ]:
83
                filtered = self._query_with_all_filters_except_one(
84
                    query=query, exclude=i)
85
                v = filtered.add_columns(col.sqla_expr).distinct().all()
86
                self.yadcf_params.append(('yadcf_data_{:d}'.format(i),
87
                                          [r[0] for r in v]))
88
89
    def run(self):
90
        """Launch filtering, sorting and paging to output results."""
91
        query = self.query
92
93
        # count before filtering
94
        self.cardinality = query.add_columns(self.columns[0].sqla_expr).count()
95
96
        self._set_column_filter_expressions()
97
        self._set_global_filter_expression()
98
        self._set_sort_expressions()
99
        self._set_yadcf_data(query)
100
101
        # apply filters
102
        query = query.filter(
103
            *[e for e in self.filter_expressions if e is not None])
104
105
        self.cardinality_filtered = query.add_columns(
106
            self.columns[0].sqla_expr).count()
107
108
        # apply sorts
109
        query = query.order_by(
110
            *[e for e in self.sort_expressions if e is not None])
111
112
        # add paging options
113
        length = int(self.params.get('length'))
114
        if length >= 0:
115
            query = query.limit(length)
116
        elif length == -1:
117
            pass
118
        else:
119
            raise (ValueError(
120
                'Length should be a positive integer or -1 to disable'))
121
        query = query.offset(int(self.params.get('start')))
122
123
        # add columns to query
124
        query = query.add_columns(*[c.sqla_expr for c in self.columns])
125
126
        # fetch the result of the queries
127
        column_names = [
128
            col.mData if col.mData else str(i)
129
            for i, col in enumerate(self.columns)
130
        ]
131
        self.results = [{k: v
132
                         for k, v in zip(column_names, row)}
133
                        for row in query.all()]
134
135
    def _set_column_filter_expressions(self):
136
        """Construct the query: filtering.
137
138
        Add filtering when per column searching is used.
139
        """
140
        # per columns filters:
141
        for i in range(len(self.columns)):
142
            filter_expr = None
143
            value = self.params.get('columns[{:d}][search][value]'.format(i),
144
                                    '')
145
            if value:
146
                search_func = SEARCH_METHODS[self.columns[i].search_method]
147
                filter_expr = search_func(self.columns[i].sqla_expr, value)
148
            self.filter_expressions.append(filter_expr)
149
150
    def _set_global_filter_expression(self):
151
        # global search filter
152
        global_search = self.params.get('search[value]', '')
153
        if global_search is '':
154
            return
155
156
        if (self.allow_regex_searches
157
                and self.params.get('search[regex]') == 'true'):
158
            op = self._get_regex_operator()
159
            val = clean_regex(global_search)
160
161
            def filter_for(col):
162
                return col.sqla_expr.op(op)(val)
163
        else:
164
            val = '%' + global_search + '%'
165
166
            def filter_for(col):
167
                return col.sqla_expr.cast(Text).ilike(val)
168
169
        global_filter = [
170
            filter_for(col) for col in self.columns if col.global_search
171
        ]
172
173
        self.filter_expressions.append(or_(*global_filter))
174
175
    def _set_sort_expressions(self):
176
        """Construct the query: sorting.
177
178
        Add sorting(ORDER BY) on the columns needed to be applied on.
179
        """
180
        sort_expressions = []
181
        i = 0
182
        while self.params.get('order[{:d}][column]'.format(i), False):
183
            column_nr = int(self.params.get('order[{:d}][column]'.format(i)))
184
            column = self.columns[column_nr]
185
            direction = self.params.get('order[{:d}][dir]'.format(i))
186
            sort_expr = column.sqla_expr
187
            if direction == 'asc':
188
                sort_expr = sort_expr.asc()
189
            elif direction == 'desc':
190
                sort_expr = sort_expr.desc()
191
            else:
192
                raise ValueError(
193
                    'Invalid order direction: {}'.format(direction))
194
            if column.nulls_order:
195
                if column.nulls_order == 'nullsfirst':
196
                    sort_expr = sort_expr.nullsfirst()
197
                elif column.nulls_order == 'nullslast':
198
                    sort_expr = sort_expr.nullslast()
199
                else:
200
                    raise ValueError(
201
                        'Invalid order direction: {}'.format(direction))
202
203
            sort_expressions.append(sort_expr)
204
            i += 1
205
        self.sort_expressions = sort_expressions
206
207
    def _get_regex_operator(self):
208
        if isinstance(self.query.session.bind.dialect, postgresql.dialect):
209
            return '~'
210
        elif isinstance(self.query.session.bind.dialect, mysql.dialect):
211
            return 'REGEXP'
212
        elif isinstance(self.query.session.bind.dialect, sqlite.dialect):
213
            return 'REGEXP'
214
        else:
215
            raise NotImplementedError(
216
                'Regex searches are not implemented for this dialect')
217