Test Failed
Push — develop ( a80574...5ed66c )
by Dean
02:36
created

ActionManager.delete()   A

Complexity

Conditions 1

Size

Total Lines 5

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 0
CRAP Score 2

Importance

Changes 0
Metric Value
dl 0
loc 5
ccs 0
cts 1
cp 0
rs 9.4285
c 0
b 0
f 0
cc 1
crap 2
1
from plugin.core.helpers.thread import module
2
from plugin.managers.core.base import Manager
3
from plugin.models import ActionHistory, ActionQueue
4
from plugin.preferences import Preferences
5
6
from datetime import datetime, timedelta
7
from threading import Thread
8
from trakt import Trakt
9
import apsw
10
import json
11
import logging
12
import peewee
13
import time
14
15
# Module-level logger, named after this module's import path
log = logging.getLogger(__name__)
16
17
18
@module(start=True, blocking=True)
class ActionManager(Manager):
    """Queue trakt actions and send them from a background worker thread.

    `queue()` stores an event as an `ActionQueue` row. The `run()` loop
    retrieves queued rows one at a time, sends them with `process()` /
    `send()` and finally moves them into `ActionHistory` with `resolve()`.
    """

    # Loop condition checked by `run()`; nothing visible in this class clears it
    _process_enabled = True

    # Worker thread created by `start()` (None until the first call)
    _process_thread = None

    #
    # Queue
    #

    @classmethod
    def queue(cls, event, request, session=None, account=None):
        """Store an event in the action queue and ensure the worker is running.

        :param event: Event name in the form 'interface/method' (it is split
                      on '/' by `process()`), e.g. 'scrobble/stop'
        :param request: JSON-serializable request data, or None
        :param session: Object providing `account_id`, `progress`, `part`,
                        `rating_key` and `session_key`
        :param account: Object providing `id`; only used when no account
                        could be retrieved from `session`
        :return: The created `ActionQueue` object, or None if the event was
                 rejected or could not be stored
        """
        if event is None:
            return None

        obj = None

        if request is not None:
            request = json.dumps(request)

        # Retrieve `account_id` for action
        account_id = None

        if session:
            try:
                account_id = session.account_id
            except KeyError:
                # NOTE(review): only KeyError is handled here, any other error
                # raised by the attribute lookup propagates to the caller
                account_id = None

        if account_id is None and account:
            account_id = account.id

        if account_id is None:
            log.debug('Unable to find valid account for event %r, session %r', event, session)
            return None

        if not Preferences.get('scrobble.enabled', account_id):
            log.debug('Scrobbler not enabled for account %r', account_id)
            return None

        # Try queue the event
        #
        # NOTE(review): `session` is dereferenced below, so a call that only
        # provides `account` ends up in the generic `except Exception` handler
        try:
            obj = ActionQueue.create(
                account=account_id,
                session=session,

                progress=session.progress,

                part=session.part,
                rating_key=session.rating_key,

                event=event,
                request=request,

                queued_at=datetime.utcnow()
            )
            log.debug('Queued %r event for %r', event, session)
        except (apsw.ConstraintError, peewee.IntegrityError) as ex:
            log.info('Event %r has already been queued for session %r: %s', event, session.session_key, ex, exc_info=True)
        except Exception as ex:
            log.warning('Unable to queue event %r for %r: %s', event, session, ex, exc_info=True)

        # Ensure process thread is started
        cls.start()

        return obj

    @classmethod
    def delete(cls, session_id, event):
        """Delete the queued actions matching `session_id` and `event`."""
        ActionQueue.delete().where(
            ActionQueue.session == session_id,
            ActionQueue.event == event
        ).execute()

    #
    # Process
    #

    @classmethod
    def start(cls):
        """Start the daemon worker thread running `run()`, if not yet created.

        NOTE(review): the check and the assignment are not guarded by a lock.
        """
        if cls._process_thread is not None:
            return

        cls._process_thread = Thread(target=cls.run)
        cls._process_thread.daemon = True

        cls._process_thread.start()

    @classmethod
    def run(cls):
        """Worker loop: retrieve, send and resolve queued actions one by one.

        Sleeps 5 seconds when the queue is empty, when the queue could not be
        read, and after every processed (or failed) action.
        """
        while cls._process_enabled:
            # Retrieve one action from the queue
            try:
                action = ActionQueue.get()
            except ActionQueue.DoesNotExist:
                time.sleep(5)
                continue
            except Exception as ex:
                log.warning('Unable to retrieve action from queue - %s', ex, exc_info=True)
                time.sleep(5)
                continue

            log.debug('Retrieved %r action from queue', action.event)

            try:
                performed = cls.process(action)

                cls.resolve(action, performed)

                log.debug('Action %r sent, moved action to history', action.event)
            except Exception as ex:
                # `message` is deprecated and may be missing or empty, fall
                # back to the exception representation in that case
                message = getattr(ex, 'message', None) or repr(ex)

                # Pass the values as logging arguments instead of formatting
                # them into the template: a '%' inside the exception text must
                # not be interpreted as a placeholder, and an error raised in
                # this handler would terminate the worker loop for good
                # (`start()` never replaces an existing thread).
                log.warning('Unable to process action %r - %s', action.event, message, exc_info=True, extra={
                    'event': {
                        'module': __name__,
                        'name': 'run.process_exception',
                        'key': message
                    }
                })
            finally:
                time.sleep(5)

    @classmethod
    def process(cls, action):
        """Send a queued action to trakt.

        :param action: Queued action (provides `request`, `event`, `account`)
        :return: The 'action' value of the response for 'scrobble' events,
                 None in every other case (no request data, duplicate
                 scrobble, send failure, empty response, other interface)
        """
        if not action.request:
            return None

        if cls.is_duplicate(action):
            return None

        # NOTE(review): an event without exactly one '/' raises ValueError
        # here, which is only handled by the caller (see `run()`)
        interface, method = action.event.split('/')
        request = str(action.request)

        log.debug('Sending action %r (account: %r, interface: %r, method: %r)', action.event, action.account, interface, method)

        try:
            result = cls.send(action, Trakt[interface][method], request)
        except Exception as ex:
            log.error('Unable to send action %r: %r', action.event, ex, exc_info=True)
            return None

        if not result:
            # Invalid response
            return None

        if interface == 'scrobble':
            return result.get('action')

        log.warning('result: %r', result)
        return None

    @classmethod
    def is_duplicate(cls, action):
        """Check if a 'scrobble/stop' action repeats a recent scrobble.

        The 'scrobble.duplication_period' preference is used as a number of
        minutes before `action.queued_at`; the check is skipped when the
        preference is None.

        :return: True if `ActionHistory.has_scrobbled()` reports a scrobble
                 for the same account, rating key and part in that period
        """
        if action.event != 'scrobble/stop':
            return False

        # Retrieve scrobble duplication period
        duplication_period = Preferences.get('scrobble.duplication_period')

        if duplication_period is None:
            return False

        # Check for duplicate scrobbles in `duplication_period`
        scrobbled = ActionHistory.has_scrobbled(
            action.account, action.rating_key,
            part=action.part,
            after=action.queued_at - timedelta(minutes=duplication_period)
        )

        if scrobbled:
            log.info(
                'Ignoring duplicate %r action, scrobble already performed in the last %d minutes',
                action.event, duplication_period
            )
            return True

        return False

    @classmethod
    def send(cls, action, func, request):
        """Call `func` with the decoded request inside the account authorization.

        :param action: Queued action, its `account` provides the trakt account
        :param func: Callable invoked with the request data as keyword arguments
        :param request: JSON-encoded request data
        :return: Result of `func`, or None if the action has no account or
                 the account has no trakt account
        """
        # Retrieve `Account` for action
        account = action.account

        if not account:
            log.info('Missing `account` for action, unable to send')
            return None

        # Retrieve request data
        request = json.loads(request)
        log.debug('request: %r', request)

        # Send request with account authorization
        trakt_account = account.trakt

        if trakt_account is None:
            log.info('Missing trakt account for %r', account)
            return None

        with trakt_account.authorization():
            return func(**request)

    @classmethod
    def resolve(cls, action, performed):
        """Record `action` in the history, then remove it from the queue.

        :param action: Queued action that has been processed
        :param performed: Value returned by `process()` for this action
        """
        # Store action in history
        ActionHistory.create(
            account=action.account_id,
            session=action.session_id,

            part=action.part,
            rating_key=action.rating_key,

            event=action.event,
            performed=performed,

            queued_at=action.queued_at,
            sent_at=datetime.utcnow()
        )

        # Delete queued action
        cls.delete(action.session_id, action.event)
235