Passed
Push — master ( a2290b...20e27f )
by
unknown
03:52
created

search_regex_tokens()   C

Complexity

Conditions 9

Size

Total Lines 59

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 1
Metric Value
cc 9
c 1
b 0
f 1
dl 0
loc 59
rs 5.0372

How to fix   Long Method   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include Extract Method.

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import re
17
import sys
18
from sre_parse import (
19
    parse, AT, AT_BEGINNING, AT_BEGINNING_STRING, AT_END, AT_END_STRING,
20
    BRANCH, SUBPATTERN,
21
)
22
23
from st2common.exceptions.content import ParseException
24
from st2common import log
25
26
# Public names exported by ``from <this module> import *``.
__all__ = [
    'ActionAliasFormatParser',

    'extract_parameters_for_action_alias_db',
    'extract_parameters',
    'search_regex_tokens',
]


# Module-level logger; search_regex_tokens uses it for debug tracing.
LOG = log.getLogger(__name__)
36
37
# Index of the nested pattern inside the argument tuple of a SUBPATTERN token.
# sre_parse emits ``(group, pattern)`` up to Python 3.5 and
# ``(group, add_flags, del_flags, pattern)`` from Python 3.6 onwards, so the
# switch has to happen at 3.6 rather than at 3.0.
if sys.version_info >= (3, 6):
    SUBPATTERN_INDEX = 3
else:
    SUBPATTERN_INDEX = 1
42
43
44
class ActionAliasFormatParser(object):
    """
    Extract parameter values from a command string using an alias format string.

    Parameters are marked in the format string as ``{{name}}`` (required) or
    ``{{name=default}}`` (optional). After those markers are substituted with
    named groups, the format string is used as a regular expression that the
    command has to match.
    """

    def __init__(self, alias_format=None, param_stream=None):
        """
        :param alias_format: Format string to match against; a falsy value is stored as ''.
        :param param_stream: Command string to parse; a falsy value is stored as ''.
        """
        self._format = alias_format or ''
        self._param_stream = param_stream or ''

    def get_extracted_param_value(self):
        """
        Match command against the format string and extract parameters from the command string.

        The result combines the named groups matched in the command, the defaults of
        optional parameters whose group matched nothing, and any trailing ``key=value``
        pairs found at the end of the command (these are written last and therefore win).

        :raises ParseException: If the command doesn't match the format string, or if a
                                format string is set but the command is blank and no
                                value or default could be found.

        :rtype: ``dict``
        """
        param_stream = self._param_stream

        # As there's a lot of questions about using regular expressions,
        # I'll try to be thorough when documenting this code.

        # I'll split the whole convoluted regex into snippets to make it
        # a bit more readable (hopefully).
        snippets = {}

        # Formats for keys and values: key is a non-spaced string,
        # value is anything in quotes or curly braces, or a single word.
        snippets['key'] = r'\s*(\S+?)\s*'
        snippets['value'] = r'""|\'\'|"(.+?)"|\'(.+?)\'|({.+?})|(\S+)'

        # Extended value: also matches unquoted text (caution).
        snippets['ext_value'] = r'""|\'\'|"(.+?)"|\'(.+?)\'|({.+?})|(.+?)'

        # Key-value pair:
        snippets['pairs'] = r'(?:^|\s+){key}=({value})'.format(**snippets)

        # End of string: multiple space-separated key-value pairs:
        snippets['ending'] = r'.*?(({pairs}\s*)*)$'.format(**snippets)

        # Default value in optional parameters:
        snippets['default'] = r'\s*=\s*(?:{ext_value})\s*'.format(**snippets)

        # Optional parameter (has a default value):
        snippets['optional'] = '{{' + snippets['key'] + snippets['default'] + '}}'

        # Required parameter (no default value):
        snippets['required'] = '{{' + snippets['key'] + '}}'

        # 1. Matching the arbitrary key-value pairs at the end of the command
        # to support extra parameters (not specified in the format string),
        # and cutting them from the command string afterwards.
        kv_pairs = []
        ending_pairs = re.match(snippets['ending'], param_stream, re.DOTALL)
        if ending_pairs and ending_pairs.group(1):
            kv_pairs = re.findall(snippets['pairs'], ending_pairs.group(1), re.DOTALL)
            # Cut exactly the matched span. A plain str.replace() would also
            # delete identical "key=value" text that appears earlier in the
            # command and belongs to a positional parameter.
            param_stream = (param_stream[:ending_pairs.start(1)] +
                            param_stream[ending_pairs.end(1):])
        param_stream = " %s " % (param_stream)

        # 2. Matching optional parameters (with default values).
        optional = re.findall(snippets['optional'], self._format, re.DOTALL)

        # Transforming our format string into a regular expression,
        # substituting {{ ... }} with regex named groups, so that param_stream
        # matched against this expression yields a dict of params with values.
        param_match = r'\1["\']?(?P<\2>(?:(?<=\').+?(?=\')|(?<=").+?(?=")|{.+?}|.+?))["\']?'
        reg = re.sub(r'(\s*)' + snippets['optional'], r'(?:' + param_match + r')?', self._format)
        reg = re.sub(r'(\s*)' + snippets['required'], param_match, reg)

        reg_tokens = parse(reg, flags=re.DOTALL)

        # Add a beginning anchor if none exists
        if not search_regex_tokens(((AT, AT_BEGINNING), (AT, AT_BEGINNING_STRING)), reg_tokens):
            reg = r'^\s*' + reg

        # Add an ending anchor if none exists
        if not search_regex_tokens(((AT, AT_END), (AT, AT_END_STRING)), reg_tokens, backwards=True):
            reg = reg + r'\s*$'

        # 3. Matching the command against our regex to get the param values
        matched_stream = re.match(reg, param_stream, re.DOTALL)

        if not matched_stream:
            # If no match is found we throw since this indicates provided user string (command)
            # didn't match the provided format string
            raise ParseException('Command "%s" doesn\'t match format string "%s"' %
                                 (self._param_stream, self._format))

        # Compiling results from the steps 1-3.
        result = matched_stream.groupdict()

        # Optional parameters whose group matched nothing (None or '') fall
        # back to the default captured from the format string.
        for param in optional:
            name = param[0]
            result[name] = result[name] or ''.join(param[1:])

        # Trailing key=value pairs are applied last, so they override
        # anything extracted via the format string.
        for pair in kv_pairs:
            result[pair[0]] = ''.join(pair[2:])

        if self._format and not (self._param_stream.strip() or any(result.values())):
            raise ParseException('No value supplied and no default value found.')

        return result
