Passed
Push — master ( aeb165...d9ae97 )
by Dean
03:04
created

ActionManager   A

Complexity

Total Complexity 33

Size/Duplication

Total Lines 202
Duplicated Lines 0 %

Test Coverage

Coverage 0%
Metric Value
wmc 33
dl 0
loc 202
ccs 0
cts 104
cp 0
rs 9.3999

8 Methods

Rating   Name   Duplication   Size   Complexity  
A start() 0 9 2
A resolve() 0 18 1
B send() 0 22 4
A delete() 0 5 1
B is_duplicate() 0 22 4
D queue() 0 52 10
B process() 0 28 6
B run() 0 26 5
1
from plugin.core.helpers.thread import module
2
from plugin.managers.core.base import Manager
3
from plugin.models import ActionHistory, ActionQueue
4
from plugin.preferences import Preferences
5
6
from datetime import datetime, timedelta
7
from threading import Thread
8
from trakt import Trakt
9
import apsw
10
import json
11
import logging
12
import peewee
13
import time
14
15
# Module-level logger, named after this module
log = logging.getLogger(__name__)
16
17
18
@module(start=True, blocking=True)
class ActionManager(Manager):
    """Queue trakt actions in `ActionQueue` and send them from a background thread.

    Actions are stored with `queue()`, picked up one at a time by `run()`,
    sent via `process()`/`send()` and finally moved into `ActionHistory`
    by `resolve()`.
    """

    # `run()` keeps looping for as long as this flag is true
    _process_enabled = True
    # Background thread created by `start()` (`None` until first started)
    _process_thread = None

    #
    # Queue
    #

    @classmethod
    def queue(cls, event, request, session=None, account=None):
        """Store an action in the queue and ensure the process thread is running.

        :param event: Event name, later split by `process()` into
                      "<interface>/<method>"
        :param request: Request parameters, serialized with `json.dumps` (unless `None`)
        :param session: Optional session, provides `account_id`, `progress`
                        and `rating_key`
        :param account: Optional account, its `id` is used when no account
                        could be retrieved from `session`
        :return: Created `ActionQueue` object, or `None` if `event` is `None`,
                 no account was found, scrobbling is disabled for the account
                 or the insert failed with a constraint/integrity error
        """
        if event is None:
            return None

        obj = None

        if request is not None:
            request = json.dumps(request)

        # Retrieve `account_id` for action
        account_id = None

        if session:
            try:
                account_id = session.account_id
            except KeyError:
                account_id = None

        if account_id is None and account:
            account_id = account.id

        if account_id is None:
            log.debug('Unable to find valid account for event %r, session %r', event, session)
            return None

        if not Preferences.get('scrobble.enabled', account_id):
            log.debug('Scrobbler not enabled for account %r', account_id)
            return None

        # `session` is optional (an `account` can be provided instead), only
        # read the playback details when a session has actually been provided
        progress = session.progress if session is not None else None
        rating_key = session.rating_key if session is not None else None

        # Try queue the event
        try:
            obj = ActionQueue.create(
                account=account_id,
                session=session,

                progress=progress,
                rating_key=rating_key,

                event=event,
                request=request,

                queued_at=datetime.utcnow()
            )
            log.debug('Queued %r event for %r', event, session)
        except (apsw.ConstraintError, peewee.IntegrityError) as ex:
            log.warning('Unable to queue event %r for %r: %s', event, session, ex, exc_info=True)

        # Ensure process thread is started
        cls.start()

        return obj

    @classmethod
    def delete(cls, session_id, event):
        """Delete queued actions matching `session_id` and `event`."""
        ActionQueue.delete().where(
            ActionQueue.session == session_id,
            ActionQueue.event == event
        ).execute()

    #
    # Process
    #

    @classmethod
    def start(cls):
        """Start the daemon process thread, does nothing if one has already been created."""
        if cls._process_thread is not None:
            return

        cls._process_thread = Thread(target=cls.run)
        cls._process_thread.daemon = True

        cls._process_thread.start()

    @classmethod
    def run(cls):
        """Process queued actions one at a time while `_process_enabled` is true.

        Sleeps for 5 seconds when the queue is empty, when an action couldn't
        be retrieved, and after every processed action.
        """
        while cls._process_enabled:
            # Retrieve one action from the queue
            try:
                action = ActionQueue.get()
            except ActionQueue.DoesNotExist:
                time.sleep(5)
                continue
            except Exception as ex:
                log.warning('Unable to retrieve action from queue - %s', ex, exc_info=True)
                time.sleep(5)
                continue

            log.debug('Retrieved %r action from queue', action.event)

            try:
                performed = cls.process(action)

                # Action is moved to history even if `performed` is `None`
                cls.resolve(action, performed)

                log.debug('Action %r sent, moved action to history', action.event)
            except Exception as ex:
                log.warning('Unable to process action %r - %s', action.event, ex, exc_info=True)
            finally:
                time.sleep(5)

    @classmethod
    def process(cls, action):
        """Send `action` to trakt.

        :param action: Queued action, `action.event` is expected to be in
                       the form "<interface>/<method>"
        :return: Value of "action" in the response for "scrobble" events,
                 otherwise `None` (missing request, duplicate action,
                 send failure, empty response or any other interface)
        """
        if not action.request:
            return None

        if cls.is_duplicate(action):
            return None

        # Raises `ValueError` if `event` doesn't contain exactly one "/"
        interface, method = action.event.split('/')
        request = str(action.request)

        log.debug(
            'Sending action %r (account: %r, interface: %r, method: %r)',
            action.event, action.account, interface, method
        )

        try:
            result = cls.send(action, Trakt[interface][method], request)
        except Exception as ex:
            log.error('Unable to send action %r: %r', action.event, ex, exc_info=True)
            return None

        if not result:
            # Invalid response
            return None

        if interface == 'scrobble':
            return result.get('action')

        log.warning('result: %r', result)
        return None

    @classmethod
    def is_duplicate(cls, action):
        """Check if a "scrobble/stop" action has already been scrobbled recently.

        :return: `True` if `action` is a "scrobble/stop" event with progress
                 of at least 80, and a "scrobble" for the same account and
                 rating key exists in history with `sent_at` later than one
                 hour before `action.queued_at`, otherwise `False`
        """
        if action.event != 'scrobble/stop':
            return False

        # Missing progress is treated like low progress (never a duplicate),
        # comparing `None` with a number raises `TypeError` on Python 3
        if action.progress is None or action.progress < 80:
            return False

        results = ActionHistory.select().where(
            ActionHistory.account == action.account,
            ActionHistory.rating_key == action.rating_key,

            ActionHistory.performed == 'scrobble',

            ActionHistory.sent_at > action.queued_at - timedelta(hours=1)
        )

        if results.count() > 0:
            log.info('Ignoring duplicate %r action, scrobble already performed in the last hour', action.event)
            return True

        return False

    @classmethod
    def send(cls, action, func, request):
        """Call `func` with the decoded request, using the account's trakt authorization.

        :param action: Queued action, provides the `account`
        :param func: Callable to invoke with the request parameters as keyword arguments
        :param request: JSON encoded request parameters
        :return: Result of `func`, or `None` if the action has no account
                 or the account has no trakt account
        """
        # Retrieve `Account` for action
        account = action.account

        if not account:
            log.info('Missing `account` for action, unable to send')
            return None

        # Retrieve request data
        request = json.loads(request)
        log.debug('request: %r', request)

        # Send request with account authorization
        trakt_account = account.trakt

        if trakt_account is None:
            log.info('Missing trakt account for %r', account)
            return None

        with trakt_account.authorization():
            return func(**request)

    @classmethod
    def resolve(cls, action, performed):
        """Store `action` in `ActionHistory` and remove it from the queue.

        :param action: Queued action that has been processed
        :param performed: Value returned by `process()`, stored as `performed`
        """
        # Store action in history
        ActionHistory.create(
            account=action.account_id,
            session=action.session_id,

            rating_key=action.rating_key,

            event=action.event,
            performed=performed,

            queued_at=action.queued_at,
            sent_at=datetime.utcnow()
        )

        # Delete queued action
        cls.delete(action.session_id, action.event)
220