ActionAliasFormatParser.match_params_in_stream()   F
last analyzed

Complexity

Conditions 10

Size

Total Lines 27

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 10
dl 0
loc 27
rs 3.1304
c 1
b 0
f 0

How to fix   Complexity   

Complexity

Complex classes like ActionAliasFormatParser.match_params_in_stream() often do a lot of different things. To break such a class down, we need to identify a cohesive component within that class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import re
17
import sys
18
from sre_parse import (
19
    parse, AT, AT_BEGINNING, AT_BEGINNING_STRING, AT_END, AT_END_STRING,
20
    BRANCH, SUBPATTERN,
21
)
22
23
from st2common.exceptions.content import ParseException
24
from st2common import log
25
26
__all__ = [
27
    'ActionAliasFormatParser',
28
29
    'extract_parameters_for_action_alias_db',
30
    'extract_parameters',
31
    'search_regex_tokens',
32
]
33
34
35
# Module-level logger, named after this module.
LOG = log.getLogger(__name__)
36
37
# Index of the nested pattern inside a SUBPATTERN token's argument tuple.
# sre_parse changed that tuple from ``(group, pattern)`` to
# ``(group, add_flags, del_flags, pattern)`` in Python 3.6, so the version
# check has to be against 3.6 rather than against Python 3 in general.
if sys.version_info >= (3, 6):
    SUBPATTERN_INDEX = 3
else:
    SUBPATTERN_INDEX = 1