147
148
149
def extract_parameters_for_action_alias_db(action_alias_db, format_str, param_stream):
    """
    Extract parameters from the user input based on the provided format string.

    Note: This function makes sure that the provided format string is indeed available in the
    action_alias_db.formats.

    :param action_alias_db: Alias object; it must provide ``get_format_strings()`` and ``name``.
    :param format_str: Format string to match the user input against.
    :param param_stream: User input to extract the parameters from.

    :raises ValueError: If ``format_str`` is not one of the alias' format strings.

    :return: The value returned by ``extract_parameters`` for this format and input.
    """
    formats = action_alias_db.get_format_strings()

    if format_str not in formats:
        raise ValueError('Format string "%s" is not available on the alias "%s"' %
                         (format_str, action_alias_db.name))

    return extract_parameters(format_str=format_str, param_stream=param_stream)
165
166
167
def extract_parameters(format_str, param_stream):
    """
    Build a one-off ActionAliasFormatParser for the given format string and
    command, and return the parameters it extracts.
    """
    return ActionAliasFormatParser(
        alias_format=format_str,
        param_stream=param_stream,
    ).get_extracted_param_value()
170
171
172
def search_regex_tokens(needle_tokens, haystack_tokens, backwards=False):
    """
    Search a tokenized regex for any tokens in needle_tokens. Returns True if
    any token tuple in needle_tokens is found, and False otherwise.

    SUBPATTERN tokens are searched recursively; for BRANCH tokens see the
    review note in the body.

    :param needle_tokens: an iterable of token tuples
    :param haystack_tokens: an iterable of token tuples from sre_parse.parse
    :param backwards: Controls direction of search, defaults to False.
    :type backwards: bool or None

    .. note:: Set backwards to True if needle_tokens are more likely to be
       found at the end of the haystack_tokens iterable, eg: ending anchors.

    >>> search_regex_tokens(((AT, AT_END), (AT, AT_END)), parse(r'^asdf'))
    False

    >>> needle_tokens = ((AT, AT_END), (AT, AT_END))
    >>> search_regex_tokens(needle_tokens, parse(r'^asdf$'))
    True

    >>> regex_tokens = parse(r'^(?:more regex)$')
    >>> list(regex_tokens)  # doctest: +NORMALIZE_WHITESPACE
    [(AT, AT_BEGINNING),
     (SUBPATTERN, (None, 0, 0,
     [(LITERAL, 109), (LITERAL, 111), (LITERAL, 114), (LITERAL, 101),
      (LITERAL, 32), (LITERAL, 114), (LITERAL, 101), (LITERAL, 103),
      (LITERAL, 101), (LITERAL, 120)])), (AT, AT_END)]

    >>> search_regex_tokens(((AT, AT_END), (AT, AT_END)), regex_tokens)
    True

    >>> search_regex_tokens(((AT, AT_END), (AT, AT_END)), parse(r'^asdf$'))
    True
    >>> search_regex_tokens(((AT, AT_END), (AT, AT_END)), parse(r'^asdf$'), backwards=True)
    True

    :rtype: ``bool``
    """
    if backwards:
        haystack_tokens = reversed(haystack_tokens)

    for rtoken_type, rtoken in haystack_tokens:
        LOG.debug("Matching: ({}, {})".format(rtoken_type, rtoken))
        if rtoken_type == SUBPATTERN:
            LOG.debug("SUBPATTERN: {}".format(rtoken))
            # The nested pattern is the last element of the SUBPATTERN
            # argument tuple in both layouts sre_parse produces:
            # (group, pattern) and (group, add_flags, del_flags, pattern).
            if search_regex_tokens(needle_tokens, rtoken[-1], backwards=backwards):
                return True
        elif rtoken_type == BRANCH:
            LOG.debug("BRANCH: {}".format(rtoken))
            # NOTE(review): rtoken[1] is the list of alternatives, so
            # rtoken[1][1] inspects only the second alternative - confirm
            # whether every alternative should be searched. Kept as-is
            # because changing it alters which anchors callers add.
            if search_regex_tokens(needle_tokens, rtoken[1][1], backwards=backwards):
                return True
        elif (rtoken_type, rtoken) in needle_tokens:
            LOG.debug("Found: {}".format((rtoken_type, rtoken)))
            return True

    # The loop has no ``break``, so reaching this point means nothing matched.
    LOG.debug("Not found: {}".format(needle_tokens))
    return False
231