Passed
Pull Request — master (#480)
by
unknown
04:52
created

kytos.core.pacing.PacerWrapper._localized_key()   A

Complexity

Conditions 1

Size

Total Lines 2
Code Lines 2

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 2
nop 2
dl 0
loc 2
rs 10
c 0
b 0
f 0
1
"""Provides utilities for pacing actions."""
2
import asyncio
3
import logging
4
import time
5
6
import limits.aio.strategies
7
import limits.strategies
8
from limits import RateLimitItem, parse
9
from limits.storage import storage_from_string
10
11
LOG = logging.getLogger(__name__)
12
13
14
available_strategies = {
15
    "fixed_window": (
16
        limits.strategies.FixedWindowRateLimiter,
17
        limits.aio.strategies.FixedWindowRateLimiter,
18
    ),
19
    # "elastic_window": (
20
    #     limits.strategies.FixedWindowElasticExpiryRateLimiter,
21
    #     limits.aio.strategies.FixedWindowElasticExpiryRateLimiter,
22
    # ),
23
}
24
25
26
class Pacer:
27
    """Class for controlling the rate at which actions are executed."""
28
    sync_strategies: dict[str, limits.strategies.RateLimiter]
29
    async_strategies: dict[str, limits.aio.strategies.RateLimiter]
30
    pace_config: dict[str, tuple[str, RateLimitItem]]
31
32
    def __init__(self, storage_uri):
33
        # Initialize dicts
34
        self.sync_strategies = {}
35
        self.async_strategies = {}
36
        self.pace_config = {}
37
38
        # Acquire storage
39
        sync_storage = storage_from_string(storage_uri)
40
        async_storage = storage_from_string(f"async+{storage_uri}")
41
42
        # Populate strategies
43
        for strat_name, strat_pair in available_strategies.items():
44
            sync_strat_type, async_strat_type = strat_pair
45
            self.sync_strategies[strat_name] = sync_strat_type(sync_storage)
46
            self.async_strategies[strat_name] = async_strat_type(async_storage)
47
48
    def inject_config(self, config: dict[str, dict]):
49
        """
50
        Inject settings for pacing
51
        """
52
        # Regenerate update dict
53
        next_config = {
54
            key: (
55
                value.get('strategy', 'fixed_window'),
56
                parse(value['pace'])
57
            )
58
            for key, value in config.items()
59
        }
60
61
        # Validate
62
        for action, (strat, _) in next_config.items():
63
            if strat not in available_strategies:
64
                raise ValueError(
65
                    f"Strategy ({strat}) for action ({action}) not valid"
66
                )
67
            LOG.info("Added pace for action %s", action)
68
69
        # Apply
70
        self.pace_config.update(
71
            next_config
72
        )
73
74
    async def ahit(self, action_name: str, *keys):
75
        """
76
        Asynchronous variant of `hit`.
77
78
        This can be called from the serving thread safely.
79
        """
80
        if action_name not in self.pace_config:
81
            LOG.warning("Pace for `%s` is not set", action_name)
82
            return
83
        strat, pace = self.pace_config[action_name]
84
        identifiers = pace, action_name, *keys
85
        strategy = self.async_strategies[strat]
86
        while not await strategy.hit(*identifiers):
87
            window_reset, _ = await strategy.get_window_stats(
88
                *identifiers
89
            )
90
            sleep_time = window_reset - time.time()
91
92
            await asyncio.sleep(sleep_time)
93
94
    def hit(self, action_name: str, *keys):
95
        """
96
        Pace execution, based on the pacing config for the given `action_name`.
97
        Keys can be included to allow multiple objects
98
        to be be paced separately on the same action.
99
100
        This should not be called from the same thread serving
101
        the pacing.
102
        """
103
        if action_name not in self.pace_config:
104
            LOG.warning("Pace for `%s` is not set", action_name)
105
            return
106
        strat, pace = self.pace_config[action_name]
107
        identifiers = pace, action_name, *keys
108
        strategy = self.sync_strategies[strat]
109
        while not strategy.hit(*identifiers):
110
            window_reset, _ = strategy.get_window_stats(
111
                *identifiers
112
            )
113
            sleep_time = window_reset - time.time()
114
115
            if sleep_time <= 0:
116
                continue
117
118
            time.sleep(sleep_time)
119
120
121
class PacerWrapper:
122
    """
123
    Applies a namespace to various operations related to pacing.
124
    """
125
    namespace: str
126
    pacer: Pacer
127
128
    def __init__(self, namespace: str, pacer: Pacer):
129
        self.namespace = namespace
130
        self.pacer = pacer
131
132
    def inject_config(self, napp_config: dict):
133
        """
134
        Inject namespace specific settings for pacing
135
        """
136
        self.pacer.inject_config(
137
            {
138
                self._localized_key(key): value
139
                for key, value in napp_config.items()
140
            }
141
        )
142
143
    def hit(self, action_name: str, *keys):
144
        """
145
        Asynchronous variant of `hit`.
146
147
        This can be called from the serving thread safely.
148
        """
149
        return self.pacer.hit(
150
            self._localized_key(action_name),
151
            *keys
152
        )
153
154
    async def ahit(self, action_name: str, *keys):
155
        """
156
        Pace execution, based on the pacing config for the given `action_name`.
157
        Keys can be included to allow multiple objects
158
        to be be paced separately on the same action.
159
160
        This should not be called from the same thread serving
161
        the pacing.
162
        """
163
        return await self.pacer.ahit(
164
            self._localized_key(action_name),
165
            *keys
166
        )
167
168
    def _localized_key(self, key):
169
        return f"{self.namespace}.{key}"
170