1 | # Licensed to the StackStorm, Inc ('StackStorm') under one or more |
||
2 | # contributor license agreements. See the NOTICE file distributed with |
||
3 | # this work for additional information regarding copyright ownership. |
||
4 | # The ASF licenses this file to You under the Apache License, Version 2.0 |
||
5 | # (the "License"); you may not use this file except in compliance with |
||
6 | # the License. You may obtain a copy of the License at |
||
7 | # |
||
8 | # http://www.apache.org/licenses/LICENSE-2.0 |
||
9 | # |
||
10 | # Unless required by applicable law or agreed to in writing, software |
||
11 | # distributed under the License is distributed on an "AS IS" BASIS, |
||
12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||
13 | # See the License for the specific language governing permissions and |
||
14 | # limitations under the License. |
||
15 | |||
16 | from __future__ import absolute_import |
||
17 | import re |
||
18 | import sys |
||
19 | from sre_parse import ( |
||
20 | parse, AT, AT_BEGINNING, AT_BEGINNING_STRING, AT_END, AT_END_STRING, |
||
21 | BRANCH, SUBPATTERN, |
||
22 | ) |
||
23 | |||
24 | from st2common.exceptions.content import ParseException |
||
25 | from st2common import log |
||
26 | |||
# Names exported by a star-import of this module.
__all__ = [
    'ActionAliasFormatParser',

    'extract_parameters_for_action_alias_db',
    'extract_parameters',
    'search_regex_tokens',
]

# Module-level logger named after this module.
LOG = log.getLogger(__name__)
||
37 | |||
# Index of the nested pattern inside the argument tuple of an ``sre_parse`` SUBPATTERN token.
# Python 3.6 widened that tuple from ``(group, pattern)`` to
# ``(group, add_flags, del_flags, pattern)``. Older interpreters (2.x and 3.0 - 3.5) keep the
# nested pattern at index 1, so the check has to be against 3.6 and not just "any Python 3".
if sys.version_info >= (3, 6):
    SUBPATTERN_INDEX = 3
else:
    SUBPATTERN_INDEX = 1
