Passed
Push — master ( aeb165...d9ae97 )
by Dean
03:04
created

PlexAccount.refresh_details()   B

Complexity

Conditions 5

Size

Total Lines 37

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 26.2588
Metric Value
dl 0
loc 37
ccs 1
cts 19
cp 0.0526
rs 8.0894
cc 5
crap 26.2588
1 1
from plugin.core.exceptions import AccountAuthenticationError
2 1
from plugin.models.core import db
3 1
from plugin.models.account import Account
4
5 1
from datetime import datetime, timedelta
6 1
from playhouse.apsw_ext import *
7 1
from plex import Plex
8 1
from urllib import urlencode
9 1
from urlparse import urlparse, parse_qsl
10 1
from xml.etree import ElementTree
11 1
import logging
12 1
import requests
13
14 1
REFRESH_INTERVAL = timedelta(days=1)
15
16 1
log = logging.getLogger(__name__)
17
18
19 1
class PlexAccount(Model):
20 1
    class Meta:
21 1
        database = db
22 1
        db_table = 'plex.account'
23
24 1
    account = ForeignKeyField(Account, 'plex_accounts', unique=True)
25
26 1
    key = IntegerField(null=True, unique=True)
27 1
    username = CharField(null=True, unique=True)
28
29 1
    title = CharField(null=True)
30 1
    thumb = TextField(null=True)
31
32 1
    refreshed_at = DateTimeField(null=True)
33
34 1
    def __init__(self, *args, **kwargs):
35
        super(PlexAccount, self).__init__(*args, **kwargs)
36
37
        self._basic_credential = None
38
39 1
    @property
40
    def account_id(self):
41
        return self._data.get('account')
42
43 1
    @property
44
    def basic(self):
45
        if self._basic_credential:
46
            return self._basic_credential
47
48
        return self.basic_credentials.first()
49
50 1
    @basic.setter
51
    def basic(self, value):
52
        self._basic_credential = value
53
54 1
    def authorization(self):
55
        # Basic
56
        basic = self.basic
57
58
        if basic:
59
            return self.basic_authorization(basic)
60
61
        # No account authorization available
62
        raise AccountAuthenticationError("Plex account hasn't been authenticated")
63
64 1
    def basic_authorization(self, basic_credential=None):
65
        if basic_credential is None:
66
            basic_credential = self.basic
67
68
        # Ensure token exists
69
        if basic_credential.token_server is None:
70
            raise AccountAuthenticationError("Plex account is missing the server token")
71
72
        log.debug('Using basic authorization for %r', self)
73
74
        # Return authorization context
75
        return Plex.configuration.authentication(basic_credential.token_server)
76
77 1
    def refresh(self, force=False, save=True):
78
        # Retrieve credentials
79
        basic = self.basic
80
81
        if not basic:
82
            return False
83
84
        # Check if refresh is required
85
        if self.refresh_required(basic):
86
            force = True
87
88
        # Only refresh account every `REFRESH_INTERVAL`
89
        if not force and self.refreshed_at:
90
            since_refresh = datetime.utcnow() - self.refreshed_at
91
92
            if since_refresh < REFRESH_INTERVAL:
93
                return False
94
95
        # Refresh account details
96
        if not self.refresh_details(basic):
97
            return False
98
99
        if not basic.refresh():
100
            return False
101
102
        # Store changes in database
103
        self.refreshed_at = datetime.utcnow()
104
105
        if save:
106
            self.save()
107
108
        return True
109
110 1
    def refresh_details(self, basic):
111
        if basic.token_plex == 'anonymous':
112
            return self.refresh_anonymous()
113
114
        # Fetch account details
115
        response = requests.get('https://plex.tv/users/account', headers={
116
            'X-Plex-Token': basic.token_plex
117
        })
118
119
        if not (200 <= response.status_code < 300):
120
            # Invalid response
121
            return False
122
123
        user = ElementTree.fromstring(response.content)
124
125
        # Update details
126
        self.username = user.attrib.get('username') or None
127
128
        self.title = user.attrib.get('title')
129
        self.thumb = user.attrib.get('thumb')
130
131
        # Update `key`
132
        if self.id == 1:
133
            # Use administrator `key`
134
            self.key = 1
135
        else:
136
            # Retrieve user id from plex.tv details
137
            try:
138
                user_id = int(user.attrib.get('id'))
139
            except Exception, ex:
140
                log.warn('Unable to cast user id to integer: %s', ex, exc_info=True)
141
                user_id = None
142
143
            # Update `key`
144
            self.key = user_id
145
146
        return True
147
148 1
    def refresh_anonymous(self):
149
        log.debug('Refreshing anonymous plex account')
150
151
        self.username = 'administrator'
152
153
        self.title = 'Administrator'
154
        self.thumb = None
155
156
        if self.id == 1:
157
            self.key = 1
158
        else:
159
            self.key = None
160
161
        return True
162
163 1
    def refresh_required(self, basic):
164
        if self.key is None:
165
            return True
166
167
        if self.title is None:
168
            return True
169
170
        if basic.token_server is None:
171
            return True
172
173
        return False
174
175 1
    def thumb_url(self, default=None, rating='pg', size=256):
176
        if not self.thumb:
177
            return None
178
179
        thumb = urlparse(self.thumb)
180
181
        if thumb.netloc.endswith('plex.tv'):
182
            return self.thumb
183
184
        if not thumb.netloc.endswith('gravatar.com'):
185
            return None
186
187
        result = 'https://secure.gravatar.com%s' % thumb.path
188
189
        if default is None:
190
            query = dict(parse_qsl(thumb.query))
191
192
            default = query.get('d') or query.get('default')
193
194
        return result + '?' + urlencode({
195
            'd': default,
196
            'r': rating,
197
            's': size
198
        })
199
200 1
    def to_json(self, full=False):
201
        result = {
202
            'id': self.id,
203
            'username': self.username,
204
205
            'title': self.title,
206
            'thumb_url': self.thumb_url()
207
        }
208
209
        if not full:
210
            return result
211
212
        # Merge authorization details
213
        result['authorization'] = {
214
            'basic': {'state': 'empty'}
215
        }
216
217
        # - Basic credentials
218
        basic = self.basic
219
220
        if basic is not None:
221
            result['authorization']['basic'] = basic.to_json(self)
222
223
        return result
224
225 1
    def __repr__(self):
226
        return '<PlexAccount username: %r>' % (
227
            self.username,
228
        )
229