Passed
Pull Request — master (#483)
by
unknown
03:55
created

kytos.core.pacing.AsyncEmptyStrategy.hit()   A

Complexity

Conditions 1

Size

Total Lines 13
Code Lines 11

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 11
nop 4
dl 0
loc 13
rs 9.85
c 0
b 0
f 0
1
"""Provides utilities for pacing actions."""
2
import asyncio
3
import logging
4
import time
5
6
import limits.aio.strategies
7
import limits.strategies
8
from limits import RateLimitItem, parse
9
from limits.storage import storage_from_string
10
11
LOG = logging.getLogger(__name__)
12
13
14
class EmptyStrategy(limits.strategies.FixedWindowRateLimiter):
15
    """Rate limiter, that doesn't actually rate limit."""
16
17
    def hit(
18
        self,
19
        item: RateLimitItem,
20
        *identifiers: str,
21
        cost: int = 1
22
    ) -> bool:
23
        self.storage.incr(
24
            item.key_for(*identifiers),
25
            item.get_expiry(),
26
            elastic_expiry=False,
27
            amount=cost,
28
        )
29
        return True
30
31
32
class AsyncEmptyStrategy(limits.aio.strategies.FixedWindowRateLimiter):
33
    """Rate limiter, that doesn't actually rate limit."""
34
35
    async def hit(
36
        self,
37
        item: RateLimitItem,
38
        *identifiers: str,
39
        cost: int = 1
40
    ) -> bool:
41
        await self.storage.incr(
42
            item.key_for(*identifiers),
43
            item.get_expiry(),
44
            elastic_expiry=False,
45
            amount=cost,
46
        )
47
        return True
48
49
50
available_strategies = {
51
    "fixed_window": (
52
        limits.strategies.FixedWindowRateLimiter,
53
        limits.aio.strategies.FixedWindowRateLimiter,
54
    ),
55
    "ignore_pace": (
56
        EmptyStrategy,
57
        AsyncEmptyStrategy,
58
    )
59
    # "elastic_window": (
60
    #     limits.strategies.FixedWindowElasticExpiryRateLimiter,
61
    #     limits.aio.strategies.FixedWindowElasticExpiryRateLimiter,
62
    # ),
63
}
64
65
66
class NoSuchActionError(BaseException):
67
    """
68
    Exception for trying to use actions that aren't configured.
69
70
    Not intended to be caught by NApps.
71
    """
72
73
74
class Pacer:
75
    """Class for controlling the rate at which actions are executed."""
76
    sync_strategies: dict[str, limits.strategies.RateLimiter]
77
    async_strategies: dict[str, limits.aio.strategies.RateLimiter]
78
    pace_config: dict[str, tuple[str, RateLimitItem]]
79
80
    def __init__(self, storage_uri):
81
        # Initialize dicts
82
        self.sync_strategies = {}
83
        self.async_strategies = {}
84
        self.pace_config = {}
85
86
        # Acquire storage
87
        sync_storage = storage_from_string(storage_uri)
88
        async_storage = storage_from_string(f"async+{storage_uri}")
89
90
        # Populate strategies
91
        for strat_name, strat_pair in available_strategies.items():
92
            sync_strat_type, async_strat_type = strat_pair
93
            self.sync_strategies[strat_name] = sync_strat_type(sync_storage)
94
            self.async_strategies[strat_name] = async_strat_type(async_storage)
95
96
    def inject_config(self, config: dict[str, dict]):
97
        """
98
        Inject settings for pacing
99
        """
100
        # Regenerate update dict
101
        next_config = {
102
            key: (
103
                value.get('strategy', 'fixed_window'),
104
                parse(value['pace'])
105
            )
106
            for key, value in config.items()
107
        }
108
109
        # Validate
110
        for action, (strat, _) in next_config.items():
111
            if strat not in available_strategies:
112
                raise ValueError(
113
                    f"Strategy ({strat}) for action ({action}) not valid"
114
                )
115
            LOG.info("Added pace for action %s", action)
116
117
        # Apply
118
        self.pace_config.update(
119
            next_config
120
        )
121
122
    async def ahit(self, action_name: str, *keys):
123
        """
124
        Asynchronous variant of `hit`.
125
126
        This can be called from the serving thread safely.
127
        """
128
        if action_name not in self.pace_config:
129
            raise NoSuchActionError(
130
                f"`{action_name}` has not been configured yet"
131
            )
132
        strat, pace = self.pace_config[action_name]
133
        identifiers = pace, action_name, *keys
134
        strategy = self.async_strategies[strat]
135
        while not await strategy.hit(*identifiers):
136
            window_reset, _ = await strategy.get_window_stats(
137
                *identifiers
138
            )
139
            sleep_time = window_reset - time.time()
140
141
            await asyncio.sleep(sleep_time)
142
143
    def hit(self, action_name: str, *keys):
144
        """
145
        Pace execution, based on the pacing config for the given `action_name`.
146
        Keys can be included to allow multiple objects
147
        to be be paced separately on the same action.
148
149
        This should not be called from the same thread serving
150
        the pacing.
151
        """
152
        if action_name not in self.pace_config:
153
            raise NoSuchActionError(
154
                f"`{action_name}` has not been configured yet"
155
            )
156
        strat, pace = self.pace_config[action_name]
157
        identifiers = pace, action_name, *keys
158
        strategy = self.sync_strategies[strat]
159
        while not strategy.hit(*identifiers):
160
            window_reset, _ = strategy.get_window_stats(
161
                *identifiers
162
            )
163
            sleep_time = window_reset - time.time()
164
165
            if sleep_time <= 0:
166
                continue
167
168
            time.sleep(sleep_time)
169
170
    def is_configured(self, action_name):
171
        """
172
        Check if the given action has been configured.
173
        """
174
        return action_name in self.pace_config
175
176
177
class PacerWrapper:
178
    """
179
    Applies a namespace to various operations related to pacing.
180
    """
181
    namespace: str
182
    pacer: Pacer
183
184
    def __init__(self, namespace: str, pacer: Pacer):
185
        self.namespace = namespace
186
        self.pacer = pacer
187
188
    def inject_config(self, napp_config: dict):
189
        """
190
        Inject namespace specific settings for pacing
191
        """
192
        self.pacer.inject_config(
193
            {
194
                self._localized_key(key): value
195
                for key, value in napp_config.items()
196
            }
197
        )
198
199
    def hit(self, action_name: str, *keys):
200
        """
201
        Asynchronous variant of `hit`.
202
203
        This can be called from the serving thread safely.
204
        """
205
        return self.pacer.hit(
206
            self._localized_key(action_name),
207
            *keys
208
        )
209
210
    async def ahit(self, action_name: str, *keys):
211
        """
212
        Pace execution, based on the pacing config for the given `action_name`.
213
        Keys can be included to allow multiple objects
214
        to be be paced separately on the same action.
215
216
        This should not be called from the same thread serving
217
        the pacing.
218
        """
219
        return await self.pacer.ahit(
220
            self._localized_key(action_name),
221
            *keys
222
        )
223
224
    def is_configured(self, action_name: str):
225
        """
226
        Check if the given action has been configured.
227
        """
228
        return self.pacer.is_configured(
229
            self._localized_key(action_name)
230
        )
231
232
    def _localized_key(self, key):
233
        return f"{self.namespace}.{key}"
234