42
43
44
class ActionAliasFormatParser(object):
    """
    Match a command string against an action-alias format string.

    The format string may contain ``{{name}}`` (required) and
    ``{{name=default}}`` (optional) placeholders. Construction performs
    three preparation steps:

    1. strip trailing ``key=value`` pairs from the command string,
    2. collect the optional placeholders and their defaults,
    3. turn the format string into a compiled regex with named groups.

    The ``get_*_param_value`` methods then run that regex against the
    remaining command string and assemble the parameter dictionary.
    """

    def __init__(self, alias_format=None, param_stream=None):
        """
        :param alias_format: Format string; ``None`` is treated as ``''``.
        :param param_stream: Command string; ``None`` is treated as ``''``.
        """
        self._format = alias_format or ''
        self._original_param_stream = param_stream or ''
        self._param_stream = self._original_param_stream
        self._snippets = self.generate_snippets()

        # 1. Match the arbitrary key-value pairs at the end of the command
        # (extra parameters that are not part of the format string) and
        # cut them from the command string.
        self._kv_pairs, self._param_stream = self.match_kv_pairs_at_end()

        # 2. Match optional parameters (those with default values).
        self._optional = self.generate_optional_params_regex()

        # 3. Convert the format string into a compiled regex object.
        self._regex = self.transform_format_string_into_regex()

    def generate_snippets(self):
        """
        Build the regex fragments used by the other methods.

        The full expressions are assembled from small named pieces to
        keep them readable.

        :rtype: ``dict`` mapping snippet name to regex source string
        """
        snippets = {}

        # Formats for keys and values: key is a non-spaced string,
        # value is anything in quotes or curly braces, or a single word.
        snippets['key'] = r'\s*(\S+?)\s*'
        snippets['value'] = r'""|\'\'|"(.+?)"|\'(.+?)\'|({.+?})|(\S+)'

        # Extended value: also matches unquoted text (caution).
        snippets['ext_value'] = r'""|\'\'|"(.+?)"|\'(.+?)\'|({.+?})|(.+?)'

        # Key-value pair:
        snippets['pairs'] = r'(?:^|\s+){key}=({value})'.format(**snippets)

        # End of string: multiple space-separated key-value pairs:
        snippets['ending'] = r'.*?(({pairs}\s*)*)$'.format(**snippets)

        # Default value in optional parameters:
        snippets['default'] = r'\s*=\s*(?:{ext_value})\s*'.format(**snippets)

        # Optional parameter (has a default value):
        snippets['optional'] = '{{' + snippets['key'] + snippets['default'] + '}}'

        # Required parameter (no default value):
        snippets['required'] = '{{' + snippets['key'] + '}}'

        return snippets

    def match_kv_pairs_at_end(self):
        """
        Split trailing ``key=value`` pairs off the command string.

        :return: ``(kv_pairs, param_stream)`` where ``kv_pairs`` is the
                 ``re.findall`` result for the 'pairs' snippet (empty list
                 when there are none) and ``param_stream`` is the command
                 without the trailing pairs, wrapped in a single space on
                 each side.
        :rtype: ``tuple``
        """
        param_stream = self._param_stream

        ending_pairs = re.match(self._snippets['ending'], param_stream, re.DOTALL)
        trailing = ending_pairs.group(1) if ending_pairs else None

        if trailing:
            kv_pairs = re.findall(self._snippets['pairs'], trailing, re.DOTALL)
            # Cut out exactly the matched span. str.replace() would also
            # remove identical text occurring earlier in the command.
            start, end = ending_pairs.span(1)
            param_stream = param_stream[:start] + param_stream[end:]
        else:
            kv_pairs = []

        param_stream = " %s " % (param_stream)

        return (kv_pairs, param_stream)

    def generate_optional_params_regex(self):
        """
        Find every optional placeholder (``{{name=default}}``) in the format.

        :return: ``re.findall`` result for the 'optional' snippet: one tuple
                 per placeholder, the name first, followed by the capture
                 groups of the default value.
        :rtype: ``list``
        """
        return re.findall(self._snippets['optional'], self._format, re.DOTALL)

    def transform_format_string_into_regex(self):
        """
        Compile the format string into a regex with one named group per
        placeholder, so that matching a command yields a dict of values.

        Start and end anchors are added unless the format already has them.

        :rtype: compiled regular expression object
        """
        # Replacement template: \1 is the whitespace captured before the
        # placeholder, \2 is the placeholder name (used as the group name).
        param_match = r'\1["\']?(?P<\2>(?:(?<=\').+?(?=\')|(?<=").+?(?=")|{.+?}|.+?))["\']?'

        # Optional placeholders first, wrapped in an optional group;
        # then the remaining (required) placeholders.
        optional_pattern = r'(\s*)' + self._snippets['optional']
        required_pattern = r'(\s*)' + self._snippets['required']
        reg = re.sub(optional_pattern, r'(?:' + param_match + r')?', self._format)
        reg = re.sub(required_pattern, param_match, reg)

        reg_tokens = parse(reg, flags=re.DOTALL)

        # Add a beginning anchor if none exists
        begin_anchors = ((AT, AT_BEGINNING), (AT, AT_BEGINNING_STRING))
        if not search_regex_tokens(begin_anchors, reg_tokens):
            reg = r'^\s*' + reg

        # Add an ending anchor if none exists
        end_anchors = ((AT, AT_END), (AT, AT_END_STRING))
        if not search_regex_tokens(end_anchors, reg_tokens, backwards=True):
            reg = reg + r'\s*$'

        return re.compile(reg, re.DOTALL)

    def match_params_in_stream(self, matched_stream):
        """
        Build the parameter dict from a regex match.

        Starts from the named groups of the match, fills in defaults for
        optional parameters that have no (or an empty) matched value, and
        finally applies the trailing key=value pairs.

        :param matched_stream: Match object produced by the compiled format
                               regex, or a falsy value when nothing matched.
        :raises ParseException: When there is no match, or when a format is
                                set but neither the command nor the result
                                carries any value.
        :rtype: ``dict``
        """
        if not matched_stream:
            # No match means the user-provided command does not fit the
            # provided format string.
            raise ParseException('Command "%s" doesn\'t match format string "%s"' %
                                 (self._original_param_stream, self._format))

        result = matched_stream.groupdict()

        # Apply optional parameters / add the default values.
        for param in self._optional:
            name = param[0]
            result[name] = result[name] or ''.join(param[1:])

        # Apply the extra key=value parameters given at the end of the command.
        for pair in self._kv_pairs:
            result[pair[0]] = ''.join(pair[2:])

        if self._format and not (self._param_stream.strip() or any(result.values())):
            raise ParseException('No value supplied and no default value found.')

        return result

    def get_extracted_param_value(self):
        """
        Match command against the format string and extract parameters from the command string.

        :rtype: ``dict``
        """
        # 4. Match the command against the regex to get the param values.
        matched_stream = self._regex.search(self._param_stream)

        return self.match_params_in_stream(matched_stream)

    def get_multiple_extracted_param_value(self):
        """
        Match command against the format string and extract parameters for
        every match found in the command string.

        :rtype: ``list of dicts``
        """
        # 4. Match the command against the regex to get the param values.
        matched_streams = self._regex.finditer(self._param_stream)

        return [self.match_params_in_stream(matched) for matched in matched_streams]