||
43 | |||
44 | |||
class ActionAliasFormatParser(object):
    """
    Match a command string against an action alias format string and extract parameters.

    The format string may contain ``{{ name }}`` placeholders (required parameters) and
    ``{{ name=default }}`` placeholders (optional parameters with a default value). In
    addition, ``key=value`` pairs found at the very end of the command string are collected
    as extra parameters and removed from the command before it is matched.
    """

    def __init__(self, alias_format=None, param_stream=None):
        """
        :param alias_format: Format string to match against; ``None`` is treated as ``''``.
        :param param_stream: Command string supplied by the user; ``None`` is treated as ``''``.
        """
        self._format = alias_format or ''
        self._original_param_stream = param_stream or ''
        self._param_stream = self._original_param_stream
        self._snippets = self.generate_snippets()

        # As there's a lot of questions about using regular expressions,
        # I'll try to be thorough when documenting this code.

        # 1. Matching the arbitrary key-value pairs at the end of the command
        # to support extra parameters (not specified in the format string),
        # and cutting them from the command string afterwards.
        self._kv_pairs, self._param_stream = self.match_kv_pairs_at_end()

        # 2. Matching optional parameters (with default values).
        self._optional = self.generate_optional_params_regex()

        # 3. Convert the mangled format string into a regex object
        self._regex = self.transform_format_string_into_regex()

    def generate_snippets(self):
        """
        Build the regex fragments used by the other methods.

        :return: Mapping of fragment name to regex source string.
        :rtype: ``dict``
        """
        # The whole convoluted regex is split into snippets to keep it readable.
        snippets = dict()

        # Formats for keys and values: key is a non-spaced string,
        # value is anything in quotes or curly braces, or a single word.
        snippets['key'] = r'\s*(\S+?)\s*'
        snippets['value'] = r'""|\'\'|"(.+?)"|\'(.+?)\'|({.+?})|(\S+)'

        # Extended value: also matches unquoted text (caution).
        snippets['ext_value'] = r'""|\'\'|"(.+?)"|\'(.+?)\'|({.+?})|(.+?)'

        # Key-value pair:
        snippets['pairs'] = r'(?:^|\s+){key}=({value})'.format(**snippets)

        # End of string: multiple space-separated key-value pairs:
        snippets['ending'] = r'.*?(({pairs}\s*)*)$'.format(**snippets)

        # Default value in optional parameters:
        snippets['default'] = r'\s*=\s*(?:{ext_value})\s*'.format(**snippets)

        # Optional parameter (has a default value):
        snippets['optional'] = '{{' + snippets['key'] + snippets['default'] + '}}'

        # Required parameter (no default value):
        snippets['required'] = '{{' + snippets['key'] + '}}'

        return snippets

    def match_kv_pairs_at_end(self):
        """
        Split the trailing ``key=value`` pairs off the command string.

        :return: ``(kv_pairs, param_stream)`` where ``kv_pairs`` is the ``re.findall`` result
                 for the ``pairs`` snippet (one tuple per pair, key first) and ``param_stream``
                 is the command without the trailing pairs, padded with one space on each
                 side.
        :rtype: ``tuple``
        """
        param_stream = self._param_stream

        # 1. Matching the arbitrary key-value pairs at the end of the command
        # to support extra parameters (not specified in the format string),
        # and cutting them from the command string afterwards.
        ending_pairs = re.match(self._snippets['ending'], param_stream, re.DOTALL)
        if ending_pairs and ending_pairs.group(1):
            kv_pairs = re.findall(self._snippets['pairs'], ending_pairs.group(1), re.DOTALL)
            # Cut out exactly the matched span. A plain str.replace() would also delete any
            # identical "key=value" text which happens to appear earlier in the command.
            start, end = ending_pairs.span(1)
            param_stream = param_stream[:start] + param_stream[end:]
        else:
            kv_pairs = []
        param_stream = " %s " % (param_stream)

        return (kv_pairs, param_stream)

    def generate_optional_params_regex(self):
        """
        Find the optional placeholders (the ones with a default value) in the format string.

        :return: ``re.findall`` result for the ``optional`` snippet: one tuple per optional
                 parameter, the name first, followed by the default value capture groups.
        :rtype: ``list``
        """
        # 2. Matching optional parameters (with default values).
        return re.findall(self._snippets['optional'], self._format, re.DOTALL)

    def transform_format_string_into_regex(self):
        """
        Turn the format string into a compiled regular expression with one named group per
        placeholder.

        :return: Pattern compiled with ``re.DOTALL``.
        """
        # 3. Convert the mangled format string into a regex object
        # Transforming our format string into a regular expression,
        # substituting {{ ... }} with regex named groups, so that param_stream
        # matched against this expression yields a dict of params with values.
        param_match = r'\1["\']?(?P<\2>(?:(?<=\').+?(?=\')|(?<=").+?(?=")|{.+?}|.+?))["\']?'
        reg = re.sub(r'(\s*)' + self._snippets['optional'], r'(?:' + param_match + r')?',
                     self._format)
        reg = re.sub(r'(\s*)' + self._snippets['required'], param_match, reg)

        reg_tokens = parse(reg, flags=re.DOTALL)

        # Add a beginning anchor if none exists
        if not search_regex_tokens(((AT, AT_BEGINNING), (AT, AT_BEGINNING_STRING)), reg_tokens):
            reg = r'^\s*' + reg

        # Add an ending anchor if none exists
        if not search_regex_tokens(((AT, AT_END), (AT, AT_END_STRING)), reg_tokens,
                                   backwards=True):
            reg = reg + r'\s*$'

        return re.compile(reg, re.DOTALL)

    def match_params_in_stream(self, matched_stream):
        """
        Build the parameter dictionary for a single regex match.

        :param matched_stream: Match object produced by the compiled format regex, or a falsy
                               value when the command did not match.

        :raises ParseException: If ``matched_stream`` is falsy, or if a format string is set
                                but the command is blank and no parameter has a value.

        :rtype: ``dict``
        """
        # 5. Pull out a dictionary of matched groups, apply default parameters and extra parameters
        if not matched_stream:
            # If no match is found we throw since this indicates provided user string (command)
            # didn't match the provided format string
            raise ParseException('Command "%s" doesn\'t match format string "%s"' %
                                 (self._original_param_stream, self._format))

        # Compiling results from the steps 1-3.
        result = matched_stream.groupdict()

        # Apply optional parameters/add the default parameters: a value which was not matched
        # (or matched as empty) falls back to the default taken from the format string.
        for param in self._optional:
            name = param[0]
            result[name] = result[name] or ''.join(param[1:])

        # Apply given parameters. Items 2+ of each tuple are the inner value groups, so the
        # joined text is the value without its surrounding quotes.
        for pair in self._kv_pairs:
            result[pair[0]] = ''.join(pair[2:])

        if self._format and not (self._param_stream.strip() or any(result.values())):
            raise ParseException('No value supplied and no default value found.')

        return result

    def get_extracted_param_value(self):
        """
        Match command against the format string and extract parameters from the command string.

        :raises ParseException: If the command doesn't match the format string.

        :rtype: ``dict``
        """
        # 4. Matching the command against our regex to get the param values
        matched_stream = self._regex.search(self._param_stream)

        return self.match_params_in_stream(matched_stream)

    def get_multiple_extracted_param_value(self):
        """
        Match command against the format string and extract parameters for every match found
        in the command string.

        An empty list is returned when the regex yields no match at all.

        :rtype: ``list of dicts``
        """
        # 4. Matching the command against our regex to get the param values
        matched_streams = self._regex.finditer(self._param_stream)

        return [self.match_params_in_stream(matched_stream)
                for matched_stream in matched_streams]
