Completed
Push — master ( be542d...29973a )
by Arma
08:49 queued 01:31
created

st2reactor.container.Range   A

Complexity

Total Complexity 11

Size/Duplication

Total Lines 41
Duplicated Lines 0 %
Metric Value
wmc 11
dl 0
loc 41
rs 10
1
# Licensed to the StackStorm, Inc ('StackStorm') under one or more
2
# contributor license agreements.  See the NOTICE file distributed with
3
# this work for additional information regarding copyright ownership.
4
# The ASF licenses this file to You under the Apache License, Version 2.0
5
# (the "License"); you may not use this file except in compliance with
6
# the License.  You may obtain a copy of the License at
7
#
8
#     http://www.apache.org/licenses/LICENSE-2.0
9
#
10
# Unless required by applicable law or agreed to in writing, software
11
# distributed under the License is distributed on an "AS IS" BASIS,
12
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
# See the License for the specific language governing permissions and
14
# limitations under the License.
15
16
import ctypes
17
import hashlib
18
19
from st2reactor.container.partitioners import DefaultPartitioner, get_all_enabled_sensors
20
21
# Names exported by `from <module> import *`.
__all__ = [
    'HashPartitioner',
    'Range',
]
25
26
# A serialized range expression has the form
# `RANGE_START..RANGE_END|RANGE_START..RANGE_END ...`: sub-ranges are joined with
# SUB_RANGE_SEPARATOR and the two boundaries of one sub-range are joined with
# RANGE_BOUNDARY_SEPARATOR.
SUB_RANGE_SEPARATOR = '|'
RANGE_BOUNDARY_SEPARATOR = '..'
29
30
31
class Range(object):
    """
    Half-open interval ``[range_start, range_end)`` parsed from a sub-range
    expression of the form ``START..END``.

    Each boundary is either an integer literal or one of the case-insensitive
    keywords ``min`` / ``max``. Integer boundaries outside
    ``[RANGE_MIN_VALUE, RANGE_MAX_VALUE]`` are clamped into that interval.
    """

    RANGE_MIN_ENUM = 'min'
    RANGE_MIN_VALUE = 0

    RANGE_MAX_ENUM = 'max'
    RANGE_MAX_VALUE = 2**32

    def __init__(self, range_repr):
        """
        :param range_repr: Sub-range expression, e.g. ``0..1024`` or ``min..max``.
        :type range_repr: ``str``

        :raises ValueError: If the expression does not have exactly two
                            boundaries, a boundary is not an integer or keyword,
                            or the start is greater than the end.
        """
        self.range_start, self.range_end = self._get_range_boundaries(range_repr)

    def __contains__(self, item):
        # The start boundary is inclusive, the end boundary is exclusive.
        return self.range_start <= item < self.range_end

    def _get_range_boundaries(self, range_repr):
        """
        Split ``range_repr`` on RANGE_BOUNDARY_SEPARATOR and validate both sides.

        :rtype: ``tuple`` of (range_start, range_end)
        """
        boundaries = [value.strip() for value in range_repr.split(RANGE_BOUNDARY_SEPARATOR)]
        if len(boundaries) != 2:
            # Report the expression as it was supplied, not the list it was split into.
            raise ValueError('Unsupported sub-range format %s.' % range_repr)

        range_start = self._get_valid_range_boundary(boundaries[0])
        range_end = self._get_valid_range_boundary(boundaries[1])

        # Equal boundaries are accepted and yield a range that contains nothing.
        if range_start > range_end:
            raise ValueError('Misconfigured range [%d..%d]' % (range_start, range_end))
        return (range_start, range_end)

    def _get_valid_range_boundary(self, boundary_value):
        """
        Translate a single boundary token into an integer.

        :param boundary_value: ``min``, ``max`` (any letter case) or an integer literal.
        :type boundary_value: ``str``

        :raises ValueError: Propagated from ``int()`` when the token is neither
                            a keyword nor an integer literal.
        """
        keyword = boundary_value.lower()
        if keyword == self.RANGE_MIN_ENUM:
            return self.RANGE_MIN_VALUE
        if keyword == self.RANGE_MAX_ENUM:
            return self.RANGE_MAX_VALUE

        # Values below RANGE_MIN_VALUE or above RANGE_MAX_VALUE are clamped rather
        # than rejected: that is manageable and should not lead to unexpected behavior.
        return max(self.RANGE_MIN_VALUE, min(int(boundary_value), self.RANGE_MAX_VALUE))
72
73
74
class HashPartitioner(DefaultPartitioner):
    """
    Partitioner that selects the sensors whose hashed reference falls inside one
    of the configured hash ranges.
    """

    def __init__(self, sensor_node_name, hash_ranges):
        """
        :param sensor_node_name: Forwarded unchanged to ``DefaultPartitioner``.

        :param hash_ranges: Range expression such as ``0..1024|2048..4096|4096..MAX``.
        :type hash_ranges: ``str``
        """
        super(HashPartitioner, self).__init__(sensor_node_name=sensor_node_name)
        self._hash_ranges = self._create_hash_ranges(hash_ranges)

    def is_sensor_owner(self, sensor_db):
        """
        Return True if the hash of ``sensor_db``'s reference is inside one of the
        configured ranges.
        """
        return self._is_in_hash_range(sensor_db.get_reference().ref)

    def get_sensors(self):
        """
        Return the list of enabled sensors whose reference hashes into one of the
        configured ranges.
        """
        return [sensor for sensor in get_all_enabled_sensors()
                if self._is_in_hash_range(sensor.get_reference().ref)]

    def _is_in_hash_range(self, sensor_ref):
        sensor_ref_hash = self._hash_sensor_ref(sensor_ref)
        return any(sensor_ref_hash in hash_range for hash_range in self._hash_ranges)

    def _hash_sensor_ref(self, sensor_ref):
        """
        Map a sensor reference string to an unsigned 32-bit integer.

        :type sensor_ref: ``str``
        :rtype: ``int``
        """
        # Hmm... maybe this should be done in C. If it becomes a performance
        # bottleneck will look at that optimization.

        # From http://www.cs.hmc.edu/~geoff/classes/hmc.cs070.200101/homework10/hashfuncs.html
        # ctypes.c_uint32 is used to truncate each round to an unsigned 32-bit value and
        # work around Python's infinite-precision integers. The fixed-width type (rather
        # than c_uint, whose size follows the platform's C `unsigned int`) keeps the result
        # inside [0, 2**32), which is what the 0xf8000000 mask and the 27-bit shift assume.
        md5_hash_int_repr = int(hashlib.md5(sensor_ref.encode()).hexdigest(), 16)
        h = 0
        # The decimal digits of the MD5 value are consumed least-significant first.
        for digit in reversed(str(md5_hash_int_repr)):
            # Save the top 5 bits before the shift pushes them out, then fold them
            # back into the low bits together with the current digit.
            higherorder = h & 0xf8000000
            h = ctypes.c_uint32((h << 5) ^ (higherorder >> 27) ^ int(digit)).value
        return h

    def _create_hash_ranges(self, hash_ranges_repr):
        """
        Extract from a format like - 0..1024|2048..4096|4096..MAX

        :rtype: ``list`` of :class:`Range`
        """
        # Likely all this splitting can be avoided and done nicely with regex but I generally
        # dislike using regex so I go with naive approaches.
        return [Range(range_repr.strip())
                for range_repr in hash_ranges_repr.split(SUB_RANGE_SEPARATOR)]
131