191
192
193
def extract_parameters_for_action_alias_db(action_alias_db, format_str, param_stream,
                                           match_multiple=False):
    """
    Extract parameters from the user input based on the provided format string.

    Note: This function makes sure that the provided format string is indeed available in the
    action_alias_db.formats.

    :param action_alias_db: Object providing ``get_format_strings()`` and ``name``.
    :param format_str: Format string to match the input against.
    :param param_stream: User input (command string).
    :param match_multiple: Passed through to ``extract_parameters``.
    :raises ValueError: When ``format_str`` is not one of the alias' format strings.
    :return: Whatever ``extract_parameters`` returns for the given arguments.
    """
    formats = action_alias_db.get_format_strings()

    if format_str not in formats:
        raise ValueError('Format string "%s" is not available on the alias "%s"' %
                         (format_str, action_alias_db.name))

    return extract_parameters(
        format_str=format_str,
        param_stream=param_stream,
        match_multiple=match_multiple)
213
214
215
def extract_parameters(format_str, param_stream, match_multiple=False):
    """
    Parse ``param_stream`` against ``format_str`` with ActionAliasFormatParser.

    :param format_str: Format string handed to the parser as ``alias_format``.
    :param param_stream: Command string handed to the parser.
    :param match_multiple: When truthy, return the parser's multi-match
                           result instead of the single-match result.
    :return: Result of ``get_multiple_extracted_param_value()`` when
             ``match_multiple`` is truthy, otherwise of
             ``get_extracted_param_value()``.
    """
    parser = ActionAliasFormatParser(alias_format=format_str, param_stream=param_stream)
    extract = (parser.get_multiple_extracted_param_value if match_multiple
               else parser.get_extracted_param_value)
    return extract()
221
222
223
def search_regex_tokens(needle_tokens, haystack_tokens, backwards=False):
    """
    Search a tokenized regex for any tokens in needle_tokens. Returns True if
    any token tuple in needle_tokens is found, and False otherwise.

    >>> search_regex_tokens(((AT, AT_END), (AT, AT_END)), parse(r'^asdf'))
    False

    :param needle_tokens: an iterable of token tuples

    >>> needle_tokens = ((AT, AT_END), (AT, AT_END))
    >>> search_regex_tokens(needle_tokens, parse(r'^asdf$'))
    True

    :param haystack_tokens: an iterable of token tuples from sre_parse.parse

    >>> regex_tokens = parse(r'^(?:more regex)$')
    >>> list(regex_tokens)  # doctest: +NORMALIZE_WHITESPACE
    [(AT, AT_BEGINNING),
     (SUBPATTERN, (None, 0, 0,
     [(LITERAL, 109), (LITERAL, 111), (LITERAL, 114), (LITERAL, 101),
      (LITERAL, 32), (LITERAL, 114), (LITERAL, 101), (LITERAL, 103),
      (LITERAL, 101), (LITERAL, 120)])), (AT, AT_END)]

    >>> search_regex_tokens(((AT, AT_END), (AT, AT_END)), regex_tokens)
    True

    :param backwards: Controls direction of search, defaults to False.
    :type backwards: bool or None

    .. note:: Set backwards to True if needle_tokens are more likely to be
    found at the end of the haystack_tokens iterable, eg: ending anchors.
    The direction only applies to the top-level iteration; nested
    sub-patterns and branches are always searched forwards.

    >>> search_regex_tokens(((AT, AT_END), (AT, AT_END)), parse(r'^asdf$'))
    True
    >>> search_regex_tokens(((AT, AT_END), (AT, AT_END)), parse(r'^asdf$'), backwards=True)
    True

    :rtype: ``bool``
    """
    if backwards:
        haystack_tokens = reversed(haystack_tokens)

    for rtoken_type, rtoken in haystack_tokens:
        LOG.debug("Matching: ({}, {})".format(rtoken_type, rtoken))
        if rtoken_type == SUBPATTERN:
            # Recurse into the pattern nested inside the group.
            LOG.debug("SUBPATTERN: {}".format(rtoken))
            if search_regex_tokens(needle_tokens, rtoken[SUBPATTERN_INDEX]):
                return True
        elif rtoken_type == BRANCH:
            LOG.debug("BRANCH: {}".format(rtoken))
            # NOTE(review): rtoken[1] looks like the list of alternatives, so
            # rtoken[1][1] would inspect only the second one - confirm whether
            # every alternative should be searched.
            if search_regex_tokens(needle_tokens, rtoken[1][1]):
                return True
        elif (rtoken_type, rtoken) in needle_tokens:
            LOG.debug("Found: {}".format((rtoken_type, rtoken)))
            return True

    # The loop has no ``break``, so falling out of it always means "no match";
    # a ``for ... else`` clause is not needed here.
    LOG.debug("Not found: {}".format(needle_tokens))
    return False
282