Test Failed
Push — develop ( 572338...fb7300 )
by Dean
02:29
created

SyncTask.create()   B

Complexity

Conditions 3

Size

Total Lines 34

Duplication

Lines 0
Ratio 0 %

Code Coverage

Tests 1
CRAP Score 8.2077
Metric Value
cc 3
dl 0
loc 34
ccs 1
cts 6
cp 0.1666
crap 8.2077
rs 8.8571
1 1
from plugin.managers import ExceptionManager
2 1
from plugin.models import *
3 1
from plugin.sync.core.enums import SyncData, SyncMode
4 1
from plugin.sync.core.exceptions import SyncAbort
5 1
from plugin.sync.core.task.artifacts import SyncArtifacts
6 1
from plugin.sync.core.task.configuration import SyncConfiguration
7 1
from plugin.sync.core.task.map import SyncMap
8 1
from plugin.sync.core.task.progress import SyncProgress
9 1
from plugin.sync.core.task.profiler import SyncProfiler
10 1
from plugin.sync.core.task.state import SyncState
11
12 1
from datetime import datetime
13 1
from peewee import JOIN_LEFT_OUTER
14 1
import logging
15 1
import time
16
17 1
# Module-level logger, named after this module's import path
log = logging.getLogger(__name__)
18
19
20 1
class SyncTask(object):
    """State container for a single sync run.

    Holds the requested sync options, the `result` / `status` objects that
    track the run, the exceptions collected while running, and the child
    helper objects (artifacts, configuration, map, progress, profiler and
    state) that are constructed alongside the task.
    """

    def __init__(self, account, mode, data, media, result, status, **kwargs):
        """Store the sync options and construct the child helper objects.

        :param account: account the task belongs to
        :param mode: requested sync mode
        :param data: requested sync data, or `None` to have `load()` derive
                     it from the configuration
        :param media: requested sync media
        :param result: result object for this run (`id` and `elapsed`
                       tolerate `None` here, `finish()` does not)
        :param status: status object for this run
        :param kwargs: extra arguments, stored unchanged on `self.kwargs`
        """
        self.account = account

        # Sync options
        self.mode = mode
        self.data = data
        self.media = media

        # Extra arguments
        self.kwargs = kwargs

        # Handlers/Modes for task (populated by `construct()`)
        self.handlers = None
        self.modes = None

        # State/Result management
        self.result = result
        self.status = status

        self.exceptions = []

        self.finished = False
        self.started = False
        self.success = None

        self._abort = False

        # Construct children (each one receives this task)
        self.artifacts = SyncArtifacts(self)
        self.configuration = SyncConfiguration(self)
        self.map = SyncMap(self)
        self.progress = SyncProgress(self)
        self.profiler = SyncProfiler(self)

        self.state = SyncState(self)

    @property
    def id(self):
        """Identifier of the result object, or `None` when there is no result."""
        if self.result is None:
            return None

        return self.result.id

    @property
    def elapsed(self):
        """Seconds between `result.started_at` and the current UTC time.

        Returns `None` when the task has no result object.
        """
        if self.result is None:
            return None

        return (datetime.utcnow() - self.result.started_at).total_seconds()

    def construct(self, handlers, modes):
        """Instantiate the handler and mode modules for this task.

        :param handlers: module classes, keyed by their `data` attribute
        :param modes: module classes, keyed by their `mode` attribute
        """
        log.debug('Constructing %d handlers...', len(handlers))
        self.handlers = dict(self._construct_modules(handlers, 'data'))

        log.debug('Constructing %d modes...', len(modes))
        self.modes = dict(self._construct_modules(modes, 'mode'))

    def load(self):
        """Load the configuration, resolve `self.data` and load the children."""
        # Load task configuration
        self.configuration.load(self.account)

        # Automatically determine enabled data types
        if self.data is None:
            self.data = self.get_enabled_data(self.configuration, self.mode)

        log.debug('Sync Data: %r', self.data)
        log.debug('Sync Media: %r', self.media)

        # Load children
        self.profiler.load()
        self.state.load()

    def abort(self, timeout=None):
        """Request an abort and optionally wait for the task to finish.

        :param timeout: number of one-second polls to wait for
                        `self.finished`; `None` returns immediately
        """
        # Set `abort` flag, thread will abort on the next `checkpoint()`
        self._abort = True

        if timeout is None:
            return

        # Wait `timeout` seconds for task to finish
        # (`range` instead of the Python 2 only `xrange`)
        for _ in range(timeout):
            if self.finished:
                return

            time.sleep(1)

    def checkpoint(self):
        """Raise `SyncAbort` if an abort has been requested via `abort()`."""
        if self._abort:
            raise SyncAbort()

    def finish(self):
        """Persist the outcome of the run and mark the task as finished."""
        # Update result in database
        self.result.ended_at = datetime.utcnow()
        self.result.success = self.success
        self.result.save()

        # Store exceptions in database (best-effort, failures are only logged)
        for exc_info in self.exceptions:
            try:
                self.store_exception(self.result, exc_info)
            except Exception as ex:
                log.warning('Unable to store exception: %s', ex, exc_info=True)

        # Flush caches to archives
        self.state.flush()

        # Display profiler report
        self.profiler.log_report()

        # Mark finished
        self.finished = True

    @staticmethod
    def store_exception(result, exc_info):
        """Record `exc_info` through `ExceptionManager` and link it to `result`.

        :param result: result object the records are linked to
        :param exc_info: exception info passed to
                         `ExceptionManager.create.from_exc_info`
        """
        exception, error = ExceptionManager.create.from_exc_info(exc_info)

        # Link error to result
        SyncResultError.create(
            result=result,
            error=error
        )

        # Link exception to result
        SyncResultException.create(
            result=result,
            exception=exception
        )

    @classmethod
    def create(cls, account, mode, data, media, trigger, **kwargs):
        """Create the status/result records for a new run and build its task.

        :param account: `Account` instance, or an integer account id
        :param mode: requested sync mode
        :param data: requested sync data (`None` = derive from configuration)
        :param media: requested sync media
        :param trigger: value stored on the created result
        :param kwargs: extra arguments forwarded to the task constructor
        :return: loaded task instance
        :raises ValueError: if `account` is neither an id nor an `Account`
        """
        # Get account (`bool` is an `int` subclass, but is not a valid id)
        if isinstance(account, int) and not isinstance(account, bool):
            account = cls.get_account(account)
        elif not isinstance(account, Account):
            raise ValueError('Unexpected value provided for the "account" parameter')

        # Get/Create sync status
        status, _ = SyncStatus.get_or_create(
            account=account,
            mode=mode
        )

        # Create sync result
        result = SyncResult.create(
            status=status,
            trigger=trigger,

            started_at=datetime.utcnow()
        )

        # Create sync task (via `cls`, so subclasses create their own type)
        task = cls(
            account, mode,
            data, media,
            result, status,
            **kwargs
        )

        # Load sync configuration/state
        task.load()

        return task

    @classmethod
    def get_account(cls, account_id):
        """Fetch the account with `account_id`, joined with its Plex and
        Trakt accounts and their credentials (all via left outer joins).
        """
        # TODO Move account retrieval/join to `Account` class
        return (
            Account.select(
                Account.id,
                Account.name,

                PlexAccount.id,
                PlexAccount.key,
                PlexAccount.username,
                PlexBasicCredential.token_plex,
                PlexBasicCredential.token_server,

                TraktAccount.username,
                TraktBasicCredential.token,

                TraktOAuthCredential.access_token,
                TraktOAuthCredential.refresh_token,
                TraktOAuthCredential.created_at,
                TraktOAuthCredential.expires_in
            )
            # Plex
            .join(
                PlexAccount, JOIN_LEFT_OUTER, on=(
                    PlexAccount.account == Account.id
                ).alias('plex')
            )
            .join(
                PlexBasicCredential, JOIN_LEFT_OUTER, on=(
                    PlexBasicCredential.account == PlexAccount.id
                ).alias('basic')
            )
            # Trakt
            .switch(Account)
            .join(
                TraktAccount, JOIN_LEFT_OUTER, on=(
                    TraktAccount.account == Account.id
                ).alias('trakt')
            )
            .join(
                TraktBasicCredential, JOIN_LEFT_OUTER, on=(
                    TraktBasicCredential.account == TraktAccount.id
                ).alias('basic')
            )
            .switch(TraktAccount)
            .join(
                TraktOAuthCredential, JOIN_LEFT_OUTER, on=(
                    TraktOAuthCredential.account == TraktAccount.id
                ).alias('oauth')
            )
            .where(Account.id == account_id)
            .get()
        )

    @classmethod
    def get_enabled_data(cls, configuration, mode):
        """Combine the data types whose configured mode accepts `mode`.

        A data type is enabled when its configuration option is
        `SyncMode.Full` or `mode` itself (or `SyncMode.Pull` when `mode`
        is `SyncMode.FastPull`).

        :param configuration: mapping of option key -> configured mode
        :param mode: sync mode being run
        :return: enabled `SyncData` values combined with `|`, or `None`
                 when nothing is enabled
        """
        # Determine accepted modes
        modes = [SyncMode.Full, mode]

        if mode == SyncMode.FastPull:
            modes.append(SyncMode.Pull)

        # Configuration option -> data type it enables (checked in order)
        options = [
            ('sync.watched.mode', SyncData.Watched),
            ('sync.ratings.mode', SyncData.Ratings),
            ('sync.playback.mode', SyncData.Playback),
            ('sync.collection.mode', SyncData.Collection),

            # Lists
            ('sync.lists.watchlist.mode', SyncData.Watchlist),
            ('sync.lists.liked.mode', SyncData.Liked),
            ('sync.lists.personal.mode', SyncData.Personal)
        ]

        # Combine enabled data types into a single enum value
        result = None

        for key, data in options:
            if configuration[key] not in modes:
                continue

            if result is None:
                result = data
            else:
                result |= data

        return result

    def _construct_modules(self, modules, attribute):
        """Instantiate each module class and yield `(key, instance)` pairs.

        The keys are read from the class attribute named `attribute`, which
        may hold a single key or a list of keys; every key of a class maps
        to the same instance. Classes without the attribute (or with it set
        to `None`) are skipped with a warning.

        :param modules: iterable of classes, each constructed with this task
        :param attribute: name of the class attribute holding the key(s)
        """
        for cls in modules:
            keys = getattr(cls, attribute, None)

            if keys is None:
                log.warning('Module %r is missing a valid %r attribute', cls, attribute)
                continue

            # Convert `keys` to list
            if not isinstance(keys, list):
                keys = [keys]

            # Construct module
            obj = cls(self)

            # Return module with keys
            for key in keys:
                yield key, obj
305