Test Failed
Pull Request — master (#480)
by
unknown
09:10
created

kytos.core.pacing.PacerWrapper.ahit()   A

Complexity

Conditions 1

Size

Total Lines 12
Code Lines 4

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 1
eloc 4
nop 3
dl 0
loc 12
rs 10
c 0
b 0
f 0
1
"""Provides utilities for pacing actions."""
2
import asyncio
3
import logging
4
import time
5
from dataclasses import dataclass
6
7
import limits.aio.strategies
8
import limits.strategies
9
from limits import RateLimitItem, parse
10
from limits.storage import storage_from_string
11
12
LOG = logging.getLogger(__name__)
13
14
15
# Maps each strategy name accepted in a pace config to a pair of rate
# limiter classes: (synchronous class, asynchronous class). `Pacer.__init__`
# unpacks each pair in that order and instantiates both.
available_strategies = {
16
    "fixed_window": (
17
        limits.strategies.FixedWindowRateLimiter,
18
        limits.aio.strategies.FixedWindowRateLimiter,
19
    ),
20
    "elastic_window": (
21
        limits.strategies.FixedWindowElasticExpiryRateLimiter,
22
        limits.aio.strategies.FixedWindowElasticExpiryRateLimiter,
23
    ),
24
}
25
26
27
class Pacer:
    """Class for controlling the rate at which actions are executed.

    Every strategy listed in ``available_strategies`` is instantiated twice,
    once over a synchronous storage and once over an asynchronous storage,
    so the same pace configuration can be enforced through both `hit`
    (blocking) and `ahit` (coroutine).
    """
    # Strategy name -> limiter used by the blocking `hit`.
    sync_strategies: dict[str, limits.strategies.RateLimiter]
    # Strategy name -> limiter used by the coroutine `ahit`.
    async_strategies: dict[str, limits.aio.strategies.RateLimiter]
    # Action name -> (strategy name, parsed rate limit).
    pace_config: dict[str, tuple[str, RateLimitItem]]

    def __init__(self, storage_uri):
        """
        Create the storages and one limiter per available strategy.

        :param storage_uri: URI handed to ``storage_from_string`` for the
            synchronous storage. The asynchronous storage is requested with
            the same URI prefixed by ``async+``.
        """
        # Initialize dicts
        self.sync_strategies = {}
        self.async_strategies = {}
        self.pace_config = {}

        # Acquire storage
        sync_storage = storage_from_string(storage_uri)
        async_storage = storage_from_string(f"async+{storage_uri}")

        # Populate strategies
        for strat_name, strat_pair in available_strategies.items():
            sync_strat_type, async_strat_type = strat_pair
            self.sync_strategies[strat_name] = sync_strat_type(sync_storage)
            self.async_strategies[strat_name] = async_strat_type(async_storage)

    def inject_config(self, config: dict[str, dict]):
        """
        Inject settings for pacing.

        Each value of `config` must contain a ``'pace'`` entry, which is
        parsed with ``parse``, and may contain a ``'strategy'`` entry, which
        defaults to ``'fixed_window'``.

        The update is all-or-nothing: every entry is parsed and validated
        before any of them is stored, and nothing is reported as added
        unless the whole update was applied.

        :param config: mapping of action name to its pacing settings.
        :raises ValueError: if an entry names a strategy that is not in
            ``available_strategies``.
        """
        # Regenerate update dict
        next_config = {
            action: (
                settings.get('strategy', 'fixed_window'),
                parse(settings['pace']),
            )
            for action, settings in config.items()
        }

        # Validate
        for action, (strat, _) in next_config.items():
            if strat not in available_strategies:
                raise ValueError(
                    f"Strategy ({strat}) for action ({action}) not valid"
                )

        # Apply
        self.pace_config.update(next_config)

        # Log only once the paces are really in place, so a rejected
        # update never claims that some of its actions were added.
        for action in next_config:
            LOG.info("Added pace for action %s", action)

    def _limiter_args(self, action_name, keys):
        """
        Resolve the strategy name and limiter identifiers for an action.

        Returns ``(strategy name, identifiers)``, or ``None`` after logging
        a warning when no pace is configured for `action_name`.
        """
        try:
            strat, pace = self.pace_config[action_name]
        except KeyError:
            LOG.warning("Pace for `%s` is not set", action_name)
            return None
        return strat, (pace, action_name, *keys)

    async def ahit(self, action_name, *keys):
        """
        Asynchronous variant of `hit`.

        Waits with ``asyncio.sleep`` until the limiter accepts the hit.
        If no pace is configured for `action_name`, a warning is logged
        and the call returns immediately without pacing.

        This can be called from the serving thread safely.
        """
        resolved = self._limiter_args(action_name, keys)
        if resolved is None:
            return
        strat, identifiers = resolved
        strategy = self.async_strategies[strat]
        while not await strategy.hit(*identifiers):
            window_reset, _ = await strategy.get_window_stats(
                *identifiers
            )
            # The window may already have reset by the time we look at it.
            # Clamp to zero so the delay is never negative, while still
            # yielding control to the event loop before retrying.
            sleep_time = max(window_reset - time.time(), 0)

            await asyncio.sleep(sleep_time)

    def hit(self, action_name, *keys):
        """
        Pace execution, based on the pacing config for the given `action_name`.
        Keys can be included to allow multiple objects
        to be paced separately on the same action.

        Blocks with ``time.sleep`` until the limiter accepts the hit.
        If no pace is configured for `action_name`, a warning is logged
        and the call returns immediately without pacing.

        This should not be called from the same thread serving
        the pacing.
        """
        resolved = self._limiter_args(action_name, keys)
        if resolved is None:
            return
        strat, identifiers = resolved
        strategy = self.sync_strategies[strat]
        while not strategy.hit(*identifiers):
            window_reset, _ = strategy.get_window_stats(
                *identifiers
            )
            sleep_time = window_reset - time.time()

            # Retry straight away when the window has already reset.
            if sleep_time > 0:
                time.sleep(sleep_time)
120
121
122
@dataclass
class PacerWrapper:
    """
    Applies a namespace to various operations related to pacing.

    Every action name that goes through this wrapper is prefixed with
    ``namespace`` before being forwarded to the wrapped `pacer`.
    """
    # Prefix prepended (with a dot) to every action name.
    namespace: str
    # Pacer that receives the namespaced calls.
    pacer: Pacer

    def inject_config(self, napp_config: dict):
        """
        Inject namespace specific settings for pacing.

        The keys of `napp_config` are action names; each one is namespaced
        before the resulting mapping is passed to the pacer's
        `inject_config`.
        """
        self.pacer.inject_config(
            {
                self._localized_key(key): value
                for key, value in napp_config.items()
            }
        )

    def hit(self, action_name, *keys):
        """
        Pace execution, based on the pacing config for the given `action_name`.
        Keys can be included to allow multiple objects
        to be paced separately on the same action.

        This should not be called from the same thread serving
        the pacing.
        """
        return self.pacer.hit(
            self._localized_key(action_name),
            *keys
        )

    async def ahit(self, action_name, *keys):
        """
        Asynchronous variant of `hit`.

        This can be called from the serving thread safely.
        """
        return await self.pacer.ahit(
            self._localized_key(action_name),
            *keys
        )

    def _localized_key(self, key):
        """Return `key` prefixed with this wrapper's namespace and a dot."""
        return f"{self.namespace}.{key}"
168