Test Failed
Push — master ( e380d0...f5671d )
by W
02:58
created

st2common/models/utils/action_alias_utils.py (1 issue)

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
import re
18
import sys
19
from sre_parse import (
20
    parse, AT, AT_BEGINNING, AT_BEGINNING_STRING, AT_END, AT_END_STRING,
21
    BRANCH, SUBPATTERN,
22
)
23
24
from st2common.exceptions.content import ParseException
25
from st2common import log
26
27
# Names exported by ``from ... import *``.
__all__ = [
    'ActionAliasFormatParser',

    'extract_parameters_for_action_alias_db',
    'extract_parameters',
    'search_regex_tokens',
]


# Module-level logger; used by search_regex_tokens for debug tracing.
LOG = log.getLogger(__name__)
37
38
# Python 3 compatibility
# The payload of an sre_parse SUBPATTERN token is ``(group, pattern)`` up to
# Python 3.5 and ``(group, add_flags, del_flags, pattern)`` from Python 3.6 on,
# so the position of the nested pattern depends on the interpreter version.
if sys.version_info >= (3, 6):
    SUBPATTERN_INDEX = 3
else:
    SUBPATTERN_INDEX = 1
43
44
45
class ActionAliasFormatParser(object):
    """
    Match a command string against an action alias format string.

    The format string may contain ``{{name}}`` (required) and
    ``{{name=default}}`` (optional) placeholders. The command string
    (``param_stream``) may additionally end with arbitrary ``key=value``
    pairs which are collected as extra parameters.
    """

    def __init__(self, alias_format=None, param_stream=None):
        """
        :param alias_format: Format string; ``None`` is treated as ``''``.
        :param param_stream: Command string; ``None`` is treated as ``''``.
        """
        self._format = alias_format or ''
        self._original_param_stream = param_stream or ''
        self._param_stream = self._original_param_stream
        self._snippets = self.generate_snippets()

        # As there's a lot of questions about using regular expressions,
        # I'll try to be thorough when documenting this code.

        # 1. Matching the arbitrary key-value pairs at the end of the command
        # to support extra parameters (not specified in the format string),
        # and cutting them from the command string afterwards.
        self._kv_pairs, self._param_stream = self.match_kv_pairs_at_end()

        # 2. Matching optional parameters (with default values).
        self._optional = self.generate_optional_params_regex()

        # 3. Convert the mangled format string into a regex object
        self._regex = self.transform_format_string_into_regex()

    def generate_snippets(self):
        """
        Build the regex fragments the parser is assembled from.

        :return: Mapping of fragment name to regex source string.
        :rtype: ``dict``
        """
        # I'll split the whole convoluted regex into snippets to make it
        # a bit more readable (hopefully).
        snippets = dict()

        # Formats for keys and values: key is a non-spaced string,
        # value is anything in quotes or curly braces, or a single word.
        snippets['key'] = r'\s*(\S+?)\s*'
        snippets['value'] = r'""|\'\'|"(.+?)"|\'(.+?)\'|({.+?})|(\S+)'

        # Extended value: also matches unquoted text (caution).
        snippets['ext_value'] = r'""|\'\'|"(.+?)"|\'(.+?)\'|({.+?})|(.+?)'

        # Key-value pair:
        snippets['pairs'] = r'(?:^|\s+){key}=({value})'.format(**snippets)

        # End of string: multiple space-separated key-value pairs:
        snippets['ending'] = r'.*?(({pairs}\s*)*)$'.format(**snippets)

        # Default value in optional parameters:
        snippets['default'] = r'\s*=\s*(?:{ext_value})\s*'.format(**snippets)

        # Optional parameter (has a default value):
        snippets['optional'] = '{{' + snippets['key'] + snippets['default'] + '}}'

        # Required parameter (no default value):
        snippets['required'] = '{{' + snippets['key'] + '}}'

        return snippets

    def match_kv_pairs_at_end(self):
        """
        Split the trailing ``key=value`` pairs off the command string.

        :return: ``(kv_pairs, param_stream)`` where ``kv_pairs`` is the list of
                 ``re.findall`` tuples for the trailing pairs (empty when there
                 are none) and ``param_stream`` is the remaining command text
                 padded with one space on each side.
        :rtype: ``tuple``
        """
        param_stream = self._param_stream

        # 1. Matching the arbitrary key-value pairs at the end of the command
        # to support extra parameters (not specified in the format string),
        # and cutting them from the command string afterwards.
        ending_pairs = re.match(self._snippets['ending'], param_stream, re.DOTALL)
        if ending_pairs and ending_pairs.group(1):
            kv_pairs = re.findall(self._snippets['pairs'], ending_pairs.group(1), re.DOTALL)
            # Cut out exactly the matched span. A plain str.replace() would
            # also delete identical text occurring earlier in the command.
            start, end = ending_pairs.span(1)
            param_stream = param_stream[:start] + param_stream[end:]
        else:
            kv_pairs = []
        param_stream = " %s " % (param_stream)

        return (kv_pairs, param_stream)

    def generate_optional_params_regex(self):
        """
        Find the optional (defaulted) placeholders in the format string.

        :return: ``re.findall`` tuples; element 0 is the parameter name and the
                 remaining elements are the default-value capture groups.
        :rtype: ``list``
        """
        # 2. Matching optional parameters (with default values).
        return re.findall(self._snippets['optional'], self._format, re.DOTALL)

    def transform_format_string_into_regex(self):
        """
        Compile the format string into a regex with one named group per parameter.

        :return: Compiled pattern (``re.DOTALL``), anchored at both ends.
        """
        # 3. Convert the mangled format string into a regex object
        # Transforming our format string into a regular expression,
        # substituting {{ ... }} with regex named groups, so that param_stream
        # matched against this expression yields a dict of params with values.
        param_match = r'\1["\']?(?P<\2>(?:(?<=\').+?(?=\')|(?<=").+?(?=")|{.+?}|.+?))["\']?'
        reg = re.sub(r'(\s*)' + self._snippets['optional'], r'(?:' + param_match + r')?',
                     self._format)
        reg = re.sub(r'(\s*)' + self._snippets['required'], param_match, reg)

        reg_tokens = parse(reg, flags=re.DOTALL)

        # Add a beginning anchor if none exists
        if not search_regex_tokens(((AT, AT_BEGINNING), (AT, AT_BEGINNING_STRING)), reg_tokens):
            reg = r'^\s*' + reg

        # Add an ending anchor if none exists
        if not search_regex_tokens(((AT, AT_END), (AT, AT_END_STRING)), reg_tokens, backwards=True):
            reg = reg + r'\s*$'

        return re.compile(reg, re.DOTALL)

    def match_params_in_stream(self, matched_stream):
        """
        Turn a regex match into the final parameter dictionary.

        :param matched_stream: Match object produced by the compiled format
                               regex, or a falsy value when nothing matched.

        :raises ParseException: If there is no match, or if a format is set but
                                neither the command nor the result holds a value.

        :rtype: ``dict``
        """
        # 5. Pull out a dictionary of matched groups, apply default parameters and extra parameters
        if not matched_stream:
            # If no match is found we throw since this indicates provided user string (command)
            # didn't match the provided format string
            raise ParseException('Command "%s" doesn\'t match format string "%s"' %
                                 (self._original_param_stream, self._format))

        # Compiling results from the steps 1-3.
        result = matched_stream.groupdict()

        # Apply optional parameters/add the default parameters: a parameter
        # with no (or an empty) matched value falls back to its default.
        for param in self._optional:
            result[param[0]] = result[param[0]] or ''.join(param[1:])

        # Apply given parameters
        for pair in self._kv_pairs:
            result[pair[0]] = ''.join(pair[2:])

        if self._format and not (self._param_stream.strip() or any(result.values())):
            raise ParseException('No value supplied and no default value found.')

        return result

    def get_extracted_param_value(self):
        """
        Match command against the format string and extract parameters from the command string.

        :rtype: ``dict``
        """
        # 4. Matching the command against our regex to get the param values
        matched_stream = self._regex.search(self._param_stream)

        return self.match_params_in_stream(matched_stream)

    def get_multiple_extracted_param_value(self):
        """
        Match command against the format string and extract parameters from the command string.

        Every match of the format regex in the command yields one result.

        :rtype: ``list of dicts``
        """
        # 4. Matching the command against our regex to get the param values
        matched_streams = self._regex.finditer(self._param_stream)

        return [self.match_params_in_stream(matched_stream)
                for matched_stream in matched_streams]