||
192 | |||
193 | |||
def extract_parameters_for_action_alias_db(action_alias_db, format_str, param_stream,
                                           match_multiple=False):
    """
    Extract parameters from the user input after checking that the format string belongs to
    the given alias.

    :param action_alias_db: Alias object; its ``get_format_strings()`` result is consulted
                            and its ``name`` is used in the error message.
    :param format_str: Format string to match the command against.
    :param param_stream: Command string supplied by the user.
    :param match_multiple: Passed through unchanged to ``extract_parameters``.

    :raises ValueError: If ``format_str`` is not one of the alias' format strings.
    """
    if format_str not in action_alias_db.get_format_strings():
        raise ValueError('Format string "%s" is not available on the alias "%s"' %
                         (format_str, action_alias_db.name))

    return extract_parameters(format_str=format_str,
                              param_stream=param_stream,
                              match_multiple=match_multiple)
||
214 | |||
215 | |||
def extract_parameters(format_str, param_stream, match_multiple=False):
    """
    Parse ``param_stream`` against ``format_str`` using ``ActionAliasFormatParser``.

    :param format_str: Format string to match the command against.
    :param param_stream: Command string supplied by the user.
    :param match_multiple: When truthy, return the parser's multi-match result (a list of
                           dicts); otherwise return the single-match result (a dict).
    """
    alias_parser = ActionAliasFormatParser(alias_format=format_str, param_stream=param_stream)

    if not match_multiple:
        return alias_parser.get_extracted_param_value()

    return alias_parser.get_multiple_extracted_param_value()
||
222 | |||
223 | |||
def search_regex_tokens(needle_tokens, haystack_tokens, backwards=False):
    """
    Search a tokenized regex for any tokens in needle_tokens. Returns True if
    any token tuple in needle_tokens is found, and False otherwise.

    >>> search_regex_tokens(((AT, AT_END), (AT, AT_END)), parse(r'^asdf'))
    False

    :param needle_tokens: an iterable of token tuples

    >>> needle_tokens = ((AT, AT_END), (AT, AT_END))
    >>> search_regex_tokens(needle_tokens, parse(r'^asdf$'))
    True

    :param haystack_tokens: an iterable of token tuples from sre_parse.parse

    >>> regex_tokens = parse(r'^(?:more regex)$')
    >>> list(regex_tokens)  # doctest: +NORMALIZE_WHITESPACE
    [(AT, AT_BEGINNING),
     (SUBPATTERN, (None, 0, 0,
     [(LITERAL, 109), (LITERAL, 111), (LITERAL, 114), (LITERAL, 101),
      (LITERAL, 32), (LITERAL, 114), (LITERAL, 101), (LITERAL, 103),
      (LITERAL, 101), (LITERAL, 120)])), (AT, AT_END)]

    >>> search_regex_tokens(((AT, AT_END), (AT, AT_END)), regex_tokens)
    True

    :param backwards: Controls direction of search, defaults to False.
    :type backwards: bool or None

    .. note:: Set backwards to True if needle_tokens are more likely to be
       found at the end of the haystack_tokens iterable, eg: ending anchors.
       The flag only applies to the top-level iterable; nested patterns are
       always walked front to back.

    >>> search_regex_tokens(((AT, AT_END), (AT, AT_END)), parse(r'^asdf$'))
    True
    >>> search_regex_tokens(((AT, AT_END), (AT, AT_END)), parse(r'^asdf$'), backwards=True)
    True

    :rtype: ``bool``
    """
    if backwards:
        haystack_tokens = reversed(haystack_tokens)

    for rtoken_type, rtoken in haystack_tokens:
        LOG.debug("Matching: ({}, {})".format(rtoken_type, rtoken))
        if rtoken_type == SUBPATTERN:
            LOG.debug("SUBPATTERN: {}".format(rtoken))
            # Descend into the pattern nested inside the group.
            if search_regex_tokens(needle_tokens, rtoken[SUBPATTERN_INDEX]):
                return True
        elif rtoken_type == BRANCH:
            LOG.debug("BRANCH: {}".format(rtoken))
            # NOTE(review): only the single item ``rtoken[1][1]`` is searched here -
            # presumably the second alternative of the branch. Confirm whether every
            # alternative should be inspected before changing this.
            if search_regex_tokens(needle_tokens, rtoken[1][1]):
                return True
        elif (rtoken_type, rtoken) in needle_tokens:
            LOG.debug("Found: {}".format((rtoken_type, rtoken)))
            return True

    # The loop never uses ``break``, so reaching this point means nothing matched; no
    # ``else`` clause on the loop is needed.
    LOG.debug("Not found: {}".format(needle_tokens))
    return False
||
283 |
If the loop cannot exit early through the use of `break`, the `else` part will always be executed. You can therefore just leave off the `else`.