Test Failed
Push — master ( e380d0...f5671d )
by W
02:58
created

st2common/st2common/util/mongoescape.py (1 issue)

1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
from __future__ import absolute_import
17
import copy
18
import six
19
from six.moves import zip
0 ignored issues
show
Bug Best Practice introduced by
This seems to re-define the built-in zip.

It is generally discouraged to redefine built-ins as this makes code very hard to read.

Loading history...
20
21
# http://docs.mongodb.org/manual/faq/developers/#faq-dollar-sign-escaping
# Characters which need to be replaced in dict keys, and their stand-ins:
# '.' <-> U+FF0E (FULLWIDTH FULL STOP), '$' <-> U+FF04 (FULLWIDTH DOLLAR SIGN).
# The two lists are paired up by position.
UNESCAPED = ['.', '$']
ESCAPED = [u'\uFF0E', u'\uFF04']
ESCAPE_TRANSLATION = dict(zip(UNESCAPED, ESCAPED))
UNESCAPE_TRANSLATION = dict(zip(ESCAPED, UNESCAPED))

# Note: Because of old rule escaping code, two different characters can be translated back to dot
# (U+FF0E above and U+2024, ONE DOT LEADER, below).
RULE_CRITERIA_UNESCAPED = ['.']
RULE_CRITERIA_ESCAPED = [u'\u2024']
RULE_CRITERIA_ESCAPE_TRANSLATION = dict(zip(RULE_CRITERIA_UNESCAPED,
                                            RULE_CRITERIA_ESCAPED))
RULE_CRITERIA_UNESCAPE_TRANSLATION = dict(zip(RULE_CRITERIA_ESCAPED,
                                              RULE_CRITERIA_UNESCAPED))
34
35
36
def _prep_work_items(d):
    """
    Build the list of work items for a single dict.

    :param d: Dict whose items should be turned into work items.
    :type d: ``dict``

    :return: One ``(key, value, d)`` tuple per item of ``d``. Each tuple carries ``d`` itself
             as its third element, so a consumer can modify the dict the item came from.
    :rtype: ``list`` of ``tuple``
    """
    return [(k, v, d) for k, v in six.iteritems(d)]
38
39
40
def _translate_chars(field, translation):
41
    # Only translate the fields of a dict
42
    if not isinstance(field, dict):
43
        return field
44
45
    work_items = _prep_work_items(field)
46
47
    while len(work_items) > 0:
48
        work_item = work_items.pop(0)
49
        oldkey = work_item[0]
50
        value = work_item[1]
51
        work_field = work_item[2]
52
        newkey = oldkey
53
54
        for t_k, t_v in six.iteritems(translation):
55
            newkey = newkey.replace(t_k, t_v)
56
57
        if newkey != oldkey:
58
            work_field[newkey] = value
59
            del work_field[oldkey]
60
61
        if isinstance(value, dict):
62
            work_items.extend(_prep_work_items(value))
63
        elif isinstance(value, list):
64
            for item in value:
65
                if isinstance(item, dict):
66
                    work_items.extend(_prep_work_items(item))
67
68
    return field
69
70
71
def escape_chars(field):
    """
    Return a deep copy of ``field`` with its keys translated using ``ESCAPE_TRANSLATION``.

    The translation is applied to the copy, so ``field`` itself is never modified.

    :param field: Object to escape.

    :return: Escaped deep copy of ``field``.
    """
    value = copy.deepcopy(field)
    return _translate_chars(value, ESCAPE_TRANSLATION)
74
75
76
def unescape_chars(field):
    """
    Return a deep copy of ``field`` with escaped keys translated back.

    Two passes are applied to the copy: first ``UNESCAPE_TRANSLATION`` and then
    ``RULE_CRITERIA_UNESCAPE_TRANSLATION``. ``field`` itself is never modified.

    :param field: Object to unescape.

    :return: Unescaped deep copy of ``field``.
    """
    value = copy.deepcopy(field)

    # Feed the result of the first pass into the second one explicitly instead of relying on
    # the first pass having mutated ``value`` in place.
    translated = _translate_chars(value, UNESCAPE_TRANSLATION)
    return _translate_chars(translated, RULE_CRITERIA_UNESCAPE_TRANSLATION)
81