Passed
Push — master ( 3c577f...76a366 )
by
unknown
03:07 queued 19s
created

kytos.core.pacing   A

Complexity

Total Complexity 21

Size/Duplication

Total Lines 236
Duplicated Lines 0 %

Importance

Changes 0
Metric Value
eloc 123
dl 0
loc 236
rs 10
c 0
b 0
f 0
wmc 21

13 Methods

Rating   Name   Duplication   Size   Complexity  
A Pacer.inject_config() 0 24 3
A PacerWrapper.is_configured() 0 6 1
A AsyncEmptyStrategy.hit() 0 14 1
A Pacer.is_configured() 0 5 1
A PacerWrapper.inject_config() 0 8 1
A PacerWrapper.ahit() 0 12 1
A Pacer.__init__() 0 15 2
A Pacer.ahit() 0 20 3
A PacerWrapper._localized_key() 0 2 1
A Pacer.hit() 0 26 4
A PacerWrapper.hit() 0 9 1
A EmptyStrategy.hit() 0 14 1
A PacerWrapper.__init__() 0 3 1
1
"""Provides utilities for pacing actions."""
2
import asyncio
3
import logging
4
import time
5
6
import limits.aio.strategies
7
import limits.strategies
8
from limits import RateLimitItem, parse
9
from limits.storage import storage_from_string
10
11
LOG = logging.getLogger(__name__)
12
13
14
class EmptyStrategy(limits.strategies.FixedWindowRateLimiter):
15
    """Rate limiter, that doesn't actually rate limit."""
16
17
    def hit(
18
        self,
19
        item: RateLimitItem,
20
        *identifiers: str,
21
        cost: int = 1
22
    ) -> bool:
23
        # Increment storage, to collect data on usage rate of actions
24
        self.storage.incr(
25
            item.key_for(*identifiers),
26
            item.get_expiry(),
27
            elastic_expiry=False,
28
            amount=cost,
29
        )
30
        return True
31
32
33
class AsyncEmptyStrategy(limits.aio.strategies.FixedWindowRateLimiter):
34
    """Rate limiter, that doesn't actually rate limit."""
35
36
    async def hit(
37
        self,
38
        item: RateLimitItem,
39
        *identifiers: str,
40
        cost: int = 1
41
    ) -> bool:
42
        # Increment storage, to collect data on usage rate of actions
43
        await self.storage.incr(
44
            item.key_for(*identifiers),
45
            item.get_expiry(),
46
            elastic_expiry=False,
47
            amount=cost,
48
        )
49
        return True
50
51
52
available_strategies = {
53
    "fixed_window": (
54
        limits.strategies.FixedWindowRateLimiter,
55
        limits.aio.strategies.FixedWindowRateLimiter,
56
    ),
57
    "ignore_pace": (
58
        EmptyStrategy,
59
        AsyncEmptyStrategy,
60
    )
61
    # "elastic_window": (
62
    #     limits.strategies.FixedWindowElasticExpiryRateLimiter,
63
    #     limits.aio.strategies.FixedWindowElasticExpiryRateLimiter,
64
    # ),
65
}
66
67
68
class NoSuchActionError(BaseException):
69
    """
70
    Exception for trying to use actions that aren't configured.
71
72
    Not intended to be caught by NApps.
73
    """
74
75
76
class Pacer:
77
    """Class for controlling the rate at which actions are executed."""
78
    sync_strategies: dict[str, limits.strategies.RateLimiter]
79
    async_strategies: dict[str, limits.aio.strategies.RateLimiter]
80
    pace_config: dict[str, tuple[str, RateLimitItem]]
81
82
    def __init__(self, storage_uri):
83
        # Initialize dicts
84
        self.sync_strategies = {}
85
        self.async_strategies = {}
86
        self.pace_config = {}
87
88
        # Acquire storage
89
        sync_storage = storage_from_string(storage_uri)
90
        async_storage = storage_from_string(f"async+{storage_uri}")
91
92
        # Populate strategies
93
        for strat_name, strat_pair in available_strategies.items():
94
            sync_strat_type, async_strat_type = strat_pair
95
            self.sync_strategies[strat_name] = sync_strat_type(sync_storage)