192
193
194
def extract_parameters_for_action_alias_db(action_alias_db, format_str, param_stream,
                                           match_multiple=False):
    """
    Extract parameters from the user input based on the provided format string.

    Note: This function makes sure that the provided format string is indeed available in the
    action_alias_db.formats.

    :param action_alias_db: Alias object; must provide ``get_format_strings()`` and ``name``.
    :param format_str: Format string to match the command against.
    :param param_stream: Command string supplied by the user.
    :param match_multiple: Passed through to ``extract_parameters``.

    :raises ValueError: If ``format_str`` is not one of the alias' format strings.

    :return: Whatever ``extract_parameters`` returns for the given arguments.
    """
    formats = action_alias_db.get_format_strings()

    if format_str not in formats:
        raise ValueError('Format string "%s" is not available on the alias "%s"' %
                         (format_str, action_alias_db.name))

    return extract_parameters(
        format_str=format_str,
        param_stream=param_stream,
        match_multiple=match_multiple)
214
215
216
def extract_parameters(format_str, param_stream, match_multiple=False):
    """
    Extract parameters from ``param_stream`` using ``format_str``.

    :param format_str: Format string to match the command against.
    :param param_stream: Command string supplied by the user.
    :param match_multiple: When true, return one result per match of the
                           format in the command instead of a single result.

    :rtype: ``dict``, or ``list of dicts`` when ``match_multiple`` is true
    """
    parser = ActionAliasFormatParser(alias_format=format_str, param_stream=param_stream)
    if match_multiple:
        return parser.get_multiple_extracted_param_value()
    return parser.get_extracted_param_value()
