| Total Complexity | 4 |
| Total Lines | 31 |
| Duplicated Lines | 0 % |
| Changes | 0 | ||
| 1 | """ |
||
| 15 | class CoturnHMAC: |
||
| 16 | def __init__(self, coturn_hosts=None, coturn_keys=None): |
||
| 17 | self.coturn_hosts = coturn_hosts or COTURN_HOSTS |
||
| 18 | self.coturn_keys = coturn_keys or COTURN_KEYS |
||
| 19 | |||
| 20 | def fetch_token(self, username='faf-user', ttl=None): |
||
| 21 | servers = [] |
||
| 22 | |||
| 23 | ttl = ttl or TWILIO_TTL |
||
| 24 | if ttl is not None: |
||
| 25 | ttl = int(ttl) |
||
| 26 | else: |
||
| 27 | ttl = 3600*24 |
||
| 28 | |||
| 29 | # See https://github.com/coturn/coturn/wiki/turnserver#turn-rest-api |
||
| 30 | # create hmac of coturn_key + timestamp:username |
||
| 31 | timestamp = int(time.time()) + ttl |
||
| 32 | token_name = "{}:{}".format(timestamp, username) |
||
| 33 | |||
| 34 | for coturn_host, coturn_key in zip(self.coturn_hosts, self.coturn_keys): |
||
| 35 | secret = hmac.new(coturn_key.encode(), str(token_name).encode(), sha1) |
||
| 36 | auth_token = base64.b64encode(secret.digest()).decode() |
||
| 37 | |||
| 38 | servers.append(dict(urls=["turn:{}?transport=tcp".format(coturn_host), |
||
| 39 | "turn:{}?transport=udp".format(coturn_host), |
||
| 40 | "stun:{}".format(coturn_host)], |
||
| 41 | username=token_name, |
||
| 42 | credential=auth_token, |
||
| 43 | credentialType="token")) |
||
| 44 | |||
| 45 | return servers |
||
| 46 |