96
            self.async_strategies[strat_name] = async_strat_type(async_storage)
97
98
    def inject_config(self, config: dict[str, dict]):
99
        """
100
        Inject settings for pacing
101
        """
102
        # Regenerate update dict
103
        next_config = {
104
            key: (
105
                value.get('strategy', 'fixed_window'),
106
                parse(value['pace'])
107
            )
108
            for key, value in config.items()
109
        }
110
111
        # Validate
112
        for action, (strat, _) in next_config.items():
113
            if strat not in available_strategies:
114
                raise ValueError(
115
                    f"Strategy ({strat}) for action ({action}) not valid"
116
                )
117
            LOG.info("Added pace for action %s", action)
118
119
        # Apply
120
        self.pace_config.update(
121
            next_config
122
        )
123
124
    async def ahit(self, action_name: str, *keys):
125
        """
126
        Asynchronous variant of `hit`.
127
128
        This can be called from the serving thread safely.
129
        """
130
        if action_name not in self.pace_config:
131
            raise NoSuchActionError(
132
                f"`{action_name}` has not been configured yet"
133
            )
134
        strat, pace = self.pace_config[action_name]
135
        identifiers = pace, action_name, *keys
136
        strategy = self.async_strategies[strat]
137
        while not await strategy.hit(*identifiers):
138
            window_reset, _ = await strategy.get_window_stats(
139
                *identifiers
140
            )
141
            sleep_time = window_reset - time.time()
142
            LOG.info(f'Limited reached: {identifiers}')
143
            await asyncio.sleep(sleep_time)
144
145
    def hit(self, action_name: str, *keys):
146
        """
147
        Pace execution, based on the pacing config for the given `action_name`.
148
        Keys can be included to allow multiple objects
149
        to be be paced separately on the same action.
150
151
        This should not be called from the same thread serving
152
        the pacing.
153
        """
154
        if action_name not in self.pace_config:
155
            raise NoSuchActionError(
156
                f"`{action_name}` has not been configured yet"
157
            )
158
        strat, pace = self.pace_config[action_name]
159
        identifiers = pace, action_name, *keys
160
        strategy = self.sync_strategies[strat]
161
        while not strategy.hit(*identifiers):
162
            window_reset, _ = strategy.get_window_stats(
163
                *identifiers
164
            )
165
            sleep_time = window_reset - time.time()
166
            LOG.info(f'Limited reached: {identifiers}')
167
            if sleep_time <= 0:
168
                continue
169
170
            time.sleep(sleep_time)
171
172
    def is_configured(self, action_name):
173
        """
174
        Check if the given action has been configured.
175
        """
176
        return action_name in self.pace_config
177
178
179
class PacerWrapper:
180
    """
181
    Applies a namespace to various operations related to pacing.
182
    """
183
    namespace: str
184
    pacer: Pacer
185
186
    def __init__(self, namespace: str, pacer: Pacer):
187
        self.namespace = namespace
188
        self.pacer = pacer
189
190
    def inject_config(self, napp_config: dict):
191
        """
192
        Inject namespace specific settings for pacing
193
        """
194
        self.pacer.inject_config(
195
            {
196
                self._localized_key(key): value
197
                for key, value in napp_config.items()
198
            }
199
        )
200
201
    def hit(self, action_name: str, *keys):
202
        """
203
        Asynchronous variant of `hit`.
204
205
        This can be called from the serving thread safely.
206
        """
207
        return self.pacer.hit(
208
            self._localized_key(action_name),
209
            *keys
210
        )
211
212
    async def ahit(self, action_name: str, *keys):
213
        """
214
        Pace execution, based on the pacing config for the given `action_name`.
215
        Keys can be included to allow multiple objects
216
        to be be paced separately on the same action.
217
218
        This should not be called from the same thread serving
219
        the pacing.
220
        """
221
        return await self.pacer.ahit(
222
            self._localized_key(action_name),
223
            *keys
224
        )
225
226
    def is_configured(self, action_name: str):
227
        """
228
        Check if the given action has been configured.
229
        """
230
        return self.pacer.is_configured(
231
            self._localized_key(action_name)
232
        )
233
234
    def _localized_key(self, key):
235
        return f"{self.namespace}.{key}"
236