222
223
224
def search_regex_tokens(needle_tokens, haystack_tokens, backwards=False):
    """
    Search a tokenized regex for any tokens in needle_tokens. Returns True if
    any token tuple in needle_tokens is found, and False otherwise.

    >>> search_regex_tokens(((AT, AT_END), (AT, AT_END)), parse(r'^asdf'))
    False

    :param needle_tokens: an iterable of token tuples

    >>> needle_tokens = ((AT, AT_END), (AT, AT_END))
    >>> search_regex_tokens(needle_tokens, parse(r'^asdf$'))
    True

    :param haystack_tokens: an iterable of token tuples from sre_parse.parse

    >>> regex_tokens = parse(r'^(?:more regex)$')
    >>> list(regex_tokens)  # doctest: +NORMALIZE_WHITESPACE
    [(AT, AT_BEGINNING),
     (SUBPATTERN, (None, 0, 0,
     [(LITERAL, 109), (LITERAL, 111), (LITERAL, 114), (LITERAL, 101),
      (LITERAL, 32), (LITERAL, 114), (LITERAL, 101), (LITERAL, 103),
      (LITERAL, 101), (LITERAL, 120)])), (AT, AT_END)]

    >>> search_regex_tokens(((AT, AT_END), (AT, AT_END)), regex_tokens)
    True

    :param backwards: Controls direction of search, defaults to False.
    :type backwards: bool or None

    .. note:: Set backwards to True if needle_tokens are more likely to be
    found at the end of the haystack_tokens iterable, eg: ending anchors.

    >>> search_regex_tokens(((AT, AT_END), (AT, AT_END)), parse(r'^asdf$'))
    True
    >>> search_regex_tokens(((AT, AT_END), (AT, AT_END)), parse(r'^asdf$'), backwards=True)
    True

    :rtype: ``bool``
    """
    if backwards:
        haystack_tokens = reversed(haystack_tokens)

    for rtoken_type, rtoken in haystack_tokens:
        LOG.debug("Matching: ({}, {})".format(rtoken_type, rtoken))
        if rtoken_type == SUBPATTERN:
            # Descend into the group's nested pattern.
            LOG.debug("SUBPATTERN: {}".format(rtoken))
            if search_regex_tokens(needle_tokens, rtoken[SUBPATTERN_INDEX]):
                return True
        elif rtoken_type == BRANCH:
            # NOTE(review): rtoken[1] is the list of alternatives, so
            # rtoken[1][1] inspects only the second one - confirm whether
            # every alternative should be searched.
            LOG.debug("BRANCH: {}".format(rtoken))
            if search_regex_tokens(needle_tokens, rtoken[1][1]):
                return True
        elif (rtoken_type, rtoken) in needle_tokens:
            LOG.debug("Found: {}".format((rtoken_type, rtoken)))
            return True

    # The loop has no ``break``, so reaching this point means no token matched.
    LOG.debug("Not found: {}".format(needle_tokens))
    return False
283