Passed
Push — develop ( 3d8444...ce4f6d )
by Dean
03:03
created

SystemService._generate_token()   A

Complexity

Conditions 1

Size

Total Lines 10

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 10
ccs 0
cts 5
cp 0
rs 9.4285
c 0
b 0
f 0
cc 1
crap 2
1
from plugin.api.core.base import Service, expose
2
from plugin.core.constants import PLUGIN_VERSION
3
from plugin.core.environment import Environment
4
5
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer, SignatureExpired, BadSignature
0 ignored issues
show
Unused Code introduced by
Unused BadSignature imported from itsdangerous
Loading history...
Unused Code introduced by
Unused SignatureExpired imported from itsdangerous
Loading history...
6
from xml.etree import ElementTree
7
import logging
8
import os
9
import requests
10
11
# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__)
12
13
14
class SystemService(Service):
15
    __key__ = 'system'
16
17
    def __init__(self, manager):
        """Initialise the service and build its token serializers.

        :param manager: handed unchanged to the base ``Service`` constructor.
        """
        super(SystemService, self).__init__(manager)

        # Access tokens expire in 7 days (lifetime is given in seconds) and
        # are signed with the 'authenticate' salt.
        access_serializer = self._build_serializer(7 * 86400, 'authenticate')

        self._serializers = {'access': access_serializer}
23
24
    @expose(authenticated=False)
25
    def authenticate(self, plex_token):
        """Exchange a Plex token for a signed API access token.

        The account behind ``plex_token`` is looked up, the local server entry
        is checked for ownership, and a token carrying the account username is
        generated with the 'access' serializer.

        :param plex_token: Plex token forwarded to the plex.tv lookups.
        :return: dict with 'X-Channel-Token' (the signed token) and
                 'X-Channel-Token-Expire' (the ``exp`` field of its header).
        :raises Exception: if the serializer is unavailable, or the server
                           entry is not marked as owned (``owned`` != '1').
                           Errors from the lookups propagate unchanged.
        """
        access = self._serializers['access']

        if not access:
            raise Exception('Serializer not available')

        # Look up the account the token belongs to
        account = self._get_account(plex_token)

        # Only the owner of this server is treated as an administrator
        server = self._get_server(plex_token)
        is_owner = server.get('owned') == '1'

        if not is_owner:
            raise Exception('Only server administrators have access to the API')

        # Sign a token whose payload carries the account username
        payload = {'username': account.get('username')}
        header, token = self._generate_token(access, payload)

        return {
            'X-Channel-Token':           token,
            'X-Channel-Token-Expire':    header['exp']
        }
50
51
    @expose(authenticated=False)
52
    def test(self, *args, **kwargs):
        """Echo the received positional and keyword arguments back.

        NOTE(review): static analysis flagged that this method does not access
        any attribute of the class, so it could be written as a plain function
        or a static/class method.

        :return: dict with 'args' (tuple of positional arguments) and
                 'kwargs' (dict of keyword arguments).
        """
        return dict(args=args, kwargs=kwargs)
57
58
    @expose(authenticated=False)
59
    def ping(self):
        """Report the plugin version and, if present, the token's username.

        :return: dict with 'version' (``PLUGIN_VERSION``); when
                 ``self.context.token`` is truthy it also has 'token', a dict
                 holding that token's 'username'.
        """
        response = {'version': PLUGIN_VERSION}

        # No token on the current context: version only
        if not self.context.token:
            return response

        response['token'] = {'username': self.context.token['username']}
        return response
70
71
    def validate(self, token):
        """Decode ``token`` with the 'access' serializer.

        :param token: signed token string to load.
        :return: whatever the serializer's ``loads`` returns for the token.
        :raises Exception: if no 'access' serializer is available. Nothing is
                           caught here, so errors raised by ``loads`` reach
                           the caller.
        """
        access = self._serializers['access']

        if access:
            return access.loads(token)

        raise Exception('Serializer not available')
