Passed
Pull Request — master (#480)
by
unknown
04:59 queued 38s
created

kytos.core.pacing.PacerWrapper.ahit()   A

Complexity

Conditions 1

Size

Total Lines 12
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 3
dl 0
loc 12
rs 10
c 0
b 0
f 0
1
"""Provides utilities for pacing actions."""
2
import asyncio
3
import logging
4
import time
5
6
import limits.aio.strategies
7
import limits.strategies
8
from limits import RateLimitItem, parse
9
from limits.storage import storage_from_string
10
11
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)


# Strategy name -> (synchronous limiter class, asynchronous limiter class).
# Both halves of a pair are instantiated by `Pacer.__init__`, so every entry
# must provide the two variants in this order.
available_strategies = {
    "fixed_window": (
        limits.strategies.FixedWindowRateLimiter,
        limits.aio.strategies.FixedWindowRateLimiter,
    ),
    # NOTE(review): the elastic window strategy is currently disabled;
    # the reason is not recorded here -- confirm before re-enabling.
    # "elastic_window": (
    #     limits.strategies.FixedWindowElasticExpiryRateLimiter,
    #     limits.aio.strategies.FixedWindowElasticExpiryRateLimiter,
    # ),
}
24
25
26
class NoSuchActionError(BaseException):
    """
    Exception for trying to use actions that aren't configured.

    Not intended to be caught by NApps. It derives from `BaseException`
    rather than `Exception`, so a generic `except Exception` handler
    does not swallow it.
    """
32
33
34
class Pacer:
    """Class for controlling the rate at which actions are executed."""

    # Strategy name -> synchronous limiter instance.
    sync_strategies: dict[str, limits.strategies.RateLimiter]
    # Strategy name -> asynchronous limiter instance.
    async_strategies: dict[str, limits.aio.strategies.RateLimiter]
    # Action name -> (strategy name, parsed rate limit).
    pace_config: dict[str, tuple[str, RateLimitItem]]

    def __init__(self, storage_uri):
        """
        Create the storages and one limiter per available strategy.

        `storage_uri` is handed to `storage_from_string` unchanged for the
        synchronous storage, and prefixed with `async+` for the
        asynchronous one.
        """
        # Initialize dicts
        self.sync_strategies = {}
        self.async_strategies = {}
        self.pace_config = {}

        # Acquire storage
        sync_storage = storage_from_string(storage_uri)
        async_storage = storage_from_string(f"async+{storage_uri}")

        # Populate strategies
        for strat_name, strat_pair in available_strategies.items():
            sync_strat_type, async_strat_type = strat_pair
            self.sync_strategies[strat_name] = sync_strat_type(sync_storage)
            self.async_strategies[strat_name] = async_strat_type(async_storage)

    def inject_config(self, config: dict[str, dict]):
        """
        Inject settings for pacing.

        `config` maps an action name to a dict holding a mandatory `pace`
        entry (handed to `limits.parse`) and an optional `strategy` entry,
        which defaults to `fixed_window`.

        The whole config is parsed and validated before anything is
        applied, so a failure leaves the current configuration untouched.

        Raises `KeyError` if an entry has no `pace`, and `ValueError` if
        an entry names a strategy that is not in `available_strategies`.
        """
        # Regenerate update dict
        next_config = {
            key: (
                value.get('strategy', 'fixed_window'),
                parse(value['pace'])
            )
            for key, value in config.items()
        }

        # Validate
        for action, (strat, _) in next_config.items():
            if strat not in available_strategies:
                raise ValueError(
                    f"Strategy ({strat}) for action ({action}) not valid"
                )

        # Apply
        self.pace_config.update(next_config)

        # Log only once the paces are really in place; logging during
        # validation would announce paces that a later invalid entry
        # prevents from being applied.
        for action in next_config:
            LOG.info("Added pace for action %s", action)

    def _lookup(self, action_name, keys, strategies):
        """
        Return the limiter and its call arguments for `action_name`.

        `strategies` is either `sync_strategies` or `async_strategies`.
        Raises `NoSuchActionError` if the action has no configured pace.
        """
        if action_name not in self.pace_config:
            raise NoSuchActionError(
                f"`{action_name}` has not been configured yet"
            )
        strat, pace = self.pace_config[action_name]
        return strategies[strat], (pace, action_name, *keys)

    async def ahit(self, action_name: str, *keys):
        """
        Asynchronous variant of `hit`.

        This can be called from the serving thread safely.

        Raises `NoSuchActionError` if `action_name` has no configured pace.
        """
        strategy, identifiers = self._lookup(
            action_name, keys, self.async_strategies
        )
        while not await strategy.hit(*identifiers):
            window_reset, _ = await strategy.get_window_stats(
                *identifiers
            )
            sleep_time = window_reset - time.time()

            # The window may already have reset by the time the stats are
            # read. Clamp to zero instead of skipping the sleep, so the
            # task still yields to the event loop before retrying.
            await asyncio.sleep(max(sleep_time, 0))

    def hit(self, action_name: str, *keys):
        """
        Pace execution, based on the pacing config for the given `action_name`.
        Keys can be included to allow multiple objects
        to be paced separately on the same action.

        This should not be called from the same thread serving
        the pacing, since it blocks with `time.sleep` until the hit
        is accepted.

        Raises `NoSuchActionError` if `action_name` has no configured pace.
        """
        strategy, identifiers = self._lookup(
            action_name, keys, self.sync_strategies
        )
        while not strategy.hit(*identifiers):
            window_reset, _ = strategy.get_window_stats(
                *identifiers
            )
            sleep_time = window_reset - time.time()

            # The window already reset: retry right away. `time.sleep`
            # rejects negative values, so it must not be called here.
            if sleep_time <= 0:
                continue

            time.sleep(sleep_time)
129
130
131
class PacerWrapper:
    """
    Applies a namespace to various operations related to pacing.

    Every action name going through the wrapper is prefixed with
    `<namespace>.` before being handed to the wrapped `Pacer`.
    """
    namespace: str
    pacer: Pacer

    def __init__(self, namespace: str, pacer: Pacer):
        self.namespace = namespace
        self.pacer = pacer

    def inject_config(self, napp_config: dict):
        """
        Inject namespace specific settings for pacing.

        The keys of `napp_config` are action names local to the
        namespace; the values are forwarded untouched.
        """
        self.pacer.inject_config(
            {
                self._localized_key(key): value
                for key, value in napp_config.items()
            }
        )

    def hit(self, action_name: str, *keys):
        """
        Pace execution, based on the pacing config for the given `action_name`.
        Keys can be included to allow multiple objects
        to be paced separately on the same action.

        This should not be called from the same thread serving
        the pacing.
        """
        return self.pacer.hit(
            self._localized_key(action_name),
            *keys
        )

    async def ahit(self, action_name: str, *keys):
        """
        Asynchronous variant of `hit`.

        This can be called from the serving thread safely.
        """
        return await self.pacer.ahit(
            self._localized_key(action_name),
            *keys
        )

    def _localized_key(self, key):
        """Return `key` prefixed with this wrapper's namespace."""
        return f"{self.namespace}.{key}"
180