Test Failed
Push — develop ( 24a3ae...5f48d5 )
by Nicolas
02:20 queued 14s
created

glances.filter.GlancesFilterList._add_filter()   A

Complexity

Conditions 1

Size

Total Lines 5
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 2
dl 0
loc 5
rs 10
c 0
b 0
f 0
1
# -*- coding: utf-8 -*-
2
#
3
# This file is part of Glances.
4
#
5
# SPDX-FileCopyrightText: 2022 Nicolas Hennion <[email protected]>
6
#
7
# SPDX-License-Identifier: LGPL-3.0-only
8
#
9
10
import re
11
12
from glances.logger import logger
13
14
15
class GlancesFilterList(object):

    """Manage a list of GlancesFilter objects

    The list is fed with a comma separated string of filters. A process is
    reported as filtered as soon as one filter of the list matches it.

    >>> fl = GlancesFilterList()
    >>> fl.filter = '.*python.*,username:nicolargo'
    >>> fl.is_filtered({'name': 'python is in the place'})
    True
    >>> fl.is_filtered({'name': 'snake is in the place'})
    False
    >>> fl.is_filtered({'name': 'snake is in the place', 'username': 'nicolargo'})
    True
    >>> fl.is_filtered({'name': 'snake is in the place', 'username': 'notme'})
    False
    """

    def __init__(self):
        # GlancesFilter objects, in the order they were added
        self._filter = []

    @property
    def filter(self):
        """Return the list of GlancesFilter objects currently registered"""
        return self._filter

    @filter.setter
    def filter(self, value):
        """Add one GlancesFilter per item of a comma separated string

        Filters already registered are kept: each assignment appends to the
        existing list.
        """
        for item in value.split(','):
            self._add_filter(item)

    def _add_filter(self, filter_input):
        """Build a GlancesFilter from filter_input and append it to the list"""
        new_filter = GlancesFilter()
        new_filter.filter = filter_input
        self._filter.append(new_filter)

    def is_filtered(self, process):
        """Return True if at least one filter of the list matches the process

        :param process: A dict corresponding to the process item.
        """
        # any() stops at the first matching filter
        return any(f.is_filtered(process) for f in self._filter)
56
57
58
class GlancesFilter(object):

    """Allow Glances to filter processes

    A filter is a regular expression which should match the whole value
    (re.fullmatch). It can be prefixed by '<key>:' to apply it on a given
    key of the process dict instead of the process name / command line.

    >>> f = GlancesFilter()
    >>> f.filter = '.*python.*'
    >>> f.filter
    '.*python.*'
    >>> f.filter_key is None
    True
    >>> f.filter = 'username:nicolargo'
    >>> f.filter
    'nicolargo'
    >>> f.filter_key
    'username'
    >>> f.filter = 'username:.*nico.*'
    >>> f.filter
    '.*nico.*'
    >>> f.filter_key
    'username'
    """

    def __init__(self):
        # Filter entered by the user (string)
        self._filter_input = None
        # Filter to apply
        self._filter = None
        # Filter regular expression
        self._filter_re = None
        # Dict key where the filter should be applied
        # Default is None: search on command line and process name
        self._filter_key = None

    @property
    def filter_input(self):
        """Return the filter given by the user (as a string)"""
        return self._filter_input

    @property
    def filter(self):
        """Return the current filter to be applied"""
        return self._filter

    @filter.setter
    def filter(self, value):
        """Set the filter (as a string) and compute the regular expression

        A filter could be one of the following:
        - python > Process name (or command) is exactly python
        - .*python.* > Process name (or command) contain python
        - username:nicolargo > Process belong to nicolargo user

        None disables the filter. If the regular expression can not be
        compiled, the error is logged and the filter is disabled.
        """
        self._filter_input = value
        if value is None:
            self._filter = None
            self._filter_key = None
        else:
            # Only the first ':' separates the key from the pattern: a ':'
            # located after it belongs to the regular expression
            key, separator, pattern = value.partition(':')
            if separator:
                self._filter = pattern
                self._filter_key = key
            else:
                self._filter = value
                self._filter_key = None

        self._filter_re = None
        if self.filter is not None:
            logger.debug("Set filter to {} on {}".format(
                self.filter,
                self.filter_key if self.filter_key else 'name or cmdline'))
            # Compute the regular expression
            try:
                self._filter_re = re.compile(self.filter)
                logger.debug("Filter regex compilation OK: {}".format(self.filter))
            except Exception as e:
                # Broad on purpose: a pattern entered by the user should never
                # crash Glances, whatever the error raised by the compilation
                logger.error("Cannot compile filter regex: {} ({})".format(self.filter, e))
                self._filter = None
                self._filter_re = None
                self._filter_key = None

    @property
    def filter_re(self):
        """Return the filter regular expression (None if no filter is set)"""
        return self._filter_re

    @property
    def filter_key(self):
        """key where the filter should be applied"""
        return self._filter_key

    def is_filtered(self, process):
        """Return True if the process item match the current filter

        :param process: A dict corresponding to the process item.
        """
        if self.filter is None:
            # No filter => Not filtered
            return False

        if self.filter_key is None:
            # Apply filter on command line and process name
            return self._is_process_filtered(process, key='name') or self._is_process_filtered(process, key='cmdline')
        else:
            # Apply filter on <key>
            return self._is_process_filtered(process)

    def _is_process_filtered(self, process, key=None):
        """Return True if the process[key] should be filtered according to the current filter

        :param process: A dict corresponding to the process item.
        :param key: Key of the dict to test (default is the filter key).
        """
        if key is None:
            key = self.filter_key
        try:
            value = process[key]
        except KeyError:
            # If the key did not exist
            return False
        if isinstance(value, list):
            if key == 'cmdline' and len(value) > 0:
                # Command line: only the first item is tested
                value = value[0]
            else:
                # Other lists are converted to a string in order to match
                # them with the regular expression (str() because the items
                # are not always strings)
                value = ' '.join(str(item) for item in value)
        try:
            return self._filter_re.fullmatch(value) is not None
        except (AttributeError, TypeError):
            # AttributeError -  Filter processes crashes with a bad regular expression pattern (issue #665)
            # TypeError - Filter processes crashes if value is None (issue #1105)
            return False
186