78
79
    @staticmethod
80
    def _build_serializer(expires_in, salt):
        """Create a timed serializer keyed with the stored API secret.

        If ``Environment.dict`` holds no 'api.secret' yet, a new one is made
        from 50 random bytes (hex-encoded), stored, and ``Save()`` is called
        on the dict (presumably persisting it - confirm against Environment).

        :param expires_in: token lifetime handed to the serializer.
        :param salt: salt handed to the serializer.
        :return: a ``Serializer`` built from the secret, lifetime and salt.
        """
        secret_key = 'api.secret'

        if not Environment.dict[secret_key]:
            # NOTE(review): bytes.encode('hex') is a Python 2 idiom - this
            # line would need changing if the plugin ever runs on Python 3.
            random_bytes = os.urandom(50)
            Environment.dict[secret_key] = random_bytes.encode('hex')
            Environment.dict.Save()

        options = {'expires_in': expires_in, 'salt': salt}
        return Serializer(Environment.dict[secret_key], **options)
91
92
    @staticmethod
93
    def _generate_token(serializer, data):
        """Sign ``data`` with ``serializer`` and return the header and token.

        The header is built explicitly (rather than only inside the token) so
        that callers can read fields from it, e.g. its ``exp`` entry.

        :param serializer: object providing ``make_header``, ``make_signer``,
                           ``dump_payload``, ``salt`` and ``algorithm``.
        :param data: payload to embed in the token.
        :return: ``(header, token)`` tuple.
        """
        token_header = serializer.make_header(None)
        token_signer = serializer.make_signer(serializer.salt, serializer.algorithm)

        # Serialize header + data together, then sign the result
        payload = serializer.dump_payload(token_header, data)

        return token_header, token_signer.sign(payload)
102
103
    @staticmethod
104
    def _get_account(plex_token, timeout=30):
        """Fetch the plex.tv account details for a Plex token.

        :param plex_token: value sent as the ``X-Plex-Token`` request header.
        :param timeout: seconds to wait for plex.tv before giving up. Without
                        a timeout ``requests`` waits indefinitely, so a
                        stalled connection would block the caller forever.
        :return: root element of the parsed XML response body.
        :raises Exception: if the response status code is not 200. Errors
                           raised by ``requests`` (including timeouts) and by
                           XML parsing propagate unchanged.
        """
        response = requests.get(
            'https://plex.tv/users/account',
            headers={'X-Plex-Token': plex_token},
            timeout=timeout
        )

        # Anything other than 200 is treated as a failed lookup
        if response.status_code != 200:
            raise Exception('Unable to retrieve account details')

        return ElementTree.fromstring(response.content)
114
115
    @staticmethod
116
    def _get_server(plex_token, timeout=30):
        """Find this server's entry in the plex.tv resources list.

        :param plex_token: value sent as the ``X-Plex-Token`` request header.
        :param timeout: seconds to wait for plex.tv before giving up. Without
                        a timeout ``requests`` waits indefinitely, so a
                        stalled connection would block the caller forever.
        :return: the first ``Device`` element whose ``clientIdentifier``
                 equals ``Environment.platform.machine_identifier``.
        :raises Exception: if the response status code is not 200, or no
                           matching ``Device`` element exists. Errors raised
                           by ``requests`` (including timeouts) and by XML
                           parsing propagate unchanged.
        """
        response = requests.get(
            'https://plex.tv/api/resources?includeHttps=1',
            headers={'X-Plex-Token': plex_token},
            timeout=timeout
        )

        # Anything other than 200 is treated as a failed lookup
        if response.status_code != 200:
            raise Exception('Validation request failed')

        servers = ElementTree.fromstring(response.content)

        # The identifier does not change while scanning, so read it once
        machine_identifier = Environment.platform.machine_identifier

        # Find local server
        for server in servers.findall('Device'):
            if server.get('clientIdentifier') == machine_identifier:
                return server

        raise Exception('Unable to retrieve